Skip to content

Commit

Permalink
Fix Scheduler crash when executing task instances of missing DAG (#20349)
Browse files Browse the repository at this point in the history

When executing task instances, we do not check whether the DAG is missing
from the dagbag. This PR fixes that: if the DAG cannot be found in the
serialized DAG table, its scheduled task instances are marked as failed and
skipped instead of crashing the scheduler.

Closes: #20099
(cherry picked from commit 9871576)
  • Loading branch information
ephraimbuddy authored and jedcunningham committed Feb 10, 2022
1 parent 4852b81 commit cd97ecc
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 0 deletions.
11 changes: 11 additions & 0 deletions airflow/jobs/scheduler_job.py
Expand Up @@ -375,6 +375,17 @@ def _executable_task_instances_to_queued(self, max_tis: int, session: Session =
# Many dags don't have a task_concurrency, so where we can avoid loading the full
# serialized DAG the better.
serialized_dag = self.dagbag.get_dag(dag_id, session=session)
# If the dag is missing, fail the task and continue to the next task.
if not serialized_dag:
self.log.error(
"DAG '%s' for task instance %s not found in serialized_dag table",
dag_id,
task_instance,
)
session.query(TI).filter(TI.dag_id == dag_id, TI.state == State.SCHEDULED).update(
{TI.state: State.FAILED}, synchronize_session='fetch'
)
continue
if serialized_dag.has_task(task_instance.task_id):
task_concurrency_limit = serialized_dag.get_task(
task_instance.task_id
Expand Down
28 changes: 28 additions & 0 deletions tests/jobs/test_scheduler_job.py
Expand Up @@ -610,6 +610,34 @@ def test_find_executable_task_instances_in_default_pool(self, dag_maker):
session.rollback()
session.close()

def test_queued_task_instances_fails_with_missing_dag(self, dag_maker, session):
    """Scheduled task instances whose DAG cannot be loaded are failed, not queued.

    The scheduler's dagbag is replaced by a mock whose ``get_dag`` always
    returns ``None``. After ``_executable_task_instances_to_queued`` runs,
    nothing must be returned for queuing and both task instances of the
    DAG run must be in the ``FAILED`` state.
    """
    dag_id = 'SchedulerJobTest.test_find_executable_task_instances_not_in_dagbag'
    task_ids = ('dummy', 'dummydummy')

    # NOTE(review): the per-task concurrency limit is presumably set so the
    # scheduler has to look the DAG up in the dagbag -- confirm against
    # _executable_task_instances_to_queued.
    with dag_maker(dag_id=dag_id, session=session, default_args={"max_active_tis_per_dag": 1}):
        for task_id in task_ids:
            DummyOperator(task_id=task_id)

    # Simulate a DAG that is missing: every dagbag lookup yields None.
    missing_dagbag = mock.MagicMock()
    missing_dagbag.get_dag.return_value = None
    self.scheduler_job = SchedulerJob(subdir=os.devnull)
    self.scheduler_job.dagbag = missing_dagbag

    dag_run = dag_maker.create_dagrun(state=DagRunState.RUNNING)

    # Put every task instance of the run into SCHEDULED so the scheduler
    # considers it for queuing.
    for scheduled_ti in dag_run.task_instances:
        scheduled_ti.state = State.SCHEDULED
        session.merge(scheduled_ti)
    session.flush()

    queued = self.scheduler_job._executable_task_instances_to_queued(max_tis=32, session=session)
    session.flush()

    # Nothing is handed to the executor ...
    assert len(queued) == 0
    # ... and both task instances have been failed.
    resulting_tis = dag_run.get_task_instances(session=session)
    assert len(resulting_tis) == 2
    assert all(ti.state == State.FAILED for ti in resulting_tis)

def test_nonexistent_pool(self, dag_maker):
dag_id = 'SchedulerJobTest.test_nonexistent_pool'
with dag_maker(dag_id=dag_id, max_active_tasks=16):
Expand Down

0 comments on commit cd97ecc

Please sign in to comment.