Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Don't update backfill run from the scheduler #26342

Merged
merged 4 commits into from Sep 19, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
9 changes: 7 additions & 2 deletions airflow/jobs/scheduler_job.py
Expand Up @@ -774,7 +774,8 @@ def _execute(self) -> None:
self.log.exception("Exception when executing DagFileProcessorAgent.end")
self.log.info("Exited execute loop")

def _update_dag_run_state_for_paused_dags(self) -> None:
@provide_session
def _update_dag_run_state_for_paused_dags(self, session: Session = NEW_SESSION) -> None:
try:
paused_dag_ids = DagModel.get_all_paused_dag_ids()
for dag_id in paused_dag_ids:
Expand All @@ -784,7 +785,11 @@ def _update_dag_run_state_for_paused_dags(self) -> None:
dag = SerializedDagModel.get_dag(dag_id)
if dag is None:
continue
dag_runs = DagRun.find(dag_id=dag_id, state=DagRunState.RUNNING)
dag_runs = session.query(DagRun).filter(
DagRun.dag_id == dag_id,
DagRun.state == DagRunState.RUNNING,
DagRun.run_type != DagRunType.BACKFILL_JOB,
)
for dag_run in dag_runs:
dag_run.dag = dag
_, callback_to_run = dag_run.update_state(execute_callbacks=False)
Expand Down
43 changes: 43 additions & 0 deletions tests/jobs/test_scheduler_job.py
Expand Up @@ -4439,6 +4439,49 @@ def test_catchup_works_correctly(self, dag_maker):
.scalar()
) > (timezone.utcnow() - timedelta(days=2))

def test_update_dagrun_state_for_paused_dag_not_for_backfill(self, dag_maker, session):
"""Test that the _update_dagrun_state_for_paused_dag does not affect backfilled dagruns"""

with dag_maker('testdag') as dag:
EmptyOperator(task_id='task1')

# Backfill run
backfill_run = dag_maker.create_dagrun(run_type=DagRunType.BACKFILL_JOB)
ti = backfill_run.get_task_instances()[0]
ti.set_state(TaskInstanceState.SUCCESS)
dm = DagModel.get_dagmodel(dag.dag_id)
dm.is_paused = True
session.merge(dm)
session.merge(ti)
session.flush()

# scheduled run
scheduled_run = dag_maker.create_dagrun(
execution_date=datetime.datetime(2022, 1, 1), run_type=DagRunType.SCHEDULED
)
ti = scheduled_run.get_task_instances()[0]
ti.set_state(TaskInstanceState.SUCCESS)
dm = DagModel.get_dagmodel(dag.dag_id)
dm.is_paused = True
session.merge(dm)
session.merge(ti)
session.flush()

assert dag.dag_id in DagModel.get_all_paused_dag_ids()
assert backfill_run.state == State.RUNNING
assert scheduled_run.state == State.RUNNING

self.scheduler_job = SchedulerJob(subdir=os.devnull)
self.scheduler_job.executor = MockExecutor()
self.scheduler_job._update_dag_run_state_for_paused_dags()
session.flush()

(backfill_run,) = DagRun.find(dag_id=dag.dag_id, run_type=DagRunType.BACKFILL_JOB, session=session)
assert backfill_run.state == State.RUNNING

(scheduled_run,) = DagRun.find(dag_id=dag.dag_id, run_type=DagRunType.SCHEDULED, session=session)
assert scheduled_run.state == State.SUCCESS


@pytest.mark.need_serialized_dag
def test_schedule_dag_run_with_upstream_skip(dag_maker, session):
Expand Down