diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml
index 3e083d4b90b70..f1de53417eec5 100644
--- a/airflow/config_templates/config.yml
+++ b/airflow/config_templates/config.yml
@@ -1709,6 +1709,14 @@
       type: float
       example: ~
       default: "15.0"
+    - name: clean_tis_without_dag_interval
+      description: |
+        How often (in seconds) to check for, and fail, task instances and DagRuns whose
+        corresponding DAG is missing
+      version_added: 2.1.2
+      type: float
+      example: ~
+      default: "15.0"
     - name: scheduler_heartbeat_sec
       description: |
         The scheduler constantly tries to trigger new tasks (look at the
diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg
index c03e116746f96..5216707851b0d 100644
--- a/airflow/config_templates/default_airflow.cfg
+++ b/airflow/config_templates/default_airflow.cfg
@@ -854,6 +854,10 @@ job_heartbeat_sec = 5
 # that no longer have a matching DagRun
 clean_tis_without_dagrun_interval = 15.0
 
+# How often (in seconds) to check for, and fail, task instances and DagRuns whose
+# corresponding DAG is missing
+clean_tis_without_dag_interval = 15.0
+
 # The scheduler constantly tries to trigger new tasks (look at the
 # scheduler section in the docs for more information). This defines
 # how often the scheduler should run (in seconds).
diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py
index c2b3f2c8ae59b..f366be44b22e3 100644
--- a/airflow/jobs/scheduler_job.py
+++ b/airflow/jobs/scheduler_job.py
@@ -549,7 +549,6 @@ def _critical_section_execute_task_instances(self, session: Session) -> int:
         else:
             max_tis = min(self.max_tis_per_query, self.executor.slots_available)
         queued_tis = self._executable_task_instances_to_queued(max_tis, session=session)
-
         self._enqueue_task_instances_with_queued_state(queued_tis)
         return len(queued_tis)
 
@@ -733,6 +732,10 @@ def _run_scheduler_loop(self) -> None:
             conf.getfloat('scheduler', 'clean_tis_without_dagrun_interval', fallback=15.0),
             self._clean_tis_without_dagrun,
         )
+        timers.call_regular_interval(
+            conf.getfloat('scheduler', 'clean_tis_without_dag_interval', fallback=15.0),
+            self._clean_tis_without_dag,
+        )
 
         for loop_count in itertools.count(start=1):
             with Stats.timer() as timer:
@@ -807,6 +810,39 @@ def _clean_tis_without_dagrun(self, session):
                     raise
             guard.commit()
 
+    @provide_session
+    def _clean_tis_without_dag(self, session: Session = None):
+        """Fails task instances and DagRuns of DAGs that no longer exist in the dagbag/DB"""
+        states_to_check = State.unfinished - frozenset([State.NONE, State.SHUTDOWN])
+        tis = session.query(TI).filter(TI.state.in_(states_to_check)).all()
+        missing_dags = {}
+        dag_runs = set()
+
+        for ti in tis:
+            if ti.dag_id in missing_dags:
+                ti.set_state(State.FAILED, session=session)
+                dag_runs.add(ti.dag_run)
+                missing_dags[ti.dag_id].append(ti)
+                continue
+            # DAG no longer in dagbag?
+            if not self.dagbag.has_dag(ti.dag_id, session=session):
+                ti.set_state(State.FAILED, session=session)
+                dag_runs.add(ti.dag_run)
+                missing_dags[ti.dag_id] = [ti]
+                continue
+        if missing_dags:
+            self.log.warning(
+                "The following DAGs are missing, so their "
+                "task instances have been set to failed:\n\t%s",
+                [
+                    (f"Missing DAG: {dag_name}", f"Failed TaskInstances: {failed_tis}")
+                    for dag_name, failed_tis in missing_dags.items()
+                ],
+            )
+            self.log.warning("Failing the corresponding DagRuns of the missing DAGs. DagRuns: %s", dag_runs)
+        for dr in dag_runs:
+            dr.state = State.FAILED
+
     def _do_scheduling(self, session) -> int:
         """
         This function is where the main scheduling decisions take places. It:
diff --git a/airflow/models/dagbag.py b/airflow/models/dagbag.py
index bd339eec348ad..cdc65af8ac81a 100644
--- a/airflow/models/dagbag.py
+++ b/airflow/models/dagbag.py
@@ -166,6 +166,34 @@ def dag_ids(self) -> List[str]:
         """
         return list(self.dags.keys())
 
