From f7f3bb4ac5be70c9e51b2fd09d44b1cb2f50c552 Mon Sep 17 00:00:00 2001 From: jordanjeremy <72943478+jordanjeremy@users.noreply.github.com> Date: Mon, 7 Nov 2022 14:26:20 -0600 Subject: [PATCH 1/2] Fix getting the dag/task ids from base executor Was skipping the 3rd position in the command during validation. This caused the dag_id to be missed and the return from the get_parsing_context() (from airflow/utils/dag_parsing_context.py) had the wrong information. The dag_id contained the task_id and the task_id contained the run_id. Changed starting index when getting the dag and task ids to avoid skipping and added test for base executor. --- airflow/executors/base_executor.py | 2 +- tests/executors/test_base_executor.py | 7 +++++++ 2 files changed, 8 insertions(+), 1 deletion(-) mode change 100644 => 100755 airflow/executors/base_executor.py mode change 100644 => 100755 tests/executors/test_base_executor.py diff --git a/airflow/executors/base_executor.py b/airflow/executors/base_executor.py old mode 100644 new mode 100755 index 8361ba4bef27a..33a666b550580 --- a/airflow/executors/base_executor.py +++ b/airflow/executors/base_executor.py @@ -363,7 +363,7 @@ def validate_airflow_tasks_run_command(command: list[str]) -> tuple[str | None, if len(command) > 3 and "--help" not in command: dag_id: str | None = None task_id: str | None = None - for arg in command[4:]: + for arg in command[3:]: if not arg.startswith("--"): if dag_id is None: dag_id = arg diff --git a/tests/executors/test_base_executor.py b/tests/executors/test_base_executor.py old mode 100644 new mode 100755 index e4e39e45a0fdc..25f7235479941 --- a/tests/executors/test_base_executor.py +++ b/tests/executors/test_base_executor.py @@ -125,3 +125,10 @@ def test_trigger_running_tasks(dag_maker, change_state_attempt): assert len(executor.execute_async.mock_calls) == len(dagrun.task_instances) + 1 else: assert len(executor.execute_async.mock_calls) == len(dagrun.task_instances) + + +def test_validate_airflow_tasks_run_command(dag_maker): + dagrun = setup_dagrun(dag_maker) + tis = dagrun.task_instances + dag_id, task_id = BaseExecutor.validate_airflow_tasks_run_command(tis[0].command_as_list()) + assert dag_id == dagrun.dag_id and task_id == tis[0].task_id From 76587ba50b6b420df39b464fa3980dc871e1084f Mon Sep 17 00:00:00 2001 From: jordanjeremy <72943478+jordanjeremy@users.noreply.github.com> Date: Mon, 7 Nov 2022 15:02:55 -0600 Subject: [PATCH 2/2] Fix file permissions to remove executable --- airflow/executors/base_executor.py | 0 tests/executors/test_base_executor.py | 0 2 files changed, 0 insertions(+), 0 deletions(-) mode change 100755 => 100644 airflow/executors/base_executor.py mode change 100755 => 100644 tests/executors/test_base_executor.py diff --git a/airflow/executors/base_executor.py b/airflow/executors/base_executor.py old mode 100755 new mode 100644 diff --git a/tests/executors/test_base_executor.py b/tests/executors/test_base_executor.py old mode 100755 new mode 100644