Compare commits

...

10 Commits

Author SHA1 Message Date
Dhruv Manilawala
f3c483a545 Fix is_airflow_* function, add docs 2024-12-30 21:15:23 +05:30
Wei Lee
c59cdd25de refactor(AIR303): refactor utility functions with is_airflow_builtin_or_provider 2024-12-30 23:10:08 +09:00
Wei Lee
8026d7712c feat(AIR302): airflow.hooks.base_hook.BaseHook → airflow.hooks.base.BaseHook 2024-12-30 23:10:08 +09:00
Wei Lee
7ca2d283c1 feat(AIR302): argument appbuilder is now removed in BaseAuthManager and its subclasses 2024-12-30 23:10:08 +09:00
Wei Lee
316126cf38 feat(AIR302): extensions "executors", "operators", "sensors", "hooks" have been removed in AirflowPlugin 2024-12-30 23:10:08 +09:00
Wei Lee
39d545b738 feat(AIR302): argument "filename_template" is removed in task handlers 2024-12-30 23:10:08 +09:00
Wei Lee
aa049d5071 feat(AIR302): argument sla is removed and argument task_concurrency is renamed as max_active_tis_per_dag in all airflow operators 2024-12-30 23:10:08 +09:00
Wei Lee
1743f029f1 refactor(AIR302): refactor regex usage 2024-12-30 23:10:08 +09:00
Wei Lee
1483804487 feat(AIR302): add function removed_class_attribute and the following rules
* `airflow.providers_manager.ProvidersManager.dataset_factories` → `airflow.providers_manager.ProvidersManager.asset_factories`
* `airflow.providers_manager.ProvidersManager.dataset_uri_handlers` → `airflow.providers_manager.ProvidersManager.asset_uri_handlers`
* `airflow.providers_manager.ProvidersManager.dataset_to_openlineage_converters` → `airflow.providers_manager.ProvidersManager.asset_to_openlineage_converters`
* `airflow.lineage.hook.DatasetLineageInfo.dataset`  → `airflow.lineage.hook.AssetLineageInfo.asset`
2024-12-30 23:10:08 +09:00
Wei Lee
e292b7b277 feat(AIR302): extend the following rules
Any class in Airflow that inherits from these classes should no longer use these methods

* `airflow.secrets.base_secrets.BaseSecretsBackend.get_conn_uri` → `airflow.secrets.base_secrets.BaseSecretsBackend.get_conn_value`
* `airflow.secrets.base_secrets.BaseSecretsBackend.get_connections` → `airflow.secrets.base_secrets.BaseSecretsBackend.get_connection`
* `airflow.hooks.base.BaseHook.get_connections` → use `get_connection`
* `airflow.datasets.BaseDataset.iter_datasets` → `airflow.sdk.definitions.asset.BaseAsset.iter_assets`
* `airflow.datasets.BaseDataset.iter_dataset_aliases` → `airflow.sdk.definitions.asset.BaseAsset.iter_asset_aliases`
2024-12-30 23:10:08 +09:00
11 changed files with 1441 additions and 751 deletions
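
Before the per-file diffs, a minimal before/after sketch (in Python) of the migrations these commits target. The mappings follow the commit messages above; EmptyOperator is only an illustrative stand-in for the "all airflow operators" case, and the sketch is not taken from the changed files.

# Before: spellings the new AIR302 checks flag
from datetime import timedelta

from airflow.hooks.base_hook import BaseHook  # module moved in Airflow 3
from airflow.operators.empty import EmptyOperator

op = EmptyOperator(
    task_id="legacy",
    task_concurrency=5,         # renamed in Airflow 3
    sla=timedelta(seconds=10),  # removed in Airflow 3, no replacement suggested
)

# After: the replacements suggested by the rules
from airflow.hooks.base import BaseHook

op = EmptyOperator(
    task_id="legacy",
    max_active_tis_per_dag=5,   # replaces task_concurrency
)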

View File

@@ -0,0 +1,29 @@
from airflow.plugins_manager import AirflowPlugin
class AirflowTestPlugin(AirflowPlugin):
name = "test_plugin"
# --- Invalid extensions start
operators = [PluginOperator]
sensors = [PluginSensorOperator]
hooks = [PluginHook]
executors = [PluginExecutor]
# --- Invalid extensions end
macros = [plugin_macro]
flask_blueprints = [bp]
appbuilder_views = [v_appbuilder_package]
appbuilder_menu_items = [appbuilder_mitem, appbuilder_mitem_toplevel]
global_operator_extra_links = [
AirflowLink(),
GithubLink(),
]
operator_extra_links = [
GoogleLink(),
AirflowLink2(),
CustomOpLink(),
CustomBaseIndexOpLink(1),
]
timetables = [CustomCronDataIntervalTimetable]
listeners = [empty_listener, ClassBasedListener()]
ti_deps = [CustomTestTriggerRule()]
priority_weight_strategies = [CustomPriorityWeightStrategy]
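
The diagnostic added for these fields says the extension "should just be imported as a regular python module"; below is a minimal sketch of that migration, reusing the fixture's placeholder operator (the module path my_company.plugins.operators is hypothetical).

# The plugin keeps only still-supported fields; operators/sensors/hooks/executors
# are no longer registered here.
from airflow.plugins_manager import AirflowPlugin

class AirflowTestPlugin(AirflowPlugin):
    name = "test_plugin"

# Instead, import the classes directly where they are used, e.g. in a DAG file.
from my_company.plugins.operators import PluginOperator  # hypothetical path

task = PluginOperator(task_id="uses_plugin_operator")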

View File

@@ -1,14 +1,17 @@
from datetime import timedelta
from airflow import DAG, dag
from airflow.timetables.simple import NullTimetable
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.providers.standard.operators import trigger_dagrun
from airflow.operators.datetime import BranchDateTimeOperator
from airflow.providers.standard.operators import datetime
from airflow.sensors.weekday import DayOfWeekSensor, BranchDayOfWeekOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.providers.amazon.aws.log.s3_task_handler import S3TaskHandler
from airflow.providers.apache.hdfs.log.hdfs_task_handler import HdfsTaskHandler
from airflow.providers.elasticsearch.log.es_task_handler import ElasticsearchTaskHandler
from airflow.providers.fab.auth_manager.fab_auth_manager import FabAuthManager
from airflow.providers.google.cloud.log.gcs_task_handler import GCSTaskHandler
from airflow.providers.standard.operators import datetime, trigger_dagrun
from airflow.providers.standard.sensors import weekday
from airflow.sensors.weekday import BranchDayOfWeekOperator, DayOfWeekSensor
from airflow.timetables.simple import NullTimetable
DAG(dag_id="class_schedule", schedule="@hourly")
@@ -54,10 +57,12 @@ def decorator_deprecated_operator_args():
)
branch_dt_op = datetime.BranchDateTimeOperator(
task_id="branch_dt_op", use_task_execution_day=True
task_id="branch_dt_op", use_task_execution_day=True, task_concurrency=5
)
branch_dt_op2 = BranchDateTimeOperator(
task_id="branch_dt_op2", use_task_execution_day=True
task_id="branch_dt_op2",
use_task_execution_day=True,
sla=timedelta(seconds=10),
)
dof_task_sensor = weekday.DayOfWeekSensor(
@@ -76,3 +81,12 @@ def decorator_deprecated_operator_args():
branch_dt_op >> branch_dt_op2
dof_task_sensor >> dof_task_sensor2
bdow_op >> bdow_op2
# deprecated filename_template argument in FileTaskHandler
S3TaskHandler(filename_template="/tmp/test")
HdfsTaskHandler(filename_template="/tmp/test")
ElasticsearchTaskHandler(filename_template="/tmp/test")
GCSTaskHandler(filename_template="/tmp/test")
FabAuthManager(None)

View File

@@ -0,0 +1,59 @@
from airflow.datasets.manager import DatasetManager
from airflow.lineage.hook import DatasetLineageInfo, HookLineageCollector
from airflow.providers.amazon.auth_manager.aws_auth_manager import AwsAuthManager
from airflow.providers.apache.beam.hooks import BeamHook, NotAir302HookError
from airflow.providers.google.cloud.secrets.secret_manager import (
CloudSecretManagerBackend,
)
from airflow.providers.hashicorp.secrets.vault import NotAir302SecretError, VaultBackend
from airflow.providers_manager import ProvidersManager
from airflow.secrets.base_secrets import BaseSecretsBackend
dm = DatasetManager()
dm.register_dataset_change()
dm.create_datasets()
dm.notify_dataset_created()
dm.notify_dataset_changed()
dm.notify_dataset_alias_created()
hlc = HookLineageCollector()
hlc.create_dataset()
hlc.add_input_dataset()
hlc.add_output_dataset()
hlc.collected_datasets()
aam = AwsAuthManager()
aam.is_authorized_dataset()
pm = ProvidersManager()
pm.initialize_providers_asset_uri_resources()
pm.dataset_factories
base_secret_backend = BaseSecretsBackend()
base_secret_backend.get_conn_uri()
base_secret_backend.get_connections()
csm_backend = CloudSecretManagerBackend()
csm_backend.get_conn_uri()
csm_backend.get_connections()
vault_backend = VaultBackend()
vault_backend.get_conn_uri()
vault_backend.get_connections()
not_an_error = NotAir302SecretError()
not_an_error.get_conn_uri()
beam_hook = BeamHook()
beam_hook.get_conn_uri()
not_an_error = NotAir302HookError()
not_an_error.get_conn_uri()
provider_manager = ProvidersManager()
provider_manager.dataset_factories
provider_manager.dataset_uri_handlers
provider_manager.dataset_to_openlineage_converters
dl_info = DatasetLineageInfo()
dl_info.dataset

View File

@@ -36,6 +36,7 @@ from airflow.datasets.manager import (
dataset_manager,
resolve_dataset_manager,
)
from airflow.hooks.base_hook import BaseHook
from airflow.lineage.hook import DatasetLineageInfo
from airflow.listeners.spec.dataset import on_dataset_changed, on_dataset_created
from airflow.metrics.validators import AllowListValidator, BlockListValidator
@@ -96,6 +97,10 @@ from airflow.www.utils import get_sensitive_variables_fields, should_hide_value_
PY36, PY37, PY38, PY39, PY310, PY311, PY312
DatasetFromRoot
dataset_from_root = DatasetFromRoot()
dataset_from_root.iter_datasets()
dataset_from_root.iter_dataset_aliases()
# airflow.api_connexion.security
requires_access, requires_access_dataset
@@ -119,9 +124,24 @@ DatasetAny
expand_alias_to_datasets
Metadata
dataset_to_test_method_call = Dataset()
dataset_to_test_method_call.iter_datasets()
dataset_to_test_method_call.iter_dataset_aliases()
alias_to_test_method_call = DatasetAlias()
alias_to_test_method_call.iter_datasets()
alias_to_test_method_call.iter_dataset_aliases()
any_to_test_method_call = DatasetAny()
any_to_test_method_call.iter_datasets()
any_to_test_method_call.iter_dataset_aliases()
# airflow.datasets.manager
DatasetManager, dataset_manager, resolve_dataset_manager
# airflow.hooks
BaseHook()
# airflow.lineage.hook
DatasetLineageInfo
@@ -253,34 +273,3 @@ has_access_dataset
# airflow.www.utils
get_sensitive_variables_fields, should_hide_value_for_key
from airflow.datasets.manager import DatasetManager
dm = DatasetManager()
dm.register_dataset_change()
dm.create_datasets()
dm.notify_dataset_created()
dm.notify_dataset_changed()
dm.notify_dataset_alias_created()
from airflow.lineage.hook import HookLineageCollector
hlc = HookLineageCollector()
hlc.create_dataset()
hlc.add_input_dataset()
hlc.add_output_dataset()
hlc.collected_datasets()
from airflow.providers.amazon.auth_manager.aws_auth_manager import AwsAuthManager
aam = AwsAuthManager()
aam.is_authorized_dataset()
from airflow.providers_manager import ProvidersManager
pm = ProvidersManager()
pm.initialize_providers_asset_uri_resources()
pm.dataset_factories

View File

@@ -279,6 +279,9 @@ pub(crate) fn expression(expr: &Expr, checker: &mut Checker) {
);
}
}
if checker.enabled(Rule::Airflow3Removal) {
airflow::rules::removed_in_3(checker, expr);
}
if checker.enabled(Rule::MixedCaseVariableInGlobalScope) {
if matches!(checker.semantic.current_scope().kind, ScopeKind::Module) {
pep8_naming::rules::mixed_case_variable_in_global_scope(

View File

@@ -16,6 +16,8 @@ mod tests {
#[test_case(Rule::AirflowDagNoScheduleArgument, Path::new("AIR301.py"))]
#[test_case(Rule::Airflow3Removal, Path::new("AIR302_args.py"))]
#[test_case(Rule::Airflow3Removal, Path::new("AIR302_names.py"))]
#[test_case(Rule::Airflow3Removal, Path::new("AIR302_class_attribute.py"))]
#[test_case(Rule::Airflow3Removal, Path::new("AIR302_airflow_plugin.py"))]
#[test_case(Rule::Airflow3MovedToProvider, Path::new("AIR303.py"))]
fn rules(rule_code: Rule, path: &Path) -> Result<()> {
let snapshot = format!("{}_{}", rule_code.noqa_code(), path.to_string_lossy());

View File

@@ -1,8 +1,12 @@
use ruff_diagnostics::{Diagnostic, Edit, Fix, FixAvailability, Violation};
use ruff_macros::{derive_message_formats, ViolationMetadata};
use ruff_python_ast::{name::QualifiedName, Arguments, Expr, ExprAttribute, ExprCall};
use ruff_python_ast::{
name::QualifiedName, Arguments, Expr, ExprAttribute, ExprCall, ExprContext, ExprName,
StmtClassDef,
};
use ruff_python_semantic::analyze::typing;
use ruff_python_semantic::Modules;
use ruff_python_semantic::ScopeKind;
use ruff_text_size::Ranged;
use crate::checkers::ast::Checker;
@@ -123,38 +127,106 @@ fn removed_argument(checker: &mut Checker, qualname: &QualifiedName, arguments:
None::<&str>,
));
}
["airflow", .., "operators", "trigger_dagrun", "TriggerDagRunOperator"] => {
checker.diagnostics.extend(diagnostic_for_argument(
arguments,
"execution_date",
Some("logical_date"),
));
_ => {
if is_airflow_auth_manager(qualname.segments()) {
if !arguments.is_empty() {
checker.diagnostics.push(Diagnostic::new(
Airflow3Removal {
// deprecated: (*arguments).to_string(),
deprecated: "appbuilder".to_string(),
replacement: Replacement::Message(
"The constructor takes no parameter now.",
),
},
arguments.range(),
));
}
} else if is_airflow_task_handler(qualname.segments()) {
checker.diagnostics.extend(diagnostic_for_argument(
arguments,
"filename_template",
None::<&str>,
));
} else if is_airflow_operator(qualname.segments()) {
checker
.diagnostics
.extend(diagnostic_for_argument(arguments, "sla", None::<&str>));
checker.diagnostics.extend(diagnostic_for_argument(
arguments,
"task_concurrency",
Some("max_active_tis_per_dag"),
));
match qualname.segments() {
["airflow", .., "operators", "trigger_dagrun", "TriggerDagRunOperator"] => {
checker.diagnostics.extend(diagnostic_for_argument(
arguments,
"execution_date",
Some("logical_date"),
));
}
["airflow", .., "operators", "datetime", "BranchDateTimeOperator"] => {
checker.diagnostics.extend(diagnostic_for_argument(
arguments,
"use_task_execution_day",
Some("use_task_logical_date"),
));
}
["airflow", .., "operators", "weekday", "DayOfWeekSensor"] => {
checker.diagnostics.extend(diagnostic_for_argument(
arguments,
"use_task_execution_day",
Some("use_task_logical_date"),
));
}
["airflow", .., "operators", "weekday", "BranchDayOfWeekOperator"] => {
checker.diagnostics.extend(diagnostic_for_argument(
arguments,
"use_task_execution_day",
Some("use_task_logical_date"),
));
}
_ => {}
}
}
}
["airflow", .., "operators", "datetime", "BranchDateTimeOperator"] => {
checker.diagnostics.extend(diagnostic_for_argument(
arguments,
"use_task_execution_day",
Some("use_task_logical_date"),
));
}
["airflow", .., "operators", "weekday", "DayOfWeekSensor"] => {
checker.diagnostics.extend(diagnostic_for_argument(
arguments,
"use_task_execution_day",
Some("use_task_logical_date"),
));
}
["airflow", .., "operators", "weekday", "BranchDayOfWeekOperator"] => {
checker.diagnostics.extend(diagnostic_for_argument(
arguments,
"use_task_execution_day",
Some("use_task_logical_date"),
));
}
_ => {}
};
}
fn removed_class_attribute(checker: &mut Checker, expr: &Expr) {
let Expr::Attribute(ExprAttribute { attr, value, .. }) = expr else {
return;
};
let Some(qualname) = typing::resolve_assignment(value, checker.semantic()) else {
return;
};
let replacement = match *qualname.segments() {
["airflow", "providers_manager", "ProvidersManager"] => match attr.as_str() {
"dataset_factories" => Some(Replacement::Name("asset_factories")),
"dataset_uri_handlers" => Some(Replacement::Name("asset_uri_handlers")),
"dataset_to_openlineage_converters" => {
Some(Replacement::Name("asset_to_openlineage_converters"))
}
&_ => None,
},
["airflow", "lineage", "hook"] => match attr.as_str() {
"dataset" => Some(Replacement::Name("asset")),
&_ => None,
},
_ => None,
};
if let Some(replacement) = replacement {
checker.diagnostics.push(Diagnostic::new(
Airflow3Removal {
deprecated: attr.to_string(),
replacement,
},
attr.range(),
));
}
}
fn removed_method(checker: &mut Checker, expr: &Expr) {
let Expr::Call(ExprCall { func, .. }) = expr else {
return;
@@ -196,7 +268,27 @@ fn removed_method(checker: &mut Checker, expr: &Expr) {
)),
&_ => None,
},
_ => None,
["airflow", "datasets", ..] | ["airflow", "Dataset"] => match attr.as_str() {
"iter_datasets" => Some(Replacement::Name("iter_assets")),
"iter_dataset_aliases" => Some(Replacement::Name("iter_asset_aliases")),
&_ => None,
},
_ => {
if is_airflow_secret_backend(qualname.segments()) {
match attr.as_str() {
"get_conn_uri" => Some(Replacement::Name("get_conn_value")),
"get_connections" => Some(Replacement::Name("get_connection")),
&_ => None,
}
} else if is_airflow_hook(qualname.segments()) {
match attr.as_str() {
"get_connections" => Some(Replacement::Name("get_connection")),
&_ => None,
}
} else {
None
}
}
};
if let Some(replacement) = replacement {
checker.diagnostics.push(Diagnostic::new(
@@ -390,6 +482,11 @@ fn removed_name(checker: &mut Checker, expr: &Expr, ranged: impl Ranged) {
qualname.to_string(),
Replacement::Name("airflow.lineage.hook.AssetLineageInfo"),
)),
// airflow.hooks
["airflow", "hooks", "base_hook", "BaseHook"] => Some((
qualname.to_string(),
Replacement::Name("airflow.hooks.base.BaseHook"),
)),
// airflow.operators
["airflow", "operators", "subdag", ..] => {
Some((
@@ -646,6 +743,37 @@ fn removed_name(checker: &mut Checker, expr: &Expr, ranged: impl Ranged) {
}
}
fn removed_airflow_plugin_extension(
checker: &mut Checker,
expr: &Expr,
name: &str,
class_def: &StmtClassDef,
) {
if matches!(name, "executors" | "operators" | "sensors" | "hooks") {
if class_def.bases().iter().any(|expr| {
checker
.semantic()
.resolve_qualified_name(expr)
.is_some_and(|qualified_name| {
matches!(
qualified_name.segments(),
["airflow", "plugins_manager", "AirflowPlugin"]
)
})
}) {
checker.diagnostics.push(Diagnostic::new(
Airflow3Removal {
deprecated: name.to_string(),
replacement: Replacement::Message(
"This extension should just be imported as a regular python module.",
),
},
expr.range(),
));
}
}
}
/// AIR302
pub(crate) fn removed_in_3(checker: &mut Checker, expr: &Expr) {
if !checker.semantic().seen_module(Modules::AIRFLOW) {
@@ -662,8 +790,104 @@ pub(crate) fn removed_in_3(checker: &mut Checker, expr: &Expr) {
removed_method(checker, expr);
}
Expr::Attribute(ExprAttribute { attr: ranged, .. }) => removed_name(checker, expr, ranged),
ranged @ Expr::Name(_) => removed_name(checker, expr, ranged),
Expr::Attribute(ExprAttribute { attr: ranged, .. }) => {
removed_name(checker, expr, ranged);
removed_class_attribute(checker, expr);
}
ranged @ Expr::Name(ExprName { id, ctx, .. }) => {
removed_name(checker, expr, ranged);
if ctx == &ExprContext::Store {
if let ScopeKind::Class(class_def) = &checker.semantic().current_scope().kind {
removed_airflow_plugin_extension(checker, expr, id, class_def);
}
}
}
_ => {}
}
}
/// Check whether the symbol is coming from the `secrets` builtin or provider module which ends
/// with `Backend`.
fn is_airflow_secret_backend(segments: &[&str]) -> bool {
is_airflow_builtin_or_provider(segments, "secrets", "Backend")
}
/// Check whether the symbol is coming from the `hooks` builtin or provider module which ends
/// with `Hook`.
fn is_airflow_hook(segments: &[&str]) -> bool {
is_airflow_builtin_or_provider(segments, "hooks", "Hook")
}
/// Check whether the symbol is coming from the `operators` builtin or provider module which ends
/// with `Operator`.
fn is_airflow_operator(segments: &[&str]) -> bool {
is_airflow_builtin_or_provider(segments, "operators", "Operator")
}
/// Check whether the symbol is coming from the `log` builtin or provider module which ends
/// with `TaskHandler`.
fn is_airflow_task_handler(segments: &[&str]) -> bool {
is_airflow_builtin_or_provider(segments, "log", "TaskHandler")
}
/// Check whether the segments corresponding to the fully qualified name points to a symbol that's
/// either a builtin or coming from one of the providers in Airflow.
///
/// The patterns it looks for are:
/// - `airflow.providers.**.<module>.**.*<symbol_suffix>` for providers
/// - `airflow.<module>.**.*<symbol_suffix>` for builtins
///
/// where `**` is one or more segments separated by a dot, and `*` is one or more characters.
///
/// Examples for the above patterns:
/// - `airflow.providers.google.cloud.secrets.secret_manager.CloudSecretManagerBackend` (provider)
/// - `airflow.secrets.base_secrets.BaseSecretsBackend` (builtin)
fn is_airflow_builtin_or_provider(segments: &[&str], module: &str, symbol_suffix: &str) -> bool {
match segments {
["airflow", "providers", rest @ ..] => {
if let (Some(pos), Some(last_element)) =
(rest.iter().position(|&s| s == module), rest.last())
{
// Check that the module is not the last element i.e., there's a symbol that's
// being used from the `module` that ends with `symbol_suffix`.
pos + 1 < rest.len() && last_element.ends_with(symbol_suffix)
} else {
false
}
}
["airflow", first, rest @ ..] => {
if let Some(last) = rest.last() {
first == module && last.ends_with(symbol_suffix)
} else {
false
}
}
_ => false,
}
}
fn is_airflow_auth_manager(segments: &[&str]) -> bool {
match segments {
["airflow", "auth", "manager", rest @ ..] => {
if let Some(last_element) = rest.last() {
last_element.ends_with("AuthManager")
} else {
false
}
}
["airflow", "providers", rest @ ..] => {
if let (Some(pos), Some(last_element)) =
(rest.iter().position(|&s| s == "auth_manager"), rest.last())
{
pos + 1 < rest.len() && last_element.ends_with("AuthManager")
} else {
false
}
}
_ => false,
}
}
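
To make the pattern described in the doc comment above concrete, here is a small Python re-statement of the same suffix heuristic (illustration only; the linter's actual implementation is the Rust function above).

def is_airflow_builtin_or_provider(segments, module, symbol_suffix):
    # Provider symbols: airflow.providers.**.<module>.**.*<symbol_suffix>,
    # where <module> must not be the final segment.
    if segments[:2] == ["airflow", "providers"]:
        rest = segments[2:]
        if module in rest:
            return rest.index(module) + 1 < len(rest) and rest[-1].endswith(symbol_suffix)
        return False
    # Builtin symbols: airflow.<module>.**.*<symbol_suffix>
    if len(segments) >= 3 and segments[0] == "airflow":
        return segments[1] == module and segments[-1].endswith(symbol_suffix)
    return False

# Provider backend from the doc comment: matches
assert is_airflow_builtin_or_provider(
    "airflow.providers.google.cloud.secrets.secret_manager.CloudSecretManagerBackend".split("."),
    "secrets",
    "Backend",
)
# Builtin backend from the doc comment: matches
assert is_airflow_builtin_or_provider(
    "airflow.secrets.base_secrets.BaseSecretsBackend".split("."), "secrets", "Backend"
)
# Wrong suffix (the fixture's NotAir302SecretError): does not match
assert not is_airflow_builtin_or_provider(
    "airflow.providers.hashicorp.secrets.vault.NotAir302SecretError".split("."),
    "secrets",
    "Backend",
)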

View File

@@ -0,0 +1,43 @@
---
source: crates/ruff_linter/src/rules/airflow/mod.rs
snapshot_kind: text
---
AIR302_airflow_plugin.py:7:5: AIR302 `operators` is removed in Airflow 3.0; This extension should just be imported as a regular python module.
|
5 | name = "test_plugin"
6 | # --- Invalid extensions start
7 | operators = [PluginOperator]
| ^^^^^^^^^ AIR302
8 | sensors = [PluginSensorOperator]
9 | hooks = [PluginHook]
|
AIR302_airflow_plugin.py:8:5: AIR302 `sensors` is removed in Airflow 3.0; This extension should just be imported as a regular python module.
|
6 | # --- Invalid extensions start
7 | operators = [PluginOperator]
8 | sensors = [PluginSensorOperator]
| ^^^^^^^ AIR302
9 | hooks = [PluginHook]
10 | executors = [PluginExecutor]
|
AIR302_airflow_plugin.py:9:5: AIR302 `hooks` is removed in Airflow 3.0; This extension should just be imported as a regular python module.
|
7 | operators = [PluginOperator]
8 | sensors = [PluginSensorOperator]
9 | hooks = [PluginHook]
| ^^^^^ AIR302
10 | executors = [PluginExecutor]
11 | # --- Invalid extensions end
|
AIR302_airflow_plugin.py:10:5: AIR302 `executors` is removed in Airflow 3.0; This extension should just be imported as a regular python module.
|
8 | sensors = [PluginSensorOperator]
9 | hooks = [PluginHook]
10 | executors = [PluginExecutor]
| ^^^^^^^^^ AIR302
11 | # --- Invalid extensions end
12 | macros = [plugin_macro]
|

View File

@@ -2,175 +2,251 @@
source: crates/ruff_linter/src/rules/airflow/mod.rs
snapshot_kind: text
---
AIR302_args.py:15:39: AIR302 [*] `schedule_interval` is removed in Airflow 3.0
AIR302_args.py:18:39: AIR302 [*] `schedule_interval` is removed in Airflow 3.0
|
13 | DAG(dag_id="class_schedule", schedule="@hourly")
14 |
15 | DAG(dag_id="class_schedule_interval", schedule_interval="@hourly")
16 | DAG(dag_id="class_schedule", schedule="@hourly")
17 |
18 | DAG(dag_id="class_schedule_interval", schedule_interval="@hourly")
| ^^^^^^^^^^^^^^^^^ AIR302
16 |
17 | DAG(dag_id="class_timetable", timetable=NullTimetable())
19 |
20 | DAG(dag_id="class_timetable", timetable=NullTimetable())
|
= help: Use `schedule` instead
Safe fix
12 12 |
13 13 | DAG(dag_id="class_schedule", schedule="@hourly")
14 14 |
15 |-DAG(dag_id="class_schedule_interval", schedule_interval="@hourly")
15 |+DAG(dag_id="class_schedule_interval", schedule="@hourly")
16 16 |
17 17 | DAG(dag_id="class_timetable", timetable=NullTimetable())
18 18 |
15 15 |
16 16 | DAG(dag_id="class_schedule", schedule="@hourly")
17 17 |
18 |-DAG(dag_id="class_schedule_interval", schedule_interval="@hourly")
18 |+DAG(dag_id="class_schedule_interval", schedule="@hourly")
19 19 |
20 20 | DAG(dag_id="class_timetable", timetable=NullTimetable())
21 21 |
AIR302_args.py:17:31: AIR302 [*] `timetable` is removed in Airflow 3.0
AIR302_args.py:20:31: AIR302 [*] `timetable` is removed in Airflow 3.0
|
15 | DAG(dag_id="class_schedule_interval", schedule_interval="@hourly")
16 |
17 | DAG(dag_id="class_timetable", timetable=NullTimetable())
18 | DAG(dag_id="class_schedule_interval", schedule_interval="@hourly")
19 |
20 | DAG(dag_id="class_timetable", timetable=NullTimetable())
| ^^^^^^^^^ AIR302
|
= help: Use `schedule` instead
Safe fix
14 14 |
15 15 | DAG(dag_id="class_schedule_interval", schedule_interval="@hourly")
16 16 |
17 |-DAG(dag_id="class_timetable", timetable=NullTimetable())
17 |+DAG(dag_id="class_timetable", schedule=NullTimetable())
18 18 |
17 17 |
18 18 | DAG(dag_id="class_schedule_interval", schedule_interval="@hourly")
19 19 |
20 20 | def sla_callback(*arg, **kwargs):
20 |-DAG(dag_id="class_timetable", timetable=NullTimetable())
20 |+DAG(dag_id="class_timetable", schedule=NullTimetable())
21 21 |
22 22 |
23 23 | def sla_callback(*arg, **kwargs):
AIR302_args.py:24:34: AIR302 `sla_miss_callback` is removed in Airflow 3.0
AIR302_args.py:27:34: AIR302 `sla_miss_callback` is removed in Airflow 3.0
|
24 | DAG(dag_id="class_sla_callback", sla_miss_callback=sla_callback)
27 | DAG(dag_id="class_sla_callback", sla_miss_callback=sla_callback)
| ^^^^^^^^^^^^^^^^^ AIR302
|
AIR302_args.py:32:6: AIR302 [*] `schedule_interval` is removed in Airflow 3.0
AIR302_args.py:35:6: AIR302 [*] `schedule_interval` is removed in Airflow 3.0
|
32 | @dag(schedule_interval="0 * * * *")
35 | @dag(schedule_interval="0 * * * *")
| ^^^^^^^^^^^^^^^^^ AIR302
33 | def decorator_schedule_interval():
34 | pass
36 | def decorator_schedule_interval():
37 | pass
|
= help: Use `schedule` instead
Safe fix
29 29 | pass
30 30 |
31 31 |
32 |-@dag(schedule_interval="0 * * * *")
32 |+@dag(schedule="0 * * * *")
33 33 | def decorator_schedule_interval():
34 34 | pass
35 35 |
32 32 | pass
33 33 |
34 34 |
35 |-@dag(schedule_interval="0 * * * *")
35 |+@dag(schedule="0 * * * *")
36 36 | def decorator_schedule_interval():
37 37 | pass
38 38 |
AIR302_args.py:37:6: AIR302 [*] `timetable` is removed in Airflow 3.0
AIR302_args.py:40:6: AIR302 [*] `timetable` is removed in Airflow 3.0
|
37 | @dag(timetable=NullTimetable())
40 | @dag(timetable=NullTimetable())
| ^^^^^^^^^ AIR302
38 | def decorator_timetable():
39 | pass
41 | def decorator_timetable():
42 | pass
|
= help: Use `schedule` instead
Safe fix
34 34 | pass
35 35 |
36 36 |
37 |-@dag(timetable=NullTimetable())
37 |+@dag(schedule=NullTimetable())
38 38 | def decorator_timetable():
39 39 | pass
40 40 |
37 37 | pass
38 38 |
39 39 |
40 |-@dag(timetable=NullTimetable())
40 |+@dag(schedule=NullTimetable())
41 41 | def decorator_timetable():
42 42 | pass
43 43 |
AIR302_args.py:42:6: AIR302 `sla_miss_callback` is removed in Airflow 3.0
AIR302_args.py:45:6: AIR302 `sla_miss_callback` is removed in Airflow 3.0
|
42 | @dag(sla_miss_callback=sla_callback)
45 | @dag(sla_miss_callback=sla_callback)
| ^^^^^^^^^^^^^^^^^ AIR302
43 | def decorator_sla_callback():
44 | pass
46 | def decorator_sla_callback():
47 | pass
|
AIR302_args.py:50:39: AIR302 [*] `execution_date` is removed in Airflow 3.0
|
48 | def decorator_deprecated_operator_args():
49 | trigger_dagrun_op = trigger_dagrun.TriggerDagRunOperator(
50 | task_id="trigger_dagrun_op1", execution_date="2024-12-04"
| ^^^^^^^^^^^^^^ AIR302
51 | )
52 | trigger_dagrun_op2 = TriggerDagRunOperator(
|
= help: Use `logical_date` instead
Safe fix
47 47 | @dag()
48 48 | def decorator_deprecated_operator_args():
49 49 | trigger_dagrun_op = trigger_dagrun.TriggerDagRunOperator(
50 |- task_id="trigger_dagrun_op1", execution_date="2024-12-04"
50 |+ task_id="trigger_dagrun_op1", logical_date="2024-12-04"
51 51 | )
52 52 | trigger_dagrun_op2 = TriggerDagRunOperator(
53 53 | task_id="trigger_dagrun_op2", execution_date="2024-12-04"
AIR302_args.py:53:39: AIR302 [*] `execution_date` is removed in Airflow 3.0
|
51 | )
52 | trigger_dagrun_op2 = TriggerDagRunOperator(
53 | task_id="trigger_dagrun_op2", execution_date="2024-12-04"
51 | def decorator_deprecated_operator_args():
52 | trigger_dagrun_op = trigger_dagrun.TriggerDagRunOperator(
53 | task_id="trigger_dagrun_op1", execution_date="2024-12-04"
| ^^^^^^^^^^^^^^ AIR302
54 | )
55 | trigger_dagrun_op2 = TriggerDagRunOperator(
|
= help: Use `logical_date` instead
Safe fix
50 50 | task_id="trigger_dagrun_op1", execution_date="2024-12-04"
51 51 | )
52 52 | trigger_dagrun_op2 = TriggerDagRunOperator(
53 |- task_id="trigger_dagrun_op2", execution_date="2024-12-04"
53 |+ task_id="trigger_dagrun_op2", logical_date="2024-12-04"
50 50 | @dag()
51 51 | def decorator_deprecated_operator_args():
52 52 | trigger_dagrun_op = trigger_dagrun.TriggerDagRunOperator(
53 |- task_id="trigger_dagrun_op1", execution_date="2024-12-04"
53 |+ task_id="trigger_dagrun_op1", logical_date="2024-12-04"
54 54 | )
55 55 |
56 56 | branch_dt_op = datetime.BranchDateTimeOperator(
55 55 | trigger_dagrun_op2 = TriggerDagRunOperator(
56 56 | task_id="trigger_dagrun_op2", execution_date="2024-12-04"
AIR302_args.py:57:33: AIR302 [*] `use_task_execution_day` is removed in Airflow 3.0
AIR302_args.py:56:39: AIR302 [*] `execution_date` is removed in Airflow 3.0
|
56 | branch_dt_op = datetime.BranchDateTimeOperator(
57 | task_id="branch_dt_op", use_task_execution_day=True
54 | )
55 | trigger_dagrun_op2 = TriggerDagRunOperator(
56 | task_id="trigger_dagrun_op2", execution_date="2024-12-04"
| ^^^^^^^^^^^^^^ AIR302
57 | )
|
= help: Use `logical_date` instead
Safe fix
53 53 | task_id="trigger_dagrun_op1", execution_date="2024-12-04"
54 54 | )
55 55 | trigger_dagrun_op2 = TriggerDagRunOperator(
56 |- task_id="trigger_dagrun_op2", execution_date="2024-12-04"
56 |+ task_id="trigger_dagrun_op2", logical_date="2024-12-04"
57 57 | )
58 58 |
59 59 | branch_dt_op = datetime.BranchDateTimeOperator(
AIR302_args.py:60:33: AIR302 [*] `use_task_execution_day` is removed in Airflow 3.0
|
59 | branch_dt_op = datetime.BranchDateTimeOperator(
60 | task_id="branch_dt_op", use_task_execution_day=True, task_concurrency=5
| ^^^^^^^^^^^^^^^^^^^^^^ AIR302
58 | )
59 | branch_dt_op2 = BranchDateTimeOperator(
|
= help: Use `use_task_logical_date` instead
Safe fix
54 54 | )
55 55 |
56 56 | branch_dt_op = datetime.BranchDateTimeOperator(
57 |- task_id="branch_dt_op", use_task_execution_day=True
57 |+ task_id="branch_dt_op", use_task_logical_date=True
58 58 | )
59 59 | branch_dt_op2 = BranchDateTimeOperator(
60 60 | task_id="branch_dt_op2", use_task_execution_day=True
AIR302_args.py:60:34: AIR302 [*] `use_task_execution_day` is removed in Airflow 3.0
|
58 | )
59 | branch_dt_op2 = BranchDateTimeOperator(
60 | task_id="branch_dt_op2", use_task_execution_day=True
| ^^^^^^^^^^^^^^^^^^^^^^ AIR302
61 | )
62 | branch_dt_op2 = BranchDateTimeOperator(
|
= help: Use `use_task_logical_date` instead
Safe fix
57 57 | task_id="branch_dt_op", use_task_execution_day=True
58 58 | )
59 59 | branch_dt_op2 = BranchDateTimeOperator(
60 |- task_id="branch_dt_op2", use_task_execution_day=True
60 |+ task_id="branch_dt_op2", use_task_logical_date=True
57 57 | )
58 58 |
59 59 | branch_dt_op = datetime.BranchDateTimeOperator(
60 |- task_id="branch_dt_op", use_task_execution_day=True, task_concurrency=5
60 |+ task_id="branch_dt_op", use_task_logical_date=True, task_concurrency=5
61 61 | )
62 62 |
63 63 | dof_task_sensor = weekday.DayOfWeekSensor(
62 62 | branch_dt_op2 = BranchDateTimeOperator(
63 63 | task_id="branch_dt_op2",
AIR302_args.py:60:62: AIR302 [*] `task_concurrency` is removed in Airflow 3.0
|
59 | branch_dt_op = datetime.BranchDateTimeOperator(
60 | task_id="branch_dt_op", use_task_execution_day=True, task_concurrency=5
| ^^^^^^^^^^^^^^^^ AIR302
61 | )
62 | branch_dt_op2 = BranchDateTimeOperator(
|
= help: Use `max_active_tis_per_dag` instead
Safe fix
57 57 | )
58 58 |
59 59 | branch_dt_op = datetime.BranchDateTimeOperator(
60 |- task_id="branch_dt_op", use_task_execution_day=True, task_concurrency=5
60 |+ task_id="branch_dt_op", use_task_execution_day=True, max_active_tis_per_dag=5
61 61 | )
62 62 | branch_dt_op2 = BranchDateTimeOperator(
63 63 | task_id="branch_dt_op2",
AIR302_args.py:64:9: AIR302 [*] `use_task_execution_day` is removed in Airflow 3.0
|
62 | branch_dt_op2 = BranchDateTimeOperator(
63 | task_id="branch_dt_op2",
64 | use_task_execution_day=True,
| ^^^^^^^^^^^^^^^^^^^^^^ AIR302
65 | sla=timedelta(seconds=10),
66 | )
|
= help: Use `use_task_logical_date` instead
Safe fix
61 61 | )
62 62 | branch_dt_op2 = BranchDateTimeOperator(
63 63 | task_id="branch_dt_op2",
64 |- use_task_execution_day=True,
64 |+ use_task_logical_date=True,
65 65 | sla=timedelta(seconds=10),
66 66 | )
67 67 |
AIR302_args.py:65:9: AIR302 `sla` is removed in Airflow 3.0
|
63 | task_id="branch_dt_op2",
64 | use_task_execution_day=True,
65 | sla=timedelta(seconds=10),
| ^^^ AIR302
66 | )
|
AIR302_args.py:87:15: AIR302 `filename_template` is removed in Airflow 3.0
|
86 | # deprecated filename_template argument in FileTaskHandler
87 | S3TaskHandler(filename_template="/tmp/test")
| ^^^^^^^^^^^^^^^^^ AIR302
88 | HdfsTaskHandler(filename_template="/tmp/test")
89 | ElasticsearchTaskHandler(filename_template="/tmp/test")
|
AIR302_args.py:88:17: AIR302 `filename_template` is removed in Airflow 3.0
|
86 | # deprecated filename_template argument in FileTaskHandler
87 | S3TaskHandler(filename_template="/tmp/test")
88 | HdfsTaskHandler(filename_template="/tmp/test")
| ^^^^^^^^^^^^^^^^^ AIR302
89 | ElasticsearchTaskHandler(filename_template="/tmp/test")
90 | GCSTaskHandler(filename_template="/tmp/test")
|
AIR302_args.py:89:26: AIR302 `filename_template` is removed in Airflow 3.0
|
87 | S3TaskHandler(filename_template="/tmp/test")
88 | HdfsTaskHandler(filename_template="/tmp/test")
89 | ElasticsearchTaskHandler(filename_template="/tmp/test")
| ^^^^^^^^^^^^^^^^^ AIR302
90 | GCSTaskHandler(filename_template="/tmp/test")
|
AIR302_args.py:90:16: AIR302 `filename_template` is removed in Airflow 3.0
|
88 | HdfsTaskHandler(filename_template="/tmp/test")
89 | ElasticsearchTaskHandler(filename_template="/tmp/test")
90 | GCSTaskHandler(filename_template="/tmp/test")
| ^^^^^^^^^^^^^^^^^ AIR302
91 |
92 | FabAuthManager(None)
|
AIR302_args.py:92:15: AIR302 `appbuilder` is removed in Airflow 3.0; The constructor takes no parameters now.
|
90 | GCSTaskHandler(filename_template="/tmp/test")
91 |
92 | FabAuthManager(None)
| ^^^^^^ AIR302
|

View File

@@ -0,0 +1,220 @@
---
source: crates/ruff_linter/src/rules/airflow/mod.rs
snapshot_kind: text
---
AIR302_class_attribute.py:13:4: AIR302 `register_dataset_change` is removed in Airflow 3.0
|
12 | dm = DatasetManager()
13 | dm.register_dataset_change()
| ^^^^^^^^^^^^^^^^^^^^^^^ AIR302
14 | dm.create_datasets()
15 | dm.notify_dataset_created()
|
= help: Use `register_asset_change` instead
AIR302_class_attribute.py:14:4: AIR302 `create_datasets` is removed in Airflow 3.0
|
12 | dm = DatasetManager()
13 | dm.register_dataset_change()
14 | dm.create_datasets()
| ^^^^^^^^^^^^^^^ AIR302
15 | dm.notify_dataset_created()
16 | dm.notify_dataset_changed()
|
= help: Use `create_assets` instead
AIR302_class_attribute.py:15:4: AIR302 `notify_dataset_created` is removed in Airflow 3.0
|
13 | dm.register_dataset_change()
14 | dm.create_datasets()
15 | dm.notify_dataset_created()
| ^^^^^^^^^^^^^^^^^^^^^^ AIR302
16 | dm.notify_dataset_changed()
17 | dm.notify_dataset_alias_created()
|
= help: Use `notify_asset_created` instead
AIR302_class_attribute.py:16:4: AIR302 `notify_dataset_changed` is removed in Airflow 3.0
|
14 | dm.create_datasets()
15 | dm.notify_dataset_created()
16 | dm.notify_dataset_changed()
| ^^^^^^^^^^^^^^^^^^^^^^ AIR302
17 | dm.notify_dataset_alias_created()
|
= help: Use `notify_asset_changed` instead
AIR302_class_attribute.py:17:4: AIR302 `notify_dataset_alias_created` is removed in Airflow 3.0
|
15 | dm.notify_dataset_created()
16 | dm.notify_dataset_changed()
17 | dm.notify_dataset_alias_created()
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^ AIR302
18 |
19 | hlc = HookLineageCollector()
|
= help: Use `notify_asset_alias_created` instead
AIR302_class_attribute.py:20:5: AIR302 `create_dataset` is removed in Airflow 3.0
|
19 | hlc = HookLineageCollector()
20 | hlc.create_dataset()
| ^^^^^^^^^^^^^^ AIR302
21 | hlc.add_input_dataset()
22 | hlc.add_output_dataset()
|
= help: Use `create_asset` instead
AIR302_class_attribute.py:21:5: AIR302 `add_input_dataset` is removed in Airflow 3.0
|
19 | hlc = HookLineageCollector()
20 | hlc.create_dataset()
21 | hlc.add_input_dataset()
| ^^^^^^^^^^^^^^^^^ AIR302
22 | hlc.add_output_dataset()
23 | hlc.collected_datasets()
|
= help: Use `add_input_asset` instead
AIR302_class_attribute.py:22:5: AIR302 `add_output_dataset` is removed in Airflow 3.0
|
20 | hlc.create_dataset()
21 | hlc.add_input_dataset()
22 | hlc.add_output_dataset()
| ^^^^^^^^^^^^^^^^^^ AIR302
23 | hlc.collected_datasets()
|
= help: Use `add_output_asset` instead
AIR302_class_attribute.py:23:5: AIR302 `collected_datasets` is removed in Airflow 3.0
|
21 | hlc.add_input_dataset()
22 | hlc.add_output_dataset()
23 | hlc.collected_datasets()
| ^^^^^^^^^^^^^^^^^^ AIR302
24 |
25 | aam = AwsAuthManager()
|
= help: Use `collected_assets` instead
AIR302_class_attribute.py:26:5: AIR302 `is_authorized_dataset` is removed in Airflow 3.0
|
25 | aam = AwsAuthManager()
26 | aam.is_authorized_dataset()
| ^^^^^^^^^^^^^^^^^^^^^ AIR302
27 |
28 | pm = ProvidersManager()
|
= help: Use `is_authorized_asset` instead
AIR302_class_attribute.py:30:4: AIR302 `dataset_factories` is removed in Airflow 3.0
|
28 | pm = ProvidersManager()
29 | pm.initialize_providers_asset_uri_resources()
30 | pm.dataset_factories
| ^^^^^^^^^^^^^^^^^ AIR302
31 |
32 | base_secret_backend = BaseSecretsBackend()
|
= help: Use `asset_factories` instead
AIR302_class_attribute.py:33:21: AIR302 `get_conn_uri` is removed in Airflow 3.0
|
32 | base_secret_backend = BaseSecretsBackend()
33 | base_secret_backend.get_conn_uri()
| ^^^^^^^^^^^^ AIR302
34 | base_secret_backend.get_connections()
|
= help: Use `get_conn_value` instead
AIR302_class_attribute.py:34:21: AIR302 `get_connections` is removed in Airflow 3.0
|
32 | base_secret_backend = BaseSecretsBackend()
33 | base_secret_backend.get_conn_uri()
34 | base_secret_backend.get_connections()
| ^^^^^^^^^^^^^^^ AIR302
35 |
36 | csm_backend = CloudSecretManagerBackend()
|
= help: Use `get_connection` instead
AIR302_class_attribute.py:37:13: AIR302 `get_conn_uri` is removed in Airflow 3.0
|
36 | csm_backend = CloudSecretManagerBackend()
37 | csm_backend.get_conn_uri()
| ^^^^^^^^^^^^ AIR302
38 | csm_backend.get_connections()
|
= help: Use `get_conn_value` instead
AIR302_class_attribute.py:38:13: AIR302 `get_connections` is removed in Airflow 3.0
|
36 | csm_backend = CloudSecretManagerBackend()
37 | csm_backend.get_conn_uri()
38 | csm_backend.get_connections()
| ^^^^^^^^^^^^^^^ AIR302
39 |
40 | vault_backend = VaultBackend()
|
= help: Use `get_connection` instead
AIR302_class_attribute.py:41:15: AIR302 `get_conn_uri` is removed in Airflow 3.0
|
40 | vault_backend = VaultBackend()
41 | vault_backend.get_conn_uri()
| ^^^^^^^^^^^^ AIR302
42 | vault_backend.get_connections()
|
= help: Use `get_conn_value` instead
AIR302_class_attribute.py:42:15: AIR302 `get_connections` is removed in Airflow 3.0
|
40 | vault_backend = VaultBackend()
41 | vault_backend.get_conn_uri()
42 | vault_backend.get_connections()
| ^^^^^^^^^^^^^^^ AIR302
43 |
44 | not_an_error = NotAir302SecretError()
|
= help: Use `get_connection` instead
AIR302_class_attribute.py:54:18: AIR302 `dataset_factories` is removed in Airflow 3.0
|
53 | provider_manager = ProvidersManager()
54 | provider_manager.dataset_factories
| ^^^^^^^^^^^^^^^^^ AIR302
55 | provider_manager.dataset_uri_handlers
56 | provider_manager.dataset_to_openlineage_converters
|
= help: Use `asset_factories` instead
AIR302_class_attribute.py:55:18: AIR302 `dataset_uri_handlers` is removed in Airflow 3.0
|
53 | provider_manager = ProvidersManager()
54 | provider_manager.dataset_factories
55 | provider_manager.dataset_uri_handlers
| ^^^^^^^^^^^^^^^^^^^^ AIR302
56 | provider_manager.dataset_to_openlineage_converters
|
= help: Use `asset_uri_handlers` instead
AIR302_class_attribute.py:56:18: AIR302 `dataset_to_openlineage_converters` is removed in Airflow 3.0
|
54 | provider_manager.dataset_factories
55 | provider_manager.dataset_uri_handlers
56 | provider_manager.dataset_to_openlineage_converters
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ AIR302
57 |
58 | dl_info = DatasetLineageInfo()
|
= help: Use `asset_to_openlineage_converters` instead
AIR302_class_attribute.py:58:11: AIR302 `airflow.lineage.hook.DatasetLineageInfo` is removed in Airflow 3.0
|
56 | provider_manager.dataset_to_openlineage_converters
57 |
58 | dl_info = DatasetLineageInfo()
| ^^^^^^^^^^^^^^^^^^ AIR302
59 | dl_info.dataset
|
= help: Use `airflow.lineage.hook.AssetLineageInfo` instead