Skip to content

Commit

Permalink
Retry on Airflow Schedule DAG Run DB Deadlock (#26347)
Browse files Browse the repository at this point in the history
Co-authored-by: Anthony Panat <anthonypanat@Anthonys-MacBook-Pro-2.local>
Co-authored-by: Anthony Panat <anthonypanat@anthonys-mbp-2.mynetworksettings.com>
(cherry picked from commit 0da4993)
  • Loading branch information
anthonyp97 authored and ephraimbuddy committed Oct 18, 2022
1 parent b6c8fd6 commit b7afa67
Showing 1 changed file with 13 additions and 6 deletions.
19 changes: 13 additions & 6 deletions airflow/jobs/scheduler_job.py
Expand Up @@ -943,12 +943,7 @@ def _do_scheduling(self, session: Session) -> int:
# Bulk fetch the currently active dag runs for the dags we are
# examining, rather than making one query per DagRun

callback_tuples = []
for dag_run in dag_runs:
callback_to_run = self._schedule_dag_run(dag_run, session)
callback_tuples.append((dag_run, callback_to_run))

guard.commit()
callback_tuples = self._schedule_all_dag_runs(guard, dag_runs, session)

# Send the callbacks after we commit to ensure the context is up to date when they are run
for dag_run, callback_to_run in callback_tuples:
Expand Down Expand Up @@ -1232,6 +1227,18 @@ def _update_state(dag: DAG, dag_run: DagRun):
active_runs_of_dags[dag_run.dag_id] += 1
_update_state(dag, dag_run)

@retry_db_transaction
def _schedule_all_dag_runs(self, guard, dag_runs, session):
    """
    Run the scheduling decision for every run in ``dag_runs``, then commit.

    Each DagRun is handed to ``_schedule_dag_run`` in iteration order; once all
    of them have been processed, the work is committed through ``guard``.

    :param guard: object whose ``commit()`` is invoked after all runs are processed
    :param dag_runs: iterable of DagRuns to make scheduling decisions for
    :param session: DB session forwarded to ``_schedule_dag_run``
    :return: list of ``(dag_run, callback_to_run)`` tuples, one per DagRun
    """
    scheduled = [(run, self._schedule_dag_run(run, session)) for run in dag_runs]

    # Commit only after every run has been processed.
    guard.commit()

    return scheduled

def _schedule_dag_run(
self,
dag_run: DagRun,
Expand Down

0 comments on commit b7afa67

Please sign in to comment.