diff --git a/airflow/jobs/backfill_job.py b/airflow/jobs/backfill_job.py index 4a2d3e7bd4ad3..62bc8c2bb308f 100644 --- a/airflow/jobs/backfill_job.py +++ b/airflow/jobs/backfill_job.py @@ -217,6 +217,12 @@ def _update_counters(self, ti_status, session=None): tis_to_be_scheduled.append(ti) ti_status.running.pop(reduced_key) ti_status.to_run[ti.key] = ti + # special case: Deferrable task can go from DEFERRED to SCHEDULED; + # when that happens, we need to put it back as in UP_FOR_RESCHEDULE + elif ti.state == TaskInstanceState.SCHEDULED: + self.log.debug("Task instance %s is resumed from deferred state", ti) + ti_status.running.pop(ti.key) + ti_status.to_run[ti.key] = ti # Batch schedule of task instances if tis_to_be_scheduled: diff --git a/tests/jobs/test_backfill_job.py b/tests/jobs/test_backfill_job.py index 31ea238f5e963..18fa630835a50 100644 --- a/tests/jobs/test_backfill_job.py +++ b/tests/jobs/test_backfill_job.py @@ -1415,6 +1415,30 @@ def test_update_counters(self, dag_maker, session): ti_status.to_run.clear() + # test for scheduled + ti.set_state(State.SCHEDULED) + # Deferred tasks are put into scheduled by the triggerer + # Check that they are put into to_run + ti_status.running[ti.key] = ti + job._update_counters(ti_status=ti_status, session=session) + assert len(ti_status.running) == 0 + assert len(ti_status.succeeded) == 0 + assert len(ti_status.skipped) == 0 + assert len(ti_status.failed) == 0 + assert len(ti_status.to_run) == 1 + + ti_status.to_run.clear() + # test for deferred + # if a task is deferred and it's not yet time for the triggerer + # to reschedule it, we should leave it in ti_status.running + ti.set_state(State.DEFERRED) + ti_status.running[ti.key] = ti + job._update_counters(ti_status=ti_status, session=session) + assert len(ti_status.running) == 1 + assert len(ti_status.succeeded) == 0 + assert len(ti_status.skipped) == 0 + assert len(ti_status.failed) == 0 + assert len(ti_status.to_run) == 0 session.close() def test_dag_dagrun_infos_between(self, dag_maker):