From a4bd3973a6d55e5ea03090779e7a57153c739453 Mon Sep 17 00:00:00 2001 From: Jed Cunningham <66968678+jedcunningham@users.noreply.github.com> Date: Fri, 8 Jul 2022 10:49:12 -0600 Subject: [PATCH] Fix zombie task handling with multiple schedulers (#24906) Each scheduler was looking at all running tasks for zombies, leading to multiple schedulers handling the zombies. This causes problems with retries (e.g. being marked as FAILED instead of UP_FOR_RETRY) and callbacks (e.g. `on_failure_callback` being called multiple times). When the second scheduler tries to determine if the task is able to be retried, and it's already in UP_FOR_RETRY (the first scheduler already finished), it sees the "next" try_number (as it's no longer running), which then leads it to be FAILED instead. The easy fix is to simply restrict each scheduler to its own TIs, as orphaned running TIs will be adopted anyways. (cherry picked from commit 1c0d0a5d907ae447b7221200952b47b69f8f8e87) --- airflow/jobs/scheduler_job.py | 4 +++- tests/jobs/test_scheduler_job.py | 35 +++++++++++++++++++++----------- 2 files changed, 26 insertions(+), 13 deletions(-) diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py index 3440832275ef2..3332434838391 100644 --- a/airflow/jobs/scheduler_job.py +++ b/airflow/jobs/scheduler_job.py @@ -1339,7 +1339,8 @@ def check_trigger_timeouts(self, session: Session = None): def _find_zombies(self, session): """ Find zombie task instances, which are tasks haven't heartbeated for too long - and update the current zombie list. + or have a no-longer-running LocalTaskJob, and send them off to the DAG processor + to be handled. """ self.log.debug("Finding 'running' jobs without a recent heartbeat") limit_dttm = timezone.utcnow() - timedelta(seconds=self._zombie_threshold_secs) @@ -1355,6 +1356,7 @@ def _find_zombies(self, session): LocalTaskJob.latest_heartbeat < limit_dttm, ) ) + .filter(TaskInstance.queued_by_job_id == self.id) .all() ) diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py index de909ef6e69bd..3895ab6af8637 100644 --- a/tests/jobs/test_scheduler_job.py +++ b/tests/jobs/test_scheduler_job.py @@ -3856,7 +3856,6 @@ def test_find_zombies(self): session.query(LocalTaskJob).delete() dag = dagbag.get_dag('example_branch_operator') dag.sync_to_db() - task = dag.get_task(task_id='run_this_first') dag_run = dag.create_dagrun( state=DagRunState.RUNNING, @@ -3865,21 +3864,33 @@ def test_find_zombies(self): session=session, ) - ti = TaskInstance(task, run_id=dag_run.run_id, state=State.RUNNING) - local_job = LocalTaskJob(ti) - local_job.state = State.SHUTDOWN - - session.add(local_job) - session.flush() - - ti.job_id = local_job.id - session.add(ti) - session.flush() - self.scheduler_job = SchedulerJob(subdir=os.devnull) self.scheduler_job.executor = MockExecutor() self.scheduler_job.processor_agent = mock.MagicMock() + # We will provision 2 tasks so we can check we only find zombies from this scheduler + tasks_to_setup = ['branching', 'run_this_first'] + + for task_id in tasks_to_setup: + task = dag.get_task(task_id=task_id) + ti = TaskInstance(task, run_id=dag_run.run_id, state=State.RUNNING) + ti.queued_by_job_id = 999 + + local_job = LocalTaskJob(ti) + local_job.state = State.SHUTDOWN + + session.add(local_job) + session.flush() + + ti.job_id = local_job.id + session.add(ti) + session.flush() + + assert task.task_id == 'run_this_first' # Make sure we have the task/ti we expect + + ti.queued_by_job_id = self.scheduler_job.id + session.flush() + self.scheduler_job._find_zombies(session=session) self.scheduler_job.executor.callback_sink.send.assert_called_once()