diff --git a/airflow/models/dagbag.py b/airflow/models/dagbag.py index cabc142f31708..849498c1e4c6e 100644 --- a/airflow/models/dagbag.py +++ b/airflow/models/dagbag.py @@ -98,6 +98,7 @@ def __init__( read_dags_from_db: bool = False, store_serialized_dags: bool | None = None, load_op_links: bool = True, + collect_dags: bool = True, ): # Avoid circular import from airflow.models.dag import DAG @@ -137,11 +138,12 @@ def __init__( self.dagbag_import_error_tracebacks = conf.getboolean('core', 'dagbag_import_error_tracebacks') self.dagbag_import_error_traceback_depth = conf.getint('core', 'dagbag_import_error_traceback_depth') - self.collect_dags( - dag_folder=dag_folder, - include_examples=include_examples, - safe_mode=safe_mode, - ) + if collect_dags: + self.collect_dags( + dag_folder=dag_folder, + include_examples=include_examples, + safe_mode=safe_mode, + ) # Should the extra operator link be loaded via plugins? # This flag is set to False in Scheduler so that Extra Operator links are not loaded self.load_op_links = load_op_links diff --git a/airflow/utils/db.py b/airflow/utils/db.py index cec3d58c364b9..b2ff28f68ad86 100644 --- a/airflow/utils/db.py +++ b/airflow/utils/db.py @@ -819,8 +819,8 @@ def reserialize_dags(*, session: Session = NEW_SESSION) -> None: from airflow.models.serialized_dag import SerializedDagModel session.query(SerializedDagModel).delete(synchronize_session=False) - dagbag = DagBag() - dagbag.collect_dags(only_if_updated=False, safe_mode=False) + dagbag = DagBag(collect_dags=False) + dagbag.collect_dags(only_if_updated=False) dagbag.sync_to_db(session=session) diff --git a/tests/models/test_dagbag.py b/tests/models/test_dagbag.py index ed20d41ef01ca..f55a31400d26a 100644 --- a/tests/models/test_dagbag.py +++ b/tests/models/test_dagbag.py @@ -1060,3 +1060,16 @@ def test_dag_cluster_policy_obeyed(self): dagbag = DagBag(dag_folder=dag_file, include_examples=False) assert len(dagbag.dag_ids) == 0 assert "has no tags" in dagbag.import_errors[dag_file] + + def test_dagbag_dag_collection(self): + + dagbag = DagBag(dag_folder=TEST_DAGS_FOLDER, include_examples=False, collect_dags=False) + # since collect_dags is False, dagbag.dags should be empty + assert not dagbag.dags + + dagbag.collect_dags() + assert dagbag.dags + + # test that dagbag.dags is not empty if collect_dags is True + dagbag = DagBag(dag_folder=TEST_DAGS_FOLDER, include_examples=False) + assert dagbag.dags