From b7afa6725369f86cabdebc5e191a7ce96ceb1958 Mon Sep 17 00:00:00 2001 From: Anthony Panat Date: Sat, 1 Oct 2022 23:33:59 -0400 Subject: [PATCH] Retry on Airflow Schedule DAG Run DB Deadlock (#26347) Co-authored-by: Anthony Panat Co-authored-by: Anthony Panat (cherry picked from commit 0da49935000476b1d1941b63d0d66d3c58d64fea) --- airflow/jobs/scheduler_job.py | 19 +++++++++++++------ 1 file changed, 13 insertions(+), 6 deletions(-) diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py index 53a96cf9acdbd..74541a1cf8b9f 100644 --- a/airflow/jobs/scheduler_job.py +++ b/airflow/jobs/scheduler_job.py @@ -943,12 +943,7 @@ def _do_scheduling(self, session: Session) -> int: # Bulk fetch the currently active dag runs for the dags we are # examining, rather than making one query per DagRun - callback_tuples = [] - for dag_run in dag_runs: - callback_to_run = self._schedule_dag_run(dag_run, session) - callback_tuples.append((dag_run, callback_to_run)) - - guard.commit() + callback_tuples = self._schedule_all_dag_runs(guard, dag_runs, session) # Send the callbacks after we commit to ensure the context is up to date when it gets run for dag_run, callback_to_run in callback_tuples: @@ -1232,6 +1227,18 @@ def _update_state(dag: DAG, dag_run: DagRun): active_runs_of_dags[dag_run.dag_id] += 1 _update_state(dag, dag_run) + @retry_db_transaction + def _schedule_all_dag_runs(self, guard, dag_runs, session): + """Makes scheduling decisions for all `dag_runs`""" + callback_tuples = [] + for dag_run in dag_runs: + callback_to_run = self._schedule_dag_run(dag_run, session) + callback_tuples.append((dag_run, callback_to_run)) + + guard.commit() + + return callback_tuples + def _schedule_dag_run( self, dag_run: DagRun,