Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Retry on Airflow Schedule DAG Run DB Deadlock #26347

Merged
merged 7 commits into from Oct 2, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
19 changes: 13 additions & 6 deletions airflow/jobs/scheduler_job.py
Expand Up @@ -943,12 +943,7 @@ def _do_scheduling(self, session: Session) -> int:
# Bulk fetch the currently active dag runs for the dags we are
# examining, rather than making one query per DagRun

callback_tuples = []
for dag_run in dag_runs:
callback_to_run = self._schedule_dag_run(dag_run, session)
callback_tuples.append((dag_run, callback_to_run))

guard.commit()
callback_tuples = self._schedule_all_dag_runs(guard, dag_runs, session)

# Send the callbacks after we commit to ensure the context is up to date when it gets run
for dag_run, callback_to_run in callback_tuples:
Expand Down Expand Up @@ -1232,6 +1227,18 @@ def _update_state(dag: DAG, dag_run: DagRun):
active_runs_of_dags[dag_run.dag_id] += 1
_update_state(dag, dag_run)

@retry_db_transaction
def _schedule_all_dag_runs(self, guard, dag_runs, session):
"""Makes scheduling decisions for all `dag_runs`"""
callback_tuples = []
for dag_run in dag_runs:
callback_to_run = self._schedule_dag_run(dag_run, session)
callback_tuples.append((dag_run, callback_to_run))

guard.commit()

return callback_tuples

def _schedule_dag_run(
self,
dag_run: DagRun,
Expand Down