From cd97eccb9d5085ad76bbcca8a574699334b44e03 Mon Sep 17 00:00:00 2001 From: Ephraim Anierobi Date: Thu, 13 Jan 2022 22:23:10 +0100 Subject: [PATCH] Fix Scheduler crash when executing task instances of missing DAG (#20349) When executing task instances, we do not check if the dag is missing in the dagbag. This PR fixes it by ignoring task instances if we can't find the dag in serialized dag table Closes: #20099 (cherry picked from commit 98715760f72e5205c291293088b5e79636884491) --- airflow/jobs/scheduler_job.py | 11 +++++++++++ tests/jobs/test_scheduler_job.py | 28 ++++++++++++++++++++++++++++ 2 files changed, 39 insertions(+) diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py index 2fedf807958c2..490d5077be66e 100644 --- a/airflow/jobs/scheduler_job.py +++ b/airflow/jobs/scheduler_job.py @@ -375,6 +375,17 @@ def _executable_task_instances_to_queued(self, max_tis: int, session: Session = # Many dags don't have a task_concurrency, so where we can avoid loading the full # serialized DAG the better. serialized_dag = self.dagbag.get_dag(dag_id, session=session) + # If the dag is missing, fail the task and continue to the next task. + if not serialized_dag: + self.log.error( + "DAG '%s' for task instance %s not found in serialized_dag table", + dag_id, + task_instance, + ) + session.query(TI).filter(TI.dag_id == dag_id, TI.state == State.SCHEDULED).update( + {TI.state: State.FAILED}, synchronize_session='fetch' + ) + continue if serialized_dag.has_task(task_instance.task_id): task_concurrency_limit = serialized_dag.get_task( task_instance.task_id diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py index a308029a7da1c..718572004016e 100644 --- a/tests/jobs/test_scheduler_job.py +++ b/tests/jobs/test_scheduler_job.py @@ -610,6 +610,34 @@ def test_find_executable_task_instances_in_default_pool(self, dag_maker): session.rollback() session.close() + def test_queued_task_instances_fails_with_missing_dag(self, dag_maker, session): + """Check that task instances of missing DAGs are failed""" + dag_id = 'SchedulerJobTest.test_find_executable_task_instances_not_in_dagbag' + task_id_1 = 'dummy' + task_id_2 = 'dummydummy' + + with dag_maker(dag_id=dag_id, session=session, default_args={"max_active_tis_per_dag": 1}): + DummyOperator(task_id=task_id_1) + DummyOperator(task_id=task_id_2) + + self.scheduler_job = SchedulerJob(subdir=os.devnull) + self.scheduler_job.dagbag = mock.MagicMock() + self.scheduler_job.dagbag.get_dag.return_value = None + + dr = dag_maker.create_dagrun(state=DagRunState.RUNNING) + + tis = dr.task_instances + for ti in tis: + ti.state = State.SCHEDULED + session.merge(ti) + session.flush() + res = self.scheduler_job._executable_task_instances_to_queued(max_tis=32, session=session) + session.flush() + assert 0 == len(res) + tis = dr.get_task_instances(session=session) + assert len(tis) == 2 + assert all(ti.state == State.FAILED for ti in tis) + def test_nonexistent_pool(self, dag_maker): dag_id = 'SchedulerJobTest.test_nonexistent_pool' with dag_maker(dag_id=dag_id, max_active_tasks=16):