From a200d5d67197bc3dfa110b7fce1ca0c1d634395e Mon Sep 17 00:00:00 2001 From: Ephraim Anierobi Date: Wed, 12 Oct 2022 16:33:31 +0100 Subject: [PATCH 1/3] Remove double collection of dags in `airflow dags reserialize` We explicitly call dagbag.collect_dags after instantiating DagBag in airflow dags reserialize code. The method collect_dags is called on instantiation of the DagBag so calling it again means more processing of the same dags. Here, we use a variable to achieve the same needed effect on reserialization --- airflow/models/dagbag.py | 12 +++++++----- airflow/utils/db.py | 4 ++-- 2 files changed, 9 insertions(+), 7 deletions(-) diff --git a/airflow/models/dagbag.py b/airflow/models/dagbag.py index cabc142f31708..849498c1e4c6e 100644 --- a/airflow/models/dagbag.py +++ b/airflow/models/dagbag.py @@ -98,6 +98,7 @@ def __init__( read_dags_from_db: bool = False, store_serialized_dags: bool | None = None, load_op_links: bool = True, + collect_dags: bool = True, ): # Avoid circular import from airflow.models.dag import DAG @@ -137,11 +138,12 @@ def __init__( self.dagbag_import_error_tracebacks = conf.getboolean('core', 'dagbag_import_error_tracebacks') self.dagbag_import_error_traceback_depth = conf.getint('core', 'dagbag_import_error_traceback_depth') - self.collect_dags( - dag_folder=dag_folder, - include_examples=include_examples, - safe_mode=safe_mode, - ) + if collect_dags: + self.collect_dags( + dag_folder=dag_folder, + include_examples=include_examples, + safe_mode=safe_mode, + ) # Should the extra operator link be loaded via plugins? # This flag is set to False in Scheduler so that Extra Operator links are not loaded self.load_op_links = load_op_links diff --git a/airflow/utils/db.py b/airflow/utils/db.py index f796156f8f96d..1ac799ddd256c 100644 --- a/airflow/utils/db.py +++ b/airflow/utils/db.py @@ -831,8 +831,8 @@ def reserialize_dags(*, session: Session = NEW_SESSION) -> None: from airflow.models.serialized_dag import SerializedDagModel session.query(SerializedDagModel).delete(synchronize_session=False) - dagbag = DagBag() - dagbag.collect_dags(only_if_updated=False, safe_mode=False) + dagbag = DagBag(collect_dags=False) + dagbag.collect_dags(only_if_updated=False, safe_mode=conf.getboolean('core', 'DAG_DISCOVERY_SAFE_MODE')) dagbag.sync_to_db(session=session) From dee6497cb733cc69f28364da5614309e1f9833ed Mon Sep 17 00:00:00 2001 From: Ephraim Anierobi Date: Thu, 13 Oct 2022 13:14:59 +0100 Subject: [PATCH 2/3] add test for collect_dags arg --- tests/models/test_dagbag.py | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/tests/models/test_dagbag.py b/tests/models/test_dagbag.py index ed20d41ef01ca..f55a31400d26a 100644 --- a/tests/models/test_dagbag.py +++ b/tests/models/test_dagbag.py @@ -1060,3 +1060,16 @@ def test_dag_cluster_policy_obeyed(self): dagbag = DagBag(dag_folder=dag_file, include_examples=False) assert len(dagbag.dag_ids) == 0 assert "has no tags" in dagbag.import_errors[dag_file] + + def test_dagbag_dag_collection(self): + + dagbag = DagBag(dag_folder=TEST_DAGS_FOLDER, include_examples=False, collect_dags=False) + # since collect_dags is False, dagbag.dags should be empty + assert not dagbag.dags + + dagbag.collect_dags() + assert dagbag.dags + + # test that dagbag.dags is not empty if collect_dags is True + dagbag = DagBag(dag_folder=TEST_DAGS_FOLDER, include_examples=False) + assert dagbag.dags From 6cf67cc473979c41dc16d7f58918377aa0d9f9c3 Mon Sep 17 00:00:00 2001 From: Ephraim Anierobi Date: Thu, 13 Oct 2022 13:18:22 +0100 Subject: [PATCH 3/3] Update airflow/utils/db.py Co-authored-by: Ash Berlin-Taylor --- airflow/utils/db.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow/utils/db.py b/airflow/utils/db.py index 1ac799ddd256c..d5819257912b3 100644 --- a/airflow/utils/db.py +++ b/airflow/utils/db.py @@ -832,7 +832,7 @@ def reserialize_dags(*, session: Session = NEW_SESSION) -> None: session.query(SerializedDagModel).delete(synchronize_session=False) dagbag = DagBag(collect_dags=False) - dagbag.collect_dags(only_if_updated=False, safe_mode=conf.getboolean('core', 'DAG_DISCOVERY_SAFE_MODE')) + dagbag.collect_dags(only_if_updated=False) dagbag.sync_to_db(session=session)