Fix zombie task handling with multiple schedulers (apache#24906)
Each scheduler was scanning all running tasks for zombies, so the same zombie could be handled by multiple schedulers. This causes problems with retries (e.g. tasks being marked FAILED instead of UP_FOR_RETRY) and callbacks (e.g. `on_failure_callback` being called multiple times).

When a second scheduler tries to determine whether the task can be retried,
and the task is already in UP_FOR_RETRY (the first scheduler has already handled it),
it sees the "next" try_number (as the task is no longer running),
which leads to the task being marked FAILED instead.

The easy fix is to simply restrict each scheduler to its own TIs, as
orphaned running TIs will be adopted anyway.
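
For context, the try_number shift described above comes from how Airflow 2.x reports attempts on TaskInstance: the stored counter is returned as-is while the task is running, and incremented by one once it is not. A minimal runnable sketch of that accessor (TaskInstanceSketch and the string states are illustrative stand-ins, not Airflow code):

    RUNNING = "running"  # stand-in for airflow.utils.state.State.RUNNING

    class TaskInstanceSketch:
        """Paraphrase of the Airflow 2.x TaskInstance.try_number accessor."""

        def __init__(self, state, try_number=1):
            self.state = state
            self._try_number = try_number

        @property
        def try_number(self):
            # While running, report the current attempt; once the task has
            # stopped running, report the attempt that would run *next*. A
            # second scheduler re-checking an already-handled zombie thus
            # sees try_number + 1 and can push the task to FAILED.
            if self.state == RUNNING:
                return self._try_number
            return self._try_number + 1

    ti = TaskInstanceSketch(state=RUNNING, try_number=1)
    assert ti.try_number == 1
    ti.state = "up_for_retry"  # the first scheduler already handled the zombie
    assert ti.try_number == 2  # a second scheduler now sees the "next" try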

(cherry picked from commit 1c0d0a5)
jedcunningham authored and ephraimbuddy committed Jul 11, 2022
1 parent ac1defe commit a4bd397
Showing 2 changed files with 26 additions and 13 deletions.
4 changes: 3 additions & 1 deletion airflow/jobs/scheduler_job.py
@@ -1339,7 +1339,8 @@ def check_trigger_timeouts(self, session: Session = None):
     def _find_zombies(self, session):
         """
         Find zombie task instances, which are tasks haven't heartbeated for too long
-        and update the current zombie list.
+        or have a no-longer-running LocalTaskJob, and send them off to the DAG processor
+        to be handled.
         """
         self.log.debug("Finding 'running' jobs without a recent heartbeat")
         limit_dttm = timezone.utcnow() - timedelta(seconds=self._zombie_threshold_secs)
@@ -1355,6 +1356,7 @@ def _find_zombies(self, session):
                     LocalTaskJob.latest_heartbeat < limit_dttm,
                 )
             )
+            .filter(TaskInstance.queued_by_job_id == self.id)
             .all()
         )
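
To put the hunk in context, here is a hedged reconstruction of the full query _find_zombies runs after this change, rendered as a free function (find_zombies_scoped is hypothetical, and the joins are approximated from the Airflow 2.x models rather than quoted verbatim); only the queued_by_job_id filter is new in this commit:

    from datetime import timedelta

    from sqlalchemy import or_
    from sqlalchemy.orm import Session

    from airflow.jobs.local_task_job import LocalTaskJob
    from airflow.models import DagModel, TaskInstance
    from airflow.utils import timezone
    from airflow.utils.state import State


    def find_zombies_scoped(session: Session, scheduler_job_id: int, zombie_threshold_secs: int):
        """Hypothetical rendering of SchedulerJob._find_zombies as of this commit."""
        limit_dttm = timezone.utcnow() - timedelta(seconds=zombie_threshold_secs)
        return (
            session.query(TaskInstance, DagModel.fileloc)
            .join(LocalTaskJob, TaskInstance.job_id == LocalTaskJob.id)
            .join(DagModel, TaskInstance.dag_id == DagModel.dag_id)
            .filter(TaskInstance.state == State.RUNNING)
            .filter(
                or_(
                    # The supervising LocalTaskJob is no longer running...
                    LocalTaskJob.state != State.RUNNING,
                    # ...or it has not heartbeated within the threshold.
                    LocalTaskJob.latest_heartbeat < limit_dttm,
                )
            )
            # The new guard: only TIs queued by *this* scheduler are zombie
            # candidates, so two schedulers never handle the same zombie.
            .filter(TaskInstance.queued_by_job_id == scheduler_job_id)
            .all()
        )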

35 changes: 23 additions & 12 deletions tests/jobs/test_scheduler_job.py
@@ -3856,7 +3856,6 @@ def test_find_zombies(self):
         session.query(LocalTaskJob).delete()
         dag = dagbag.get_dag('example_branch_operator')
         dag.sync_to_db()
-        task = dag.get_task(task_id='run_this_first')

         dag_run = dag.create_dagrun(
             state=DagRunState.RUNNING,
@@ -3865,21 +3864,33 @@ def test_find_zombies(self):
             session=session,
         )

-        ti = TaskInstance(task, run_id=dag_run.run_id, state=State.RUNNING)
-        local_job = LocalTaskJob(ti)
-        local_job.state = State.SHUTDOWN
-
-        session.add(local_job)
-        session.flush()
-
-        ti.job_id = local_job.id
-        session.add(ti)
-        session.flush()
-
         self.scheduler_job = SchedulerJob(subdir=os.devnull)
         self.scheduler_job.executor = MockExecutor()
         self.scheduler_job.processor_agent = mock.MagicMock()

+        # We will provision 2 tasks so we can check we only find zombies from this scheduler
+        tasks_to_setup = ['branching', 'run_this_first']
+
+        for task_id in tasks_to_setup:
+            task = dag.get_task(task_id=task_id)
+            ti = TaskInstance(task, run_id=dag_run.run_id, state=State.RUNNING)
+            ti.queued_by_job_id = 999
+
+            local_job = LocalTaskJob(ti)
+            local_job.state = State.SHUTDOWN
+
+            session.add(local_job)
+            session.flush()
+
+            ti.job_id = local_job.id
+            session.add(ti)
+            session.flush()
+
+        assert task.task_id == 'run_this_first'  # Make sure we have the task/ti we expect
+
+        ti.queued_by_job_id = self.scheduler_job.id
+        session.flush()
+
         self.scheduler_job._find_zombies(session=session)

         self.scheduler_job.executor.callback_sink.send.assert_called_once()
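
In effect, both zombie TIs start out tagged with a foreign scheduler id (999), and exactly one is reassigned to this scheduler before _find_zombies runs, so the assert_called_once() above fails if a scheduler still scoops up zombies it didn't queue.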
