From 118c6aa70a04af966e6d666f8db52187a0e1ac75 Mon Sep 17 00:00:00 2001 From: Anthony Panat Date: Mon, 12 Sep 2022 11:47:48 -0400 Subject: [PATCH 1/4] wrap schedule dag run block in retry db --- airflow/jobs/scheduler_job.py | 22 ++++++++++++++++------ 1 file changed, 16 insertions(+), 6 deletions(-) diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py index d8b8073674305..938ddeaaa70bb 100644 --- a/airflow/jobs/scheduler_job.py +++ b/airflow/jobs/scheduler_job.py @@ -925,12 +925,7 @@ def _do_scheduling(self, session: Session) -> int: # Bulk fetch the currently active dag runs for the dags we are # examining, rather than making one query per DagRun - callback_tuples = [] - for dag_run in dag_runs: - callback_to_run = self._schedule_dag_run(dag_run, session) - callback_tuples.append((dag_run, callback_to_run)) - - guard.commit() + callback_tuples, callback_to_run = self._schedule_all_dag_runs(guard, dag_runs, session) # Send the callbacks after we commit to ensure the context is up to date when it gets run for dag_run, callback_to_run in callback_tuples: @@ -1213,6 +1208,21 @@ def _update_state(dag: DAG, dag_run: DagRun): active_runs_of_dags[dag_run.dag_id] += 1 _update_state(dag, dag_run) + @retry_db_transaction + def _schedule_all_dag_runs(self, guard, dag_runs, session): + """ + Makes scheduling decisions for all `dag_runs`. + """ + callback_tuples = [] + callback_to_run = None + for dag_run in dag_runs: + callback_to_run = self._schedule_dag_run(dag_run, session) + callback_tuples.append((dag_run, callback_to_run)) + + guard.commit() + + return callback_tuples, callback_to_run + def _schedule_dag_run( self, dag_run: DagRun, From 726a3dfa641b2582cd33ab00174c2840f36680db Mon Sep 17 00:00:00 2001 From: Anthony Panat Date: Mon, 12 Sep 2022 11:47:48 -0400 Subject: [PATCH 2/4] wrap schedule dag run block in retry db --- airflow/jobs/scheduler_job.py | 22 ++++++++++++++++------ 1 file changed, 16 insertions(+), 6 deletions(-) diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py index 13543745054b5..8443cad83ad77 100644 --- a/airflow/jobs/scheduler_job.py +++ b/airflow/jobs/scheduler_job.py @@ -938,12 +938,7 @@ def _do_scheduling(self, session: Session) -> int: # Bulk fetch the currently active dag runs for the dags we are # examining, rather than making one query per DagRun - callback_tuples = [] - for dag_run in dag_runs: - callback_to_run = self._schedule_dag_run(dag_run, session) - callback_tuples.append((dag_run, callback_to_run)) - - guard.commit() + callback_tuples, callback_to_run = self._schedule_all_dag_runs(guard, dag_runs, session) # Send the callbacks after we commit to ensure the context is up to date when it gets run for dag_run, callback_to_run in callback_tuples: @@ -1227,6 +1222,21 @@ def _update_state(dag: DAG, dag_run: DagRun): active_runs_of_dags[dag_run.dag_id] += 1 _update_state(dag, dag_run) + @retry_db_transaction + def _schedule_all_dag_runs(self, guard, dag_runs, session): + """ + Makes scheduling decisions for all `dag_runs`. + """ + callback_tuples = [] + callback_to_run = None + for dag_run in dag_runs: + callback_to_run = self._schedule_dag_run(dag_run, session) + callback_tuples.append((dag_run, callback_to_run)) + + guard.commit() + + return callback_tuples, callback_to_run + def _schedule_dag_run( self, dag_run: DagRun, From 6624e7ca4bb52fcb185fe59faa1e4cb4db5d6a7f Mon Sep 17 00:00:00 2001 From: Anthony Panat Date: Mon, 19 Sep 2022 10:57:51 -0400 Subject: [PATCH 3/4] update docstring single line --- airflow/jobs/scheduler_job.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py index 8443cad83ad77..e69d861ecc3b9 100644 --- a/airflow/jobs/scheduler_job.py +++ b/airflow/jobs/scheduler_job.py @@ -1224,9 +1224,7 @@ def _update_state(dag: DAG, dag_run: DagRun): @retry_db_transaction def _schedule_all_dag_runs(self, guard, dag_runs, session): - """ - Makes scheduling decisions for all `dag_runs`. - """ + """Makes scheduling decisions for all `dag_runs`""" callback_tuples = [] callback_to_run = None for dag_run in dag_runs: From 1d74134ea71c9e45b26d4bdbfb9cb34a7f58a042 Mon Sep 17 00:00:00 2001 From: Anthony Panat Date: Thu, 22 Sep 2022 16:30:38 -0400 Subject: [PATCH 4/4] remove callback_to_run from return --- airflow/jobs/scheduler_job.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py index d486410e79aaf..74541a1cf8b9f 100644 --- a/airflow/jobs/scheduler_job.py +++ b/airflow/jobs/scheduler_job.py @@ -943,7 +943,7 @@ def _do_scheduling(self, session: Session) -> int: # Bulk fetch the currently active dag runs for the dags we are # examining, rather than making one query per DagRun - callback_tuples, callback_to_run = self._schedule_all_dag_runs(guard, dag_runs, session) + callback_tuples = self._schedule_all_dag_runs(guard, dag_runs, session) # Send the callbacks after we commit to ensure the context is up to date when it gets run for dag_run, callback_to_run in callback_tuples: @@ -1231,14 +1231,13 @@ def _update_state(dag: DAG, dag_run: DagRun): def _schedule_all_dag_runs(self, guard, dag_runs, session): """Makes scheduling decisions for all `dag_runs`""" callback_tuples = [] - callback_to_run = None for dag_run in dag_runs: callback_to_run = self._schedule_dag_run(dag_run, session) callback_tuples.append((dag_run, callback_to_run)) guard.commit() - return callback_tuples, callback_to_run + return callback_tuples def _schedule_dag_run( self,