Skip to content

Commit

Permalink
Fix Deferrable stuck as "scheduled" during backfill (#26205)
Browse files Browse the repository at this point in the history
Co-authored-by: Tzu-ping Chung <uranusjr@gmail.com>
(cherry picked from commit 3396d1f)
  • Loading branch information
ephraimbuddy authored and jedcunningham committed Sep 23, 2022
1 parent 5ae47cd commit 53f0ffd
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 0 deletions.
6 changes: 6 additions & 0 deletions airflow/jobs/backfill_job.py
Expand Up @@ -217,6 +217,12 @@ def _update_counters(self, ti_status, session=None):
tis_to_be_scheduled.append(ti)
ti_status.running.pop(reduced_key)
ti_status.to_run[ti.key] = ti
# special case: Deferrable task can go from DEFERRED to SCHEDULED;
# when that happens, we need to put it back as in UP_FOR_RESCHEDULE
elif ti.state == TaskInstanceState.SCHEDULED:
self.log.debug("Task instance %s is resumed from deferred state", ti)
ti_status.running.pop(ti.key)
ti_status.to_run[ti.key] = ti

# Batch schedule of task instances
if tis_to_be_scheduled:
Expand Down
24 changes: 24 additions & 0 deletions tests/jobs/test_backfill_job.py
Expand Up @@ -1415,6 +1415,30 @@ def test_update_counters(self, dag_maker, session):

ti_status.to_run.clear()

# test for scheduled
ti.set_state(State.SCHEDULED)
# Deferred tasks are put into scheduled by the triggerer
# Check that they are put into to_run
ti_status.running[ti.key] = ti
job._update_counters(ti_status=ti_status, session=session)
assert len(ti_status.running) == 0
assert len(ti_status.succeeded) == 0
assert len(ti_status.skipped) == 0
assert len(ti_status.failed) == 0
assert len(ti_status.to_run) == 1

ti_status.to_run.clear()
# test for deferred
# if a task is deferred and it's not yet time for the triggerer
# to reschedule it, we should leave it in ti_status.running
ti.set_state(State.DEFERRED)
ti_status.running[ti.key] = ti
job._update_counters(ti_status=ti_status, session=session)
assert len(ti_status.running) == 1
assert len(ti_status.succeeded) == 0
assert len(ti_status.skipped) == 0
assert len(ti_status.failed) == 0
assert len(ti_status.to_run) == 0
session.close()

def test_dag_dagrun_infos_between(self, dag_maker):
Expand Down

0 comments on commit 53f0ffd

Please sign in to comment.