Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix Scheduler crash when executing task instances of missing DAG #20349

Merged
merged 9 commits into from Jan 13, 2022
11 changes: 11 additions & 0 deletions airflow/jobs/scheduler_job.py
Expand Up @@ -403,6 +403,17 @@ def _executable_task_instances_to_queued(self, max_tis: int, session: Session =
# Many dags don't have a task_concurrency, so where we can avoid loading the full
# serialized DAG the better.
serialized_dag = self.dagbag.get_dag(dag_id, session=session)
# If the dag is missing, fail all of its scheduled task instances and continue to the next one.
if not serialized_dag:
self.log.error(
"DAG '%s' for task instance %s not found in serialized_dag table",
dag_id,
task_instance,
)
session.query(TI).filter(TI.dag_id == dag_id, TI.state == State.SCHEDULED).update(
{TI.state: State.FAILED}, synchronize_session='fetch'
)
continue
if serialized_dag.has_task(task_instance.task_id):
task_concurrency_limit = serialized_dag.get_task(
task_instance.task_id
Expand Down
28 changes: 28 additions & 0 deletions tests/jobs/test_scheduler_job.py
Expand Up @@ -645,6 +645,34 @@ def test_find_executable_task_instances_in_default_pool(self, dag_maker):
session.rollback()
session.close()

def test_queued_task_instances_fails_with_missing_dag(self, dag_maker, session):
    """Scheduled task instances whose DAG cannot be loaded end up FAILED.

    The scheduler's dagbag is swapped for a mock whose ``get_dag`` always
    returns ``None``; the test then checks that nothing is queued and that
    both task instances of the run are in the FAILED state afterwards.
    """
    dag_id = 'SchedulerJobTest.test_find_executable_task_instances_not_in_dagbag'

    with dag_maker(dag_id=dag_id, session=session, default_args={"max_active_tis_per_dag": 1}):
        for task_id in ('dummy', 'dummydummy'):
            DummyOperator(task_id=task_id)

    # Stub the dagbag so every DAG lookup reports "not found".
    missing_dagbag = mock.MagicMock()
    missing_dagbag.get_dag.return_value = None
    self.scheduler_job = SchedulerJob(subdir=os.devnull)
    self.scheduler_job.dagbag = missing_dagbag

    dag_run = dag_maker.create_dagrun(state=DagRunState.RUNNING)

    # Put every task instance of the run into SCHEDULED before invoking the scheduler.
    for scheduled_ti in dag_run.task_instances:
        scheduled_ti.state = State.SCHEDULED
        session.merge(scheduled_ti)
    session.flush()

    queued = self.scheduler_job._executable_task_instances_to_queued(max_tis=32, session=session)
    session.flush()

    assert len(queued) == 0
    refreshed_tis = dag_run.get_task_instances(session=session)
    assert len(refreshed_tis) == 2
    for refreshed_ti in refreshed_tis:
        assert refreshed_ti.state == State.FAILED

def test_nonexistent_pool(self, dag_maker):
dag_id = 'SchedulerJobTest.test_nonexistent_pool'
with dag_maker(dag_id=dag_id, max_active_tasks=16):
Expand Down