From 63e81accb69ae6d2afb3a744b5b57ac1353594a9 Mon Sep 17 00:00:00 2001 From: Ephraim Anierobi Date: Thu, 16 Dec 2021 14:49:51 +0100 Subject: [PATCH 1/9] Fix Scheduler crash when executing task instances of missing DAG When executing task instances, we do not check if the dag is missing in the dagbag. This PR fixes it by ignoring task instances if we can't find the dag in serialized dag table --- airflow/jobs/scheduler_job.py | 18 ++++++++++++++++++ tests/jobs/test_scheduler_job.py | 28 ++++++++++++++++++++++++++++ 2 files changed, 46 insertions(+) diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py index 6dc3b22857ab6..165664d9baf10 100644 --- a/airflow/jobs/scheduler_job.py +++ b/airflow/jobs/scheduler_job.py @@ -316,6 +316,15 @@ def _executable_task_instances_to_queued(self, max_tis: int, session: Session = pool_to_task_instances: DefaultDict[str, List[models.Pool]] = defaultdict(list) for task_instance in task_instances_to_examine: + # If the dag is no longer in the dagbag, don't bother + if not self.dagbag.get_dag(task_instance.dag_id, session=session): + self.log.error( + "DAG '%s' for taskinstance %s not found in serialized_dag table", + task_instance.dag_id, + task_instance, + ) + continue + pool_to_task_instances[task_instance.pool].append(task_instance) # dag_id to # of running tasks and (dag_id, task_id) to # of running tasks. @@ -403,6 +412,15 @@ def _executable_task_instances_to_queued(self, max_tis: int, session: Session = # Many dags don't have a task_concurrency, so where we can avoid loading the full # serialized DAG the better. serialized_dag = self.dagbag.get_dag(dag_id, session=session) + # This check below may not be necessary because we have handled it before, + # but it's better to be safe than sorry. + if not serialized_dag: + self.log.error( + "DAG '%s' for taskinstance %s not found in serialized_dag table", + dag_id, + task_instance, + ) + continue if serialized_dag.has_task(task_instance.task_id): task_concurrency_limit = serialized_dag.get_task( task_instance.task_id diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py index dd57c2dd8b26c..55ce05ef8def9 100644 --- a/tests/jobs/test_scheduler_job.py +++ b/tests/jobs/test_scheduler_job.py @@ -645,6 +645,34 @@ def test_find_executable_task_instances_in_default_pool(self, dag_maker): session.rollback() session.close() + def test_executable_task_instances_to_queued_logs_for_missing_dag_in_dagbag( + self, dag_maker, caplog, session + ): + """If DAG is no longer in dagbag, do not execute""" + dag_id = 'SchedulerJobTest.test_find_executable_task_instances_not_in_dagbag' + task_id_1 = 'dummy' + task_id_2 = 'dummydummy' + + with dag_maker(dag_id=dag_id, session=session): + DummyOperator(task_id=task_id_1) + DummyOperator(task_id=task_id_2) + + self.scheduler_job = SchedulerJob(subdir=os.devnull) + self.scheduler_job.dagbag = mock.MagicMock() + self.scheduler_job.dagbag.get_dag.return_value = None + + dr = dag_maker.create_dagrun(state=DagRunState.RUNNING) + + tis = dr.task_instances + for ti in tis: + ti.state = State.SCHEDULED + session.merge(ti) + session.flush() + res = self.scheduler_job._executable_task_instances_to_queued(max_tis=32, session=session) + session.flush() + assert 0 == len(res) + assert f"DAG '{dag_id}' for taskinstance {tis[0]} not found in serialized_dag table" in caplog.text + def test_nonexistent_pool(self, dag_maker): dag_id = 'SchedulerJobTest.test_nonexistent_pool' with dag_maker(dag_id=dag_id, max_active_tasks=16): From 996ae4a2c001245dc428f230f11c0dcc3589f9cd Mon Sep 17 00:00:00 2001 From: Ephraim Anierobi Date: Thu, 16 Dec 2021 21:51:02 +0100 Subject: [PATCH 2/9] fixup! Fix Scheduler crash when executing task instances of missing DAG --- airflow/jobs/scheduler_job.py | 12 +----------- tests/jobs/test_scheduler_job.py | 4 ++-- 2 files changed, 3 insertions(+), 13 deletions(-) diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py index 165664d9baf10..38fa75f974931 100644 --- a/airflow/jobs/scheduler_job.py +++ b/airflow/jobs/scheduler_job.py @@ -316,15 +316,6 @@ def _executable_task_instances_to_queued(self, max_tis: int, session: Session = pool_to_task_instances: DefaultDict[str, List[models.Pool]] = defaultdict(list) for task_instance in task_instances_to_examine: - # If the dag is no longer in the dagbag, don't bother - if not self.dagbag.get_dag(task_instance.dag_id, session=session): - self.log.error( - "DAG '%s' for taskinstance %s not found in serialized_dag table", - task_instance.dag_id, - task_instance, - ) - continue - pool_to_task_instances[task_instance.pool].append(task_instance) # dag_id to # of running tasks and (dag_id, task_id) to # of running tasks. @@ -412,8 +403,7 @@ def _executable_task_instances_to_queued(self, max_tis: int, session: Session = # Many dags don't have a task_concurrency, so where we can avoid loading the full # serialized DAG the better. serialized_dag = self.dagbag.get_dag(dag_id, session=session) - # This check below may not be necessary because we have handled it before, - # but it's better to be safe than sorry. + # If the dag is missing, continue to the next task. if not serialized_dag: self.log.error( "DAG '%s' for taskinstance %s not found in serialized_dag table", diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py index 55ce05ef8def9..2e187c31644eb 100644 --- a/tests/jobs/test_scheduler_job.py +++ b/tests/jobs/test_scheduler_job.py @@ -648,12 +648,12 @@ def test_find_executable_task_instances_in_default_pool(self, dag_maker): def test_executable_task_instances_to_queued_logs_for_missing_dag_in_dagbag( self, dag_maker, caplog, session ): - """If DAG is no longer in dagbag, do not execute""" + """Only check concurrency for dag in dagbag""" dag_id = 'SchedulerJobTest.test_find_executable_task_instances_not_in_dagbag' task_id_1 = 'dummy' task_id_2 = 'dummydummy' - with dag_maker(dag_id=dag_id, session=session): + with dag_maker(dag_id=dag_id, session=session, default_args={"max_active_tis_per_dag": 1}): DummyOperator(task_id=task_id_1) DummyOperator(task_id=task_id_2) From b0c1ae54340a7f04afeb7650d93cd4bfcd40050a Mon Sep 17 00:00:00 2001 From: Tzu-ping Chung Date: Wed, 12 Jan 2022 11:44:19 +0800 Subject: [PATCH 3/9] Split 'task instance' into two words --- airflow/jobs/scheduler_job.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py index 38fa75f974931..d09359e07c9a3 100644 --- a/airflow/jobs/scheduler_job.py +++ b/airflow/jobs/scheduler_job.py @@ -406,7 +406,7 @@ def _executable_task_instances_to_queued(self, max_tis: int, session: Session = # If the dag is missing, continue to the next task. if not serialized_dag: self.log.error( - "DAG '%s' for taskinstance %s not found in serialized_dag table", + "DAG '%s' for task instance %s not found in serialized_dag table", dag_id, task_instance, ) From 20e340848677da935c204e04db4680b2fe57631c Mon Sep 17 00:00:00 2001 From: Ephraim Anierobi Date: Wed, 12 Jan 2022 09:23:26 +0100 Subject: [PATCH 4/9] apply suggestions from code review --- airflow/jobs/scheduler_job.py | 3 ++- tests/jobs/test_scheduler_job.py | 6 +++--- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py index d09359e07c9a3..f0eda4f61931b 100644 --- a/airflow/jobs/scheduler_job.py +++ b/airflow/jobs/scheduler_job.py @@ -403,13 +403,14 @@ def _executable_task_instances_to_queued(self, max_tis: int, session: Session = # Many dags don't have a task_concurrency, so where we can avoid loading the full # serialized DAG the better. serialized_dag = self.dagbag.get_dag(dag_id, session=session) - # If the dag is missing, continue to the next task. + # If the dag is missing, fail the task and continue to the next task. if not serialized_dag: self.log.error( "DAG '%s' for task instance %s not found in serialized_dag table", dag_id, task_instance, ) + task_instance.set_state(State.FAILED, session=session) continue if serialized_dag.has_task(task_instance.task_id): task_concurrency_limit = serialized_dag.get_task( diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py index 2e187c31644eb..ce718374cda8a 100644 --- a/tests/jobs/test_scheduler_job.py +++ b/tests/jobs/test_scheduler_job.py @@ -645,8 +645,8 @@ def test_find_executable_task_instances_in_default_pool(self, dag_maker): session.rollback() session.close() - def test_executable_task_instances_to_queued_logs_for_missing_dag_in_dagbag( - self, dag_maker, caplog, session + def test_executable_task_instances_to_queued_fails_task_for_missing_dag_in_dagbag( + self, dag_maker, session ): """Only check concurrency for dag in dagbag""" dag_id = 'SchedulerJobTest.test_find_executable_task_instances_not_in_dagbag' @@ -671,7 +671,7 @@ def test_executable_task_instances_to_queued_logs_for_missing_dag_in_dagbag( res = self.scheduler_job._executable_task_instances_to_queued(max_tis=32, session=session) session.flush() assert 0 == len(res) - assert f"DAG '{dag_id}' for taskinstance {tis[0]} not found in serialized_dag table" in caplog.text + assert session.query(TaskInstance).filter(TaskInstance.state == State.FAILED).count() == 2 def test_nonexistent_pool(self, dag_maker): dag_id = 'SchedulerJobTest.test_nonexistent_pool' From b03f8cd64d38a57cabbb18bfda91a6202542ed34 Mon Sep 17 00:00:00 2001 From: Ephraim Anierobi Date: Wed, 12 Jan 2022 12:26:53 +0100 Subject: [PATCH 5/9] fixup! apply suggestions from code review --- tests/jobs/test_scheduler_job.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py index ce718374cda8a..6af7f3c3688ed 100644 --- a/tests/jobs/test_scheduler_job.py +++ b/tests/jobs/test_scheduler_job.py @@ -648,7 +648,7 @@ def test_find_executable_task_instances_in_default_pool(self, dag_maker): def test_executable_task_instances_to_queued_fails_task_for_missing_dag_in_dagbag( self, dag_maker, session ): - """Only check concurrency for dag in dagbag""" + """Check that task instances of missing DAGs are failed""" dag_id = 'SchedulerJobTest.test_find_executable_task_instances_not_in_dagbag' task_id_1 = 'dummy' task_id_2 = 'dummydummy' From 7d9c0028288a717a3f31a8b2963afcc944747570 Mon Sep 17 00:00:00 2001 From: Ephraim Anierobi Date: Wed, 12 Jan 2022 12:59:13 +0100 Subject: [PATCH 6/9] fixup! fixup! apply suggestions from code review --- tests/jobs/test_scheduler_job.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py index 6af7f3c3688ed..0b1cafd4c13cb 100644 --- a/tests/jobs/test_scheduler_job.py +++ b/tests/jobs/test_scheduler_job.py @@ -671,7 +671,9 @@ def test_executable_task_instances_to_queued_fails_task_for_missing_dag_in_dagba res = self.scheduler_job._executable_task_instances_to_queued(max_tis=32, session=session) session.flush() assert 0 == len(res) - assert session.query(TaskInstance).filter(TaskInstance.state == State.FAILED).count() == 2 + tis = dr.get_task_instances(session=session) + for ti in tis: + assert ti.state == State.FAILED def test_nonexistent_pool(self, dag_maker): dag_id = 'SchedulerJobTest.test_nonexistent_pool' From cfeb9ee8caa755b4176677628113e854d9561fb0 Mon Sep 17 00:00:00 2001 From: Ephraim Anierobi Date: Thu, 13 Jan 2022 07:51:30 +0100 Subject: [PATCH 7/9] Apply suggestions from code review Co-authored-by: Kaxil Naik --- tests/jobs/test_scheduler_job.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py index 0b1cafd4c13cb..ef05c40525cea 100644 --- a/tests/jobs/test_scheduler_job.py +++ b/tests/jobs/test_scheduler_job.py @@ -645,7 +645,7 @@ def test_find_executable_task_instances_in_default_pool(self, dag_maker): session.rollback() session.close() - def test_executable_task_instances_to_queued_fails_task_for_missing_dag_in_dagbag( + def test_queued_task_instances_fails_with_missing_dag( self, dag_maker, session ): """Check that task instances of missing DAGs are failed""" @@ -672,8 +672,8 @@ def test_executable_task_instances_to_queued_fails_task_for_missing_dag_in_dagba session.flush() assert 0 == len(res) tis = dr.get_task_instances(session=session) - for ti in tis: - assert ti.state == State.FAILED + assert len(tis) == 2 + assert all(ti.state == State.FAILED for ti in tis) def test_nonexistent_pool(self, dag_maker): dag_id = 'SchedulerJobTest.test_nonexistent_pool' From 4ca34f63f758449623ed81861adc24110e14825e Mon Sep 17 00:00:00 2001 From: Ephraim Anierobi Date: Thu, 13 Jan 2022 08:23:53 +0100 Subject: [PATCH 8/9] set all scheduled tasks of the missing dag to NONE state --- airflow/jobs/scheduler_job.py | 4 +++- tests/jobs/test_scheduler_job.py | 6 ++---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py index f0eda4f61931b..befda1cfe98dc 100644 --- a/airflow/jobs/scheduler_job.py +++ b/airflow/jobs/scheduler_job.py @@ -410,7 +410,9 @@ def _executable_task_instances_to_queued(self, max_tis: int, session: Session = dag_id, task_instance, ) - task_instance.set_state(State.FAILED, session=session) + session.query(TI).filter(TI.dag_id == dag_id, TI.state == State.SCHEDULED).update( + {TI.state: State.NONE}, synchronize_session='fetch' + ) continue if serialized_dag.has_task(task_instance.task_id): task_concurrency_limit = serialized_dag.get_task( diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py index ef05c40525cea..e97b7bea3ef49 100644 --- a/tests/jobs/test_scheduler_job.py +++ b/tests/jobs/test_scheduler_job.py @@ -645,9 +645,7 @@ def test_find_executable_task_instances_in_default_pool(self, dag_maker): session.rollback() session.close() - def test_queued_task_instances_fails_with_missing_dag( - self, dag_maker, session - ): + def test_queued_task_instances_fails_with_missing_dag(self, dag_maker, session): """Check that task instances of missing DAGs are failed""" dag_id = 'SchedulerJobTest.test_find_executable_task_instances_not_in_dagbag' task_id_1 = 'dummy' @@ -673,7 +671,7 @@ def test_queued_task_instances_fails_with_missing_dag( assert 0 == len(res) tis = dr.get_task_instances(session=session) assert len(tis) == 2 - assert all(ti.state == State.FAILED for ti in tis) + assert all(ti.state == State.NONE for ti in tis) def test_nonexistent_pool(self, dag_maker): dag_id = 'SchedulerJobTest.test_nonexistent_pool' From 3c5079fe88359a6f9f14bd9d8948fde4892f7e0c Mon Sep 17 00:00:00 2001 From: Ephraim Anierobi Date: Thu, 13 Jan 2022 09:52:37 +0100 Subject: [PATCH 9/9] fail the tasks instances rather than set to none --- airflow/jobs/scheduler_job.py | 2 +- tests/jobs/test_scheduler_job.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py index befda1cfe98dc..a69c0574f966e 100644 --- a/airflow/jobs/scheduler_job.py +++ b/airflow/jobs/scheduler_job.py @@ -411,7 +411,7 @@ def _executable_task_instances_to_queued(self, max_tis: int, session: Session = task_instance, ) session.query(TI).filter(TI.dag_id == dag_id, TI.state == State.SCHEDULED).update( - {TI.state: State.NONE}, synchronize_session='fetch' + {TI.state: State.FAILED}, synchronize_session='fetch' ) continue if serialized_dag.has_task(task_instance.task_id): diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py index e97b7bea3ef49..8795e75505822 100644 --- a/tests/jobs/test_scheduler_job.py +++ b/tests/jobs/test_scheduler_job.py @@ -671,7 +671,7 @@ def test_queued_task_instances_fails_with_missing_dag(self, dag_maker, session): assert 0 == len(res) tis = dr.get_task_instances(session=session) assert len(tis) == 2 - assert all(ti.state == State.NONE for ti in tis) + assert all(ti.state == State.FAILED for ti in tis) def test_nonexistent_pool(self, dag_maker): dag_id = 'SchedulerJobTest.test_nonexistent_pool'