diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py
index 2fedf807958c2..490d5077be66e 100644
--- a/airflow/jobs/scheduler_job.py
+++ b/airflow/jobs/scheduler_job.py
@@ -375,6 +375,17 @@ def _executable_task_instances_to_queued(self, max_tis: int, session: Session =
                     # Many dags don't have a task_concurrency, so where we can avoid loading the full
                     # serialized DAG the better.
                     serialized_dag = self.dagbag.get_dag(dag_id, session=session)
+                    # If the dag is missing, fail the task and continue to the next task.
+                    if not serialized_dag:
+                        self.log.error(
+                            "DAG '%s' for task instance %s not found in serialized_dag table",
+                            dag_id,
+                            task_instance,
+                        )
+                        session.query(TI).filter(TI.dag_id == dag_id, TI.state == State.SCHEDULED).update(
+                            {TI.state: State.FAILED}, synchronize_session='fetch'
+                        )
+                        continue
                     if serialized_dag.has_task(task_instance.task_id):
                         task_concurrency_limit = serialized_dag.get_task(
                             task_instance.task_id
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index a308029a7da1c..718572004016e 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -610,6 +610,34 @@ def test_find_executable_task_instances_in_default_pool(self, dag_maker):
         session.rollback()
         session.close()

+    def test_queued_task_instances_fails_with_missing_dag(self, dag_maker, session):
+        """Check that task instances of missing DAGs are failed"""
+        dag_id = 'SchedulerJobTest.test_find_executable_task_instances_not_in_dagbag'
+        task_id_1 = 'dummy'
+        task_id_2 = 'dummydummy'
+
+        with dag_maker(dag_id=dag_id, session=session, default_args={"max_active_tis_per_dag": 1}):
+            DummyOperator(task_id=task_id_1)
+            DummyOperator(task_id=task_id_2)
+
+        self.scheduler_job = SchedulerJob(subdir=os.devnull)
+        self.scheduler_job.dagbag = mock.MagicMock()
+        self.scheduler_job.dagbag.get_dag.return_value = None
+
+        dr = dag_maker.create_dagrun(state=DagRunState.RUNNING)
+
+        tis = dr.task_instances
+        for ti in tis:
+            ti.state = State.SCHEDULED
+            session.merge(ti)
+        session.flush()
+        res = self.scheduler_job._executable_task_instances_to_queued(max_tis=32, session=session)
+        session.flush()
+        assert 0 == len(res)
+        tis = dr.get_task_instances(session=session)
+        assert len(tis) == 2
+        assert all(ti.state == State.FAILED for ti in tis)
+
     def test_nonexistent_pool(self, dag_maker):
         dag_id = 'SchedulerJobTest.test_nonexistent_pool'
         with dag_maker(dag_id=dag_id, max_active_tasks=16):
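
The heart of the fix is the guard after `self.dagbag.get_dag(dag_id, session=session)`: if the serialized DAG has been deleted out from under a scheduled task instance, every `SCHEDULED` task instance of that `dag_id` is bulk-updated to `FAILED` instead of crashing the scheduler on the subsequent `serialized_dag.has_task(...)` call. Below is a minimal standalone sketch of that bulk-fail pattern using plain SQLAlchemy; the `TaskInstance` model, table name, and state strings here are hypothetical stand-ins for illustration, not Airflow's real ORM classes.

```python
# Standalone sketch of the bulk "fail orphaned task instances" pattern from the
# diff above. All model/table names are hypothetical, not Airflow's actual ORM.
from sqlalchemy import Column, String, create_engine
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()

SCHEDULED, FAILED = "scheduled", "failed"


class TaskInstance(Base):  # hypothetical stand-in for Airflow's TI model
    __tablename__ = "task_instance"
    dag_id = Column(String, primary_key=True)
    task_id = Column(String, primary_key=True)
    state = Column(String)


def fail_scheduled_tis_for_missing_dag(session: Session, dag_id: str) -> int:
    """Bulk-fail every SCHEDULED task instance of a DAG whose serialized
    definition can no longer be found, mirroring the scheduler change."""
    return (
        session.query(TaskInstance)
        .filter(TaskInstance.dag_id == dag_id, TaskInstance.state == SCHEDULED)
        # synchronize_session='fetch' keeps any in-memory instances consistent
        # with the bulk UPDATE, as in the diff.
        .update({TaskInstance.state: FAILED}, synchronize_session="fetch")
    )


if __name__ == "__main__":
    engine = create_engine("sqlite://")
    Base.metadata.create_all(engine)
    with Session(engine) as session:
        # Two scheduled TIs belonging to a DAG that no longer exists,
        # mirroring the two DummyOperator tasks in the test.
        session.add_all(
            [
                TaskInstance(dag_id="gone_dag", task_id="dummy", state=SCHEDULED),
                TaskInstance(dag_id="gone_dag", task_id="dummydummy", state=SCHEDULED),
            ]
        )
        session.flush()
        assert fail_scheduled_tis_for_missing_dag(session, "gone_dag") == 2
```

Note that, as in the scheduler change itself, the filter matches on `dag_id` and state alone, so one missing DAG fails all of its scheduled task instances in a single UPDATE rather than one per loop iteration.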