From 05ca629edad6308f7e3d9006d32f5195ab8beaf5 Mon Sep 17 00:00:00 2001 From: Ephraim Anierobi Date: Sat, 29 May 2021 18:28:36 +0100 Subject: [PATCH 1/3] Do not queue tasks when the DAG goes missing Currently, if a dag goes missing, the scheduler continues to queue the task instances until the executor reports the tasks as failed and then the scheduler would now set the state properly. This change ensures that tasks are not queued when the dag file goes missing. Instead of waiting on the executor to fail the task without explicit reason, the task fails here with the reason why it failed. Thanks to this, the Pool's queued slots will be freed for other tasks to be queued fixup! Do not queue tasks when the DAG goes missing add tests fixup! fixup! Do not queue tasks when the DAG goes missing Change implementation to check at regular interval instead of at every loop fixup! Change implementation to check at regular interval instead of at every loop fixup! fixup! Change implementation to check at regular interval instead of at every loop fixup! fixup! fixup! Change implementation to check at regular interval instead of at every loop change configuration name apply review suggestions fixup! apply review suggestions add has_dag method to DagBag and improve missing dag fail method in scheduler Update airflow/jobs/scheduler_job.py Co-authored-by: Kaxil Naik Fix dag_bag.has_dag behaviour fixup! Fix dag_bag.has_dag behaviour Apply suggestions from code review Co-authored-by: Kaxil Naik fixup! Apply suggestions from code review fixup! fixup! Apply suggestions from code review fixup! Do not queue tasks when the DAG goes missing --- airflow/config_templates/config.yml | 8 ++++ airflow/config_templates/default_airflow.cfg | 4 ++ airflow/jobs/scheduler_job.py | 36 +++++++++++++- airflow/models/dagbag.py | 28 +++++++++++ airflow/utils/state.py | 1 + tests/jobs/test_scheduler_job.py | 49 ++++++++++++++++++++ tests/models/test_dagbag.py | 33 +++++++++++++ 7 files changed, 158 insertions(+), 1 deletion(-) diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml index 3e083d4b90b70..a1cd4c3635b10 100644 --- a/airflow/config_templates/config.yml +++ b/airflow/config_templates/config.yml @@ -1709,6 +1709,14 @@ type: float example: ~ default: "15.0" + - name: clean_tis_without_dag + description: | + How often (in seconds) to check and fail task instances and dagruns whose corresponding + DAG is missing + version_added: 2.1.1 + type: float + example: ~ + default: "15.0" - name: scheduler_heartbeat_sec description: | The scheduler constantly tries to trigger new tasks (look at the diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg index c03e116746f96..78998efa7d7eb 100644 --- a/airflow/config_templates/default_airflow.cfg +++ b/airflow/config_templates/default_airflow.cfg @@ -854,6 +854,10 @@ job_heartbeat_sec = 5 # that no longer have a matching DagRun clean_tis_without_dagrun_interval = 15.0 +# How often (in seconds) to check and fail task instances and dagruns whose corresponding +# DAG is missing +clean_tis_without_dag = 15.0 + # The scheduler constantly tries to trigger new tasks (look at the # scheduler section in the docs for more information). This defines # how often the scheduler should run (in seconds). diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py index c2b3f2c8ae59b..980bf5b8c1804 100644 --- a/airflow/jobs/scheduler_job.py +++ b/airflow/jobs/scheduler_job.py @@ -549,7 +549,6 @@ def _critical_section_execute_task_instances(self, session: Session) -> int: else: max_tis = min(self.max_tis_per_query, self.executor.slots_available) queued_tis = self._executable_task_instances_to_queued(max_tis, session=session) - self._enqueue_task_instances_with_queued_state(queued_tis) return len(queued_tis) @@ -733,6 +732,10 @@ def _run_scheduler_loop(self) -> None: conf.getfloat('scheduler', 'clean_tis_without_dagrun_interval', fallback=15.0), self._clean_tis_without_dagrun, ) + timers.call_regular_interval( + conf.getfloat('scheduler', 'clean_tis_without_dag', fallback=15.0), + self._clean_tis_without_dag, + ) for loop_count in itertools.count(start=1): with Stats.timer() as timer: @@ -807,6 +810,37 @@ def _clean_tis_without_dagrun(self, session): raise guard.commit() + @provide_session + def _clean_tis_without_dag(self, session: Session = None): + """Fails task instances and DagRuns of DAGs that no longer exist in the dagbag/DB""" + states_to_check = State.unfinished - frozenset([State.NONE, State.SHUTDOWN]) + tis = session.query(TI).filter(TI.state.in_(states_to_check)).all() + missing_dags = {} + dag_runs = set() + + for ti in tis: + if ti.dag_id in missing_dags: + ti.set_state(State.FAILED, session=session) + continue + # Dag no longer in dagbag? + if not self.dagbag.has_dag(ti.dag_id, session=session): + ti.set_state(State.FAILED, session=session) + dag_runs.add(ti.dag_run) + missing_dags[ti.dag_id] = [ti] + continue + if missing_dags: + self.log.warning( + "The following Dags are missing, therefore the DAG's " + "task instances have been set to failed: \t\n%s", + [ + (f"Missing DAGs: {dag_name}", f"Failed TaskInstances: [{ti}]") + for dag_name, ti in missing_dags.items() + ], + ) + self.log.warning("Failing the corresponding DagRuns of the missing DAGs. DagRuns: %s", dag_runs) + for dr in dag_runs: + dr.set_state(State.FAILED) + def _do_scheduling(self, session) -> int: """ This function is where the main scheduling decisions take places. It: diff --git a/airflow/models/dagbag.py b/airflow/models/dagbag.py index bd339eec348ad..cdc65af8ac81a 100644 --- a/airflow/models/dagbag.py +++ b/airflow/models/dagbag.py @@ -166,6 +166,34 @@ def dag_ids(self) -> List[str]: """ return list(self.dags.keys()) + @provide_session + def has_dag(self, dag_id: str, session: Session = None): + """ + Checks the "local" cache, if it exists, and is not older than the configured cache time, return True + else check in the DB if the dag exists, return True, False otherwise + """ + from airflow.models.serialized_dag import SerializedDagModel + + sd_last_updated_datetime = SerializedDagModel.get_last_updated_datetime( + dag_id=dag_id, + session=session, + ) + sd_has_dag = sd_last_updated_datetime is not None + if dag_id not in self.dags: + return sd_has_dag + if dag_id not in self.dags_last_fetched: + return sd_has_dag + min_serialized_dag_fetch_secs = timedelta(seconds=settings.MIN_SERIALIZED_DAG_FETCH_INTERVAL) + if timezone.utcnow() < self.dags_last_fetched[dag_id] + min_serialized_dag_fetch_secs: + return sd_has_dag + if sd_has_dag: + return True + self.log.warning("Serialized DAG %s no longer exists", dag_id) + del self.dags[dag_id] + del self.dags_last_fetched[dag_id] + del self.dags_hash[dag_id] + return False + @provide_session def get_dag(self, dag_id, session: Session = None): """ diff --git a/airflow/utils/state.py b/airflow/utils/state.py index d5300e11b6a9f..7e98299bd0dc7 100644 --- a/airflow/utils/state.py +++ b/airflow/utils/state.py @@ -107,6 +107,7 @@ def color_fg(cls, state): FAILED, SKIPPED, UPSTREAM_FAILED, + REMOVED, ] ) """ diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py index 87be3e8d16638..1eeccfea266df 100644 --- a/tests/jobs/test_scheduler_job.py +++ b/tests/jobs/test_scheduler_job.py @@ -1475,6 +1475,55 @@ def test_scheduler_loop_should_change_state_for_tis_without_dagrun( assert ti.start_date == ti.end_date assert ti.duration is not None + def test_scheduler_loop_should_fail_tasks_with_missing_dag_in_dagbag_dags(self): + """This tests that if dags are missing in dagbag, then it should be failed""" + session = settings.Session() + dag_id = 'test_execute_helper_should_fail_tasks_with_missing_dag' + dag = DAG(dag_id, start_date=DEFAULT_DATE, default_args={'owner': 'owner1'}) + + with dag: + op1 = BashOperator(task_id='op1', bash_command='sleep 5') + + # Write Dag to DB + dagbag = DagBag(dag_folder="/dev/null", include_examples=False, read_dags_from_db=False) + dagbag.bag_dag(dag, root_dag=dag) + dagbag.sync_to_db() + + # Mock dagbag.has_dag + dagbag.has_dag = mock.MagicMock(return_value=False) + + dag = DagBag(read_dags_from_db=True, include_examples=False).get_dag(dag_id) + dr = dag.create_dagrun( + run_type=DagRunType.SCHEDULED, + state=State.RUNNING, + execution_date=DEFAULT_DATE + timedelta(days=1), + start_date=DEFAULT_DATE + timedelta(days=1), + session=session, + ) + ti = dr.get_task_instance(task_id=op1.task_id, session=session) + ti.state = State.RUNNING + session.commit() + + # This poll interval is large, bug the scheduler doesn't sleep that + # long, instead we hit the clean_tis_without_dag interval instead + self.scheduler_job = SchedulerJob(num_runs=2, processor_poll_interval=30) + self.scheduler_job.dagbag = dagbag + executor = MockExecutor(do_update=False) + self.scheduler_job.executor = executor + processor = mock.MagicMock() + processor.done = False + self.scheduler_job.processor_agent = processor + + with mock.patch.object(settings, "USE_JOB_SCHEDULE", False), conf_vars( + {('scheduler', 'clean_tis_without_dag'): '0.001'} + ): + self.scheduler_job._run_scheduler_loop() + + ti = dr.get_task_instance(task_id=op1.task_id, session=session) + assert ti.state == State.FAILED + assert ti.start_date is not None + assert ti.dag_run.state == State.FAILED + @mock.patch('airflow.jobs.scheduler_job.DagFileProcessorAgent') def test_executor_end_called(self, mock_processor_agent): """ diff --git a/tests/models/test_dagbag.py b/tests/models/test_dagbag.py index b4edc0c7a4f0e..4aa9c754a2a35 100644 --- a/tests/models/test_dagbag.py +++ b/tests/models/test_dagbag.py @@ -859,6 +859,39 @@ def test_get_dag_with_dag_serialization(self): assert set(updated_ser_dag_1.tags) == {"example", "example2", "new_tag"} assert updated_ser_dag_1_update_time > ser_dag_1_update_time + @patch("airflow.models.dagbag.settings.MIN_SERIALIZED_DAG_FETCH_INTERVAL", 5) + def test_has_dag_method(self): + """Test has_dag method""" + dag_id = "example_bash_operator" + with freeze_time(tz.datetime(2020, 1, 5, 0, 0, 0)): + example_bash_op_dag = DagBag(include_examples=True).dags.get(dag_id) + SerializedDagModel.write_dag(dag=example_bash_op_dag) + + dag_bag = DagBag(read_dags_from_db=True) + dag_bag.get_dag(dag_id) # Add dag to self.dags + assert dag_bag.has_dag(dag_id) + assert not dag_bag.has_dag("non_added_dag_id") + + # Check that min_serialized_dag_fetch_interval has passed but + # dag is in SerializedDagModel and we return True + with freeze_time(tz.datetime(2020, 1, 5, 0, 0, 6)): + assert dag_bag.has_dag(dag_id) + + # We remove the dag from dag_bag.dags and ensure it returns True + dag_bag.dags.pop(dag_id) + assert not dag_bag.dags.get(dag_id) + assert dag_bag.has_dag(dag_id) + + # We removed dag_id from dag_bag.dags, let's add it back + dag_bag.get_dag(dag_id) + # Remove the dag from SerializedDagModel + SerializedDagModel.remove_dag(dag_id=dag_id) + with freeze_time(tz.datetime(2020, 1, 5, 0, 0, 8)): + assert dag_bag.dags[dag_id] + # Even though that we have dag in dag_bag.dags + # it's no longer in SerializedDagModel and we should return False + assert not dag_bag.has_dag(dag_id) + def test_collect_dags_from_db(self): """DAGs are collected from Database""" example_dags_folder = airflow.example_dags.__path__[0] From 93ae6bf39e93d499c309aba8a795ad3e172f7115 Mon Sep 17 00:00:00 2001 From: Ephraim Anierobi Date: Mon, 28 Jun 2021 21:10:14 +0100 Subject: [PATCH 2/3] fixup! Do not queue tasks when the DAG goes missing --- airflow/config_templates/config.yml | 2 +- airflow/config_templates/default_airflow.cfg | 2 +- airflow/jobs/scheduler_job.py | 2 +- tests/jobs/test_scheduler_job.py | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml index a1cd4c3635b10..634e01002084e 100644 --- a/airflow/config_templates/config.yml +++ b/airflow/config_templates/config.yml @@ -1709,7 +1709,7 @@ type: float example: ~ default: "15.0" - - name: clean_tis_without_dag + - name: clean_tis_without_dag_interval description: | How often (in seconds) to check and fail task instances and dagruns whose corresponding DAG is missing diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg index 78998efa7d7eb..5216707851b0d 100644 --- a/airflow/config_templates/default_airflow.cfg +++ b/airflow/config_templates/default_airflow.cfg @@ -856,7 +856,7 @@ clean_tis_without_dagrun_interval = 15.0 # How often (in seconds) to check and fail task instances and dagruns whose corresponding # DAG is missing -clean_tis_without_dag = 15.0 +clean_tis_without_dag_interval = 15.0 # The scheduler constantly tries to trigger new tasks (look at the # scheduler section in the docs for more information). This defines diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py index 980bf5b8c1804..b1e12b8f066ed 100644 --- a/airflow/jobs/scheduler_job.py +++ b/airflow/jobs/scheduler_job.py @@ -733,7 +733,7 @@ def _run_scheduler_loop(self) -> None: self._clean_tis_without_dagrun, ) timers.call_regular_interval( - conf.getfloat('scheduler', 'clean_tis_without_dag', fallback=15.0), + conf.getfloat('scheduler', 'clean_tis_without_dag_interval', fallback=15.0), self._clean_tis_without_dag, ) diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py index 1eeccfea266df..bca1adbc6f9b1 100644 --- a/tests/jobs/test_scheduler_job.py +++ b/tests/jobs/test_scheduler_job.py @@ -1515,7 +1515,7 @@ def test_scheduler_loop_should_fail_tasks_with_missing_dag_in_dagbag_dags(self): self.scheduler_job.processor_agent = processor with mock.patch.object(settings, "USE_JOB_SCHEDULE", False), conf_vars( - {('scheduler', 'clean_tis_without_dag'): '0.001'} + {('scheduler', 'clean_tis_without_dag_interval'): '0.001'} ): self.scheduler_job._run_scheduler_loop() From 9c4a894a158b69b576e4d801a037edf0ebd0cf19 Mon Sep 17 00:00:00 2001 From: Ephraim Anierobi Date: Mon, 5 Jul 2021 12:31:30 +0100 Subject: [PATCH 3/3] Apply suggestions from code review Co-authored-by: Ash Berlin-Taylor --- airflow/config_templates/config.yml | 2 +- airflow/jobs/scheduler_job.py | 2 +- tests/models/test_dagbag.py | 1 - 3 files changed, 2 insertions(+), 3 deletions(-) diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml index 634e01002084e..f1de53417eec5 100644 --- a/airflow/config_templates/config.yml +++ b/airflow/config_templates/config.yml @@ -1713,7 +1713,7 @@ description: | How often (in seconds) to check and fail task instances and dagruns whose corresponding DAG is missing - version_added: 2.1.1 + version_added: 2.1.2 type: float example: ~ default: "15.0" diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py index b1e12b8f066ed..f366be44b22e3 100644 --- a/airflow/jobs/scheduler_job.py +++ b/airflow/jobs/scheduler_job.py @@ -839,7 +839,7 @@ def _clean_tis_without_dag(self, session: Session = None): ) self.log.warning("Failing the corresponding DagRuns of the missing DAGs. DagRuns: %s", dag_runs) for dr in dag_runs: - dr.set_state(State.FAILED) + dr.state = State.FAILED def _do_scheduling(self, session) -> int: """ diff --git a/tests/models/test_dagbag.py b/tests/models/test_dagbag.py index 4aa9c754a2a35..6280a07ddc5df 100644 --- a/tests/models/test_dagbag.py +++ b/tests/models/test_dagbag.py @@ -879,7 +879,6 @@ def test_has_dag_method(self): # We remove the dag from dag_bag.dags and ensure it returns True dag_bag.dags.pop(dag_id) - assert not dag_bag.dags.get(dag_id) assert dag_bag.has_dag(dag_id) # We removed dag_id from dag_bag.dags, let's add it back