+    @provide_session
+    def has_dag(self, dag_id: str, session: Session = None) -> bool:
+        """
+        Returns True if the DAG exists in the serialized-DAG table in the DB; if the local copy
+        of the DAG is stale and the serialized DAG is gone, the local copy is also evicted.
+        """
+        from airflow.models.serialized_dag import SerializedDagModel
+
+        sd_last_updated_datetime = SerializedDagModel.get_last_updated_datetime(
+            dag_id=dag_id,
+            session=session,
+        )
+        sd_has_dag = sd_last_updated_datetime is not None
+        if dag_id not in self.dags:
+            return sd_has_dag
+        if dag_id not in self.dags_last_fetched:
+            return sd_has_dag
+        min_serialized_dag_fetch_secs = timedelta(seconds=settings.MIN_SERIALIZED_DAG_FETCH_INTERVAL)
+        if timezone.utcnow() < self.dags_last_fetched[dag_id] + min_serialized_dag_fetch_secs:
+            return sd_has_dag
+        if sd_has_dag:
+            return True
+        self.log.warning("Serialized DAG %s no longer exists", dag_id)
+        del self.dags[dag_id]
+        del self.dags_last_fetched[dag_id]
+        del self.dags_hash[dag_id]
+        return False
+
     @provide_session
     def get_dag(self, dag_id, session: Session = None):
         """
diff --git a/airflow/utils/state.py b/airflow/utils/state.py
index d5300e11b6a9f..7e98299bd0dc7 100644
--- a/airflow/utils/state.py
+++ b/airflow/utils/state.py
@@ -107,6 +107,7 @@ def color_fg(cls, state):
             FAILED,
             SKIPPED,
             UPSTREAM_FAILED,
+            REMOVED,
         ]
     )
     """
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index 87be3e8d16638..bca1adbc6f9b1 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -1475,6 +1475,55 @@ def test_scheduler_loop_should_change_state_for_tis_without_dagrun(
         assert ti.start_date == ti.end_date
         assert ti.duration is not None
 
+    def test_scheduler_loop_should_fail_tasks_with_missing_dag_in_dagbag_dags(self):
+        """Test that task instances whose DAG is missing from the dagbag are set to failed"""
+        session = settings.Session()
+        dag_id = 'test_execute_helper_should_fail_tasks_with_missing_dag'
+        dag = DAG(dag_id, start_date=DEFAULT_DATE, default_args={'owner': 'owner1'})
+
+        with dag:
+            op1 = BashOperator(task_id='op1', bash_command='sleep 5')
+
+        # Write Dag to DB
+        dagbag = DagBag(dag_folder="/dev/null", include_examples=False, read_dags_from_db=False)
+        dagbag.bag_dag(dag, root_dag=dag)
+        dagbag.sync_to_db()
+
+        # Mock dagbag.has_dag
+        dagbag.has_dag = mock.MagicMock(return_value=False)
+
+        dag = DagBag(read_dags_from_db=True, include_examples=False).get_dag(dag_id)
+        dr = dag.create_dagrun(
+            run_type=DagRunType.SCHEDULED,
+            state=State.RUNNING,
+            execution_date=DEFAULT_DATE + timedelta(days=1),
+            start_date=DEFAULT_DATE + timedelta(days=1),
+            session=session,
+        )
+        ti = dr.get_task_instance(task_id=op1.task_id, session=session)
+        ti.state = State.RUNNING
+        session.commit()
+
+        # The poll interval is large, but the scheduler doesn't sleep that
+        # long; instead we hit the clean_tis_without_dag interval first
+        self.scheduler_job = SchedulerJob(num_runs=2, processor_poll_interval=30)
+        self.scheduler_job.dagbag = dagbag
+        executor = MockExecutor(do_update=False)
+        self.scheduler_job.executor = executor
+        processor = mock.MagicMock()
+        processor.done = False
+        self.scheduler_job.processor_agent = processor
+
+        with mock.patch.object(settings, "USE_JOB_SCHEDULE", False), conf_vars(
+            {('scheduler', 'clean_tis_without_dag_interval'): '0.001'}
+        ):
+            self.scheduler_job._run_scheduler_loop()
+
+        ti = dr.get_task_instance(task_id=op1.task_id, session=session)
+        assert ti.state == State.FAILED
+        assert ti.start_date is not None
+        assert ti.dag_run.state == State.FAILED
+
     @mock.patch('airflow.jobs.scheduler_job.DagFileProcessorAgent')
     def test_executor_end_called(self, mock_processor_agent):
         """
diff --git a/tests/models/test_dagbag.py b/tests/models/test_dagbag.py
index b4edc0c7a4f0e..6280a07ddc5df 100644
--- a/tests/models/test_dagbag.py
+++ b/tests/models/test_dagbag.py
@@ -859,6 +859,38 @@ def test_get_dag_with_dag_serialization(self):
         assert set(updated_ser_dag_1.tags) == {"example", "example2", "new_tag"}
         assert updated_ser_dag_1_update_time > ser_dag_1_update_time
 
+    @patch("airflow.models.dagbag.settings.MIN_SERIALIZED_DAG_FETCH_INTERVAL", 5)
+    def test_has_dag_method(self):
+        """Test has_dag method"""
+        dag_id = "example_bash_operator"
+        with freeze_time(tz.datetime(2020, 1, 5, 0, 0, 0)):
+            example_bash_op_dag = DagBag(include_examples=True).dags.get(dag_id)
+            SerializedDagModel.write_dag(dag=example_bash_op_dag)
+
+            dag_bag = DagBag(read_dags_from_db=True)
+            dag_bag.get_dag(dag_id)  # Add dag to self.dags
+            assert dag_bag.has_dag(dag_id)
+            assert not dag_bag.has_dag("non_added_dag_id")
+
+        # Check that min_serialized_dag_fetch_interval has passed but the
+        # dag is still in SerializedDagModel, so has_dag returns True
+        with freeze_time(tz.datetime(2020, 1, 5, 0, 0, 6)):
+            assert dag_bag.has_dag(dag_id)
+
+            # We remove the dag from dag_bag.dags and ensure it still returns True
+            dag_bag.dags.pop(dag_id)
+            assert dag_bag.has_dag(dag_id)
+
+        # We removed dag_id from dag_bag.dags, let's add it back
+        dag_bag.get_dag(dag_id)
+        # Remove the dag from SerializedDagModel
+        SerializedDagModel.remove_dag(dag_id=dag_id)
+        with freeze_time(tz.datetime(2020, 1, 5, 0, 0, 8)):
+            assert dag_bag.dags[dag_id]
+            # Even though the dag is in dag_bag.dags,
+            # it's no longer in SerializedDagModel, so has_dag returns False
+            assert not dag_bag.has_dag(dag_id)
+
     def test_collect_dags_from_db(self):
         """DAGs are collected from Database"""
         example_dags_folder = airflow.example_dags.__path__[0]