diff --git a/airflow/models/dagbag.py b/airflow/models/dagbag.py index 43f34350cffd9..8a5557c2c2264 100644 --- a/airflow/models/dagbag.py +++ b/airflow/models/dagbag.py @@ -326,6 +326,7 @@ def parse(mod_name, filepath): loader.exec_module(new_module) return [new_module] except Exception as e: + DagContext.autoregistered_dags.clear() self.log.exception("Failed to import: %s", filepath) if self.dagbag_import_error_tracebacks: self.import_errors[filepath] = traceback.format_exc( @@ -391,6 +392,7 @@ def _load_modules_from_zip(self, filepath, safe_mode): current_module = importlib.import_module(mod_name) mods.append(current_module) except Exception as e: + DagContext.autoregistered_dags.clear() fileloc = os.path.join(filepath, zip_info.filename) self.log.exception("Failed to import: %s", fileloc) if self.dagbag_import_error_tracebacks: diff --git a/tests/dags/test_invalid_dup_task.py b/tests/dags/test_invalid_dup_task.py new file mode 100644 index 0000000000000..472cb9185135c --- /dev/null +++ b/tests/dags/test_invalid_dup_task.py @@ -0,0 +1,30 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +from datetime import datetime + +from airflow import DAG +from airflow.operators.empty import EmptyOperator + +with DAG( + "test_invalid_dup_task", + start_date=datetime(2021, 1, 1), + schedule="@once", +): + EmptyOperator(task_id="hi") + EmptyOperator(task_id="hi") diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py index abed3148cdb1b..028522c74e9f8 100644 --- a/tests/jobs/test_scheduler_job.py +++ b/tests/jobs/test_scheduler_job.py @@ -2766,6 +2766,7 @@ def test_list_py_file_paths(self): ignored_files = { 'no_dags.py', 'test_invalid_cron.py', + 'test_invalid_dup_task.py', 'test_ignore_this.py', 'test_invalid_param.py', 'test_nested_dag.py', diff --git a/tests/models/test_dagbag.py b/tests/models/test_dagbag.py index 5be9a596aead9..ed20d41ef01ca 100644 --- a/tests/models/test_dagbag.py +++ b/tests/models/test_dagbag.py @@ -357,6 +357,28 @@ def test_get_dag_registration(self, file_to_load, expected): assert dag, f"{dag_id} was bagged" assert dag.fileloc.endswith(path) + def test_dag_registration_with_failure(self): + dagbag = models.DagBag(dag_folder=os.devnull, include_examples=False) + found = dagbag.process_file(str(TEST_DAGS_FOLDER / 'test_invalid_dup_task.py')) + assert [] == found + + @pytest.fixture() + def zip_with_valid_dag_and_dup_tasks(self, tmp_path: pathlib.Path) -> Iterator[str]: + failing_dag_file = TEST_DAGS_FOLDER / 'test_invalid_dup_task.py' + working_dag_file = TEST_DAGS_FOLDER / 'test_example_bash_operator.py' + zipped = os.path.join(tmp_path, "test_zip_invalid_dup_task.zip") + with zipfile.ZipFile(zipped, "w") as zf: + zf.write(failing_dag_file, os.path.basename(failing_dag_file)) + zf.write(working_dag_file, os.path.basename(working_dag_file)) + yield zipped + os.unlink(zipped) + + def test_dag_registration_with_failure_zipped(self, zip_with_valid_dag_and_dup_tasks): + dagbag = models.DagBag(dag_folder=os.devnull, include_examples=False) + found = dagbag.process_file(zip_with_valid_dag_and_dup_tasks) + assert 1 == len(found) + assert ['test_example_bash_operator'] == [dag.dag_id for dag in found] + @patch.object(DagModel, "get_current") def test_refresh_py_dag(self, mock_dagmodel): """