Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove double collection of dags in airflow dags reserialize #27030

Merged
merged 3 commits into from Oct 13, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
12 changes: 7 additions & 5 deletions airflow/models/dagbag.py
Expand Up @@ -98,6 +98,7 @@ def __init__(
read_dags_from_db: bool = False,
store_serialized_dags: bool | None = None,
load_op_links: bool = True,
collect_dags: bool = True,
):
# Avoid circular import
from airflow.models.dag import DAG
Expand Down Expand Up @@ -137,11 +138,12 @@ def __init__(

self.dagbag_import_error_tracebacks = conf.getboolean('core', 'dagbag_import_error_tracebacks')
self.dagbag_import_error_traceback_depth = conf.getint('core', 'dagbag_import_error_traceback_depth')
self.collect_dags(
dag_folder=dag_folder,
include_examples=include_examples,
safe_mode=safe_mode,
)
if collect_dags:
self.collect_dags(
dag_folder=dag_folder,
include_examples=include_examples,
safe_mode=safe_mode,
)
# Should the extra operator link be loaded via plugins?
# This flag is set to False in Scheduler so that Extra Operator links are not loaded
self.load_op_links = load_op_links
Expand Down
4 changes: 2 additions & 2 deletions airflow/utils/db.py
Expand Up @@ -831,8 +831,8 @@ def reserialize_dags(*, session: Session = NEW_SESSION) -> None:
from airflow.models.serialized_dag import SerializedDagModel

session.query(SerializedDagModel).delete(synchronize_session=False)
dagbag = DagBag()
dagbag.collect_dags(only_if_updated=False, safe_mode=False)
dagbag = DagBag(collect_dags=False)
dagbag.collect_dags(only_if_updated=False)
dagbag.sync_to_db(session=session)


Expand Down
13 changes: 13 additions & 0 deletions tests/models/test_dagbag.py
Expand Up @@ -1060,3 +1060,16 @@ def test_dag_cluster_policy_obeyed(self):
dagbag = DagBag(dag_folder=dag_file, include_examples=False)
assert len(dagbag.dag_ids) == 0
assert "has no tags" in dagbag.import_errors[dag_file]

def test_dagbag_dag_collection(self):

dagbag = DagBag(dag_folder=TEST_DAGS_FOLDER, include_examples=False, collect_dags=False)
# since collect_dags is False, dagbag.dags should be empty
assert not dagbag.dags

dagbag.collect_dags()
assert dagbag.dags

# test that dagbag.dags is not empty if collect_dags is True
dagbag = DagBag(dag_folder=TEST_DAGS_FOLDER, include_examples=False)
assert dagbag.dags