Skip to content

Commit

Permalink
Remove double collection of dags in airflow dags reserialize (#27030)
Browse files Browse the repository at this point in the history
The `airflow dags reserialize` code explicitly calls `dagbag.collect_dags`
after instantiating `DagBag`.

However, `collect_dags` is already called when the `DagBag` is instantiated,
so calling it again processes the same DAGs a second time.

Here, we add a `collect_dags` argument to `DagBag.__init__` so that collection can be skipped on instantiation and performed only once, explicitly, during reserialization.

Co-authored-by: Ash Berlin-Taylor <ash_github@firemirror.com>
  • Loading branch information
ephraimbuddy and ashb committed Oct 13, 2022
1 parent 66fe681 commit 36e2e43
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 7 deletions.
12 changes: 7 additions & 5 deletions airflow/models/dagbag.py
Expand Up @@ -98,6 +98,7 @@ def __init__(
read_dags_from_db: bool = False,
store_serialized_dags: bool | None = None,
load_op_links: bool = True,
collect_dags: bool = True,
):
# Avoid circular import
from airflow.models.dag import DAG
Expand Down Expand Up @@ -137,11 +138,12 @@ def __init__(

self.dagbag_import_error_tracebacks = conf.getboolean('core', 'dagbag_import_error_tracebacks')
self.dagbag_import_error_traceback_depth = conf.getint('core', 'dagbag_import_error_traceback_depth')
self.collect_dags(
dag_folder=dag_folder,
include_examples=include_examples,
safe_mode=safe_mode,
)
if collect_dags:
self.collect_dags(
dag_folder=dag_folder,
include_examples=include_examples,
safe_mode=safe_mode,
)
# Should the extra operator link be loaded via plugins?
# This flag is set to False in Scheduler so that Extra Operator links are not loaded
self.load_op_links = load_op_links
Expand Down
4 changes: 2 additions & 2 deletions airflow/utils/db.py
Expand Up @@ -831,8 +831,8 @@ def reserialize_dags(*, session: Session = NEW_SESSION) -> None:
from airflow.models.serialized_dag import SerializedDagModel

session.query(SerializedDagModel).delete(synchronize_session=False)
dagbag = DagBag()
dagbag.collect_dags(only_if_updated=False, safe_mode=False)
dagbag = DagBag(collect_dags=False)
dagbag.collect_dags(only_if_updated=False)
dagbag.sync_to_db(session=session)


Expand Down
13 changes: 13 additions & 0 deletions tests/models/test_dagbag.py
Expand Up @@ -1060,3 +1060,16 @@ def test_dag_cluster_policy_obeyed(self):
dagbag = DagBag(dag_folder=dag_file, include_examples=False)
assert len(dagbag.dag_ids) == 0
assert "has no tags" in dagbag.import_errors[dag_file]

def test_dagbag_dag_collection(self):
# Checks that the ``collect_dags`` constructor flag controls whether a DagBag
# collects DAGs at instantiation time.

dagbag = DagBag(dag_folder=TEST_DAGS_FOLDER, include_examples=False, collect_dags=False)
# since collect_dags is False, dagbag.dags should be empty
assert not dagbag.dags

# An explicit collect_dags() call on the same bag should then populate dagbag.dags.
dagbag.collect_dags()
assert dagbag.dags

# collect_dags is not passed here, so it takes its default (True):
# test that dagbag.dags is not empty if collect_dags is True
dagbag = DagBag(dag_folder=TEST_DAGS_FOLDER, include_examples=False)
assert dagbag.dags

0 comments on commit 36e2e43

Please sign in to comment.