diff --git a/airflow/dag_processing/manager.py b/airflow/dag_processing/manager.py
index b1d23034bf460..cc6a38cb1daca 100644
--- a/airflow/dag_processing/manager.py
+++ b/airflow/dag_processing/manager.py
@@ -38,7 +38,7 @@
 from tabulate import tabulate
 
 import airflow.models
-from airflow.callbacks.callback_requests import CallbackRequest
+from airflow.callbacks.callback_requests import CallbackRequest, SlaCallbackRequest
 from airflow.configuration import conf
 from airflow.dag_processing.processor import DagFileProcessorProcess
 from airflow.models import DagModel, DagWarning, DbCallbackRequest, errors
@@ -679,16 +679,35 @@ def _fetch_callbacks(self, max_callbacks: int, session: Session = NEW_SESSION):
             guard.commit()
 
     def _add_callback_to_queue(self, request: CallbackRequest):
-        self._callback_to_execute[request.full_filepath].append(request)
-        # Callback has a higher priority over DAG Run scheduling
-        if request.full_filepath in self._file_path_queue:
-            # Remove file paths matching request.full_filepath from self._file_path_queue
-            # Since we are already going to use that filepath to run callback,
-            # there is no need to have same file path again in the queue
-            self._file_path_queue = [
-                file_path for file_path in self._file_path_queue if file_path != request.full_filepath
-            ]
-        self._file_path_queue.insert(0, request.full_filepath)
+
+        # requests are sent by dag processors. SLAs exist per-dag, but can be generated once per SLA-enabled
+        # task in the dag. If treated like other callbacks, SLAs can cause feedback where an SLA arrives,
+        # goes to the front of the queue, gets processed, triggers more SLAs from the same DAG, which go to
+        # the front of the queue, and we never get round to picking stuff off the back of the queue
+        if isinstance(request, SlaCallbackRequest):
+            if request in self._callback_to_execute[request.full_filepath]:
+                self.log.debug("Skipping already queued SlaCallbackRequest")
+                return
+
+            # not already queued, queue the file _at the back_, and add the request to the file's callbacks
+            self.log.debug("Queuing SlaCallbackRequest for %s", request.dag_id)
+            self._callback_to_execute[request.full_filepath].append(request)
+            if request.full_filepath not in self._file_path_queue:
+                self._file_path_queue.append(request.full_filepath)
+
+        # Other callbacks have a higher priority over DAG Run scheduling, so those callbacks gazump, even if
+        # already in the queue
+        else:
+            self.log.debug("Queuing %s CallbackRequest: %s", type(request).__name__, request)
+            self._callback_to_execute[request.full_filepath].append(request)
+            if request.full_filepath in self._file_path_queue:
+                # Remove file paths matching request.full_filepath from self._file_path_queue
+                # Since we are already going to use that filepath to run callback,
+                # there is no need to have same file path again in the queue
+                self._file_path_queue = [
+                    file_path for file_path in self._file_path_queue if file_path != request.full_filepath
+                ]
+            self._file_path_queue.insert(0, request.full_filepath)
 
     def _refresh_dag_dir(self):
         """Refresh file paths from dag dir if we haven't done it for too long."""
diff --git a/newsfragments/25147.bugfix.rst b/newsfragments/25147.bugfix.rst
new file mode 100644
index 0000000000000..2d4523604c654
--- /dev/null
+++ b/newsfragments/25147.bugfix.rst
@@ -0,0 +1 @@
+``DagProcessorManager`` callback queue changed to queue SLAs at the back (prevents DAG processing from stalling due to SLAs)
diff --git a/tests/dag_processing/test_manager.py b/tests/dag_processing/test_manager.py
index 891df4ec641b3..bcca03d5598e5 100644
--- a/tests/dag_processing/test_manager.py
+++ b/tests/dag_processing/test_manager.py
@@ -867,6 +867,68 @@ def test_fetch_callbacks_from_database_not_standalone(self, tmpdir):
         with create_session() as session:
             assert session.query(DbCallbackRequest).count() == 1
 
+    def test_callback_queue(self, tmpdir):
+        # given
+        manager = DagFileProcessorManager(
+            dag_directory=TEST_DAG_FOLDER,
+            max_runs=1,
+            processor_timeout=timedelta(days=365),
+            signal_conn=MagicMock(),
+            dag_ids=[],
+            pickle_dags=False,
+            async_mode=True,
+        )
+
+        dag1_req1 = DagCallbackRequest(
+            full_filepath="/green_eggs/ham/file1.py",
+            dag_id="dag1",
+            run_id="run1",
+            is_failure_callback=False,
+            msg=None,
+        )
+        dag1_req2 = DagCallbackRequest(
+            full_filepath="/green_eggs/ham/file1.py",
+            dag_id="dag1",
+            run_id="run1",
+            is_failure_callback=False,
+            msg=None,
+        )
+        dag1_sla1 = SlaCallbackRequest(full_filepath="/green_eggs/ham/file1.py", dag_id="dag1")
+        dag1_sla2 = SlaCallbackRequest(full_filepath="/green_eggs/ham/file1.py", dag_id="dag1")
+
+        dag2_req1 = DagCallbackRequest(
+            full_filepath="/green_eggs/ham/file2.py",
+            dag_id="dag2",
+            run_id="run1",
+            is_failure_callback=False,
+            msg=None,
+        )
+
+        # when
+        manager._add_callback_to_queue(dag1_req1)
+        manager._add_callback_to_queue(dag1_sla1)
+        manager._add_callback_to_queue(dag2_req1)
+
+        # then - requests should be in manager's queue, with dag2 ahead of dag1 (because it was added last)
+        assert manager._file_path_queue == [dag2_req1.full_filepath, dag1_req1.full_filepath]
+        assert set(manager._callback_to_execute.keys()) == {dag1_req1.full_filepath, dag2_req1.full_filepath}
+        assert manager._callback_to_execute[dag1_req1.full_filepath] == [dag1_req1, dag1_sla1]
+        assert manager._callback_to_execute[dag2_req1.full_filepath] == [dag2_req1]
+
+        # when
+        manager._add_callback_to_queue(dag1_sla2)
+
+        # then - since sla2 == sla1, should not have brought dag1 to the fore
+        assert manager._file_path_queue == [dag2_req1.full_filepath, dag1_req1.full_filepath]
+        assert manager._callback_to_execute[dag1_req1.full_filepath] == [dag1_req1, dag1_sla1]
+
+        # when
+        manager._add_callback_to_queue(dag1_req2)
+
+        # then - non-sla callback should have brought dag1 to the fore
+        assert manager._file_path_queue == [dag1_req1.full_filepath, dag2_req1.full_filepath]
+        assert manager._callback_to_execute[dag1_req1.full_filepath] == [dag1_req1, dag1_sla1, dag1_req2]
+
 
 class TestDagFileProcessorAgent(unittest.TestCase):
     def setUp(self):
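
For context, a minimal standalone sketch of the queueing policy this patch implements. All names here (SlaRequest, DagRequest, SketchQueue) are hypothetical stand-ins, not Airflow's real CallbackRequest classes: SLA requests are de-duplicated and queue their file at the back, while any other callback moves its file to the front.

    # Standalone sketch of the patched queueing policy (hypothetical names, not Airflow code).
    from collections import defaultdict
    from dataclasses import dataclass


    @dataclass(frozen=True)
    class SlaRequest:
        full_filepath: str
        dag_id: str


    @dataclass(frozen=True)
    class DagRequest:
        full_filepath: str
        dag_id: str
        run_id: str


    class SketchQueue:
        def __init__(self):
            self._callbacks = defaultdict(list)  # file path -> pending callback requests
            self._file_path_queue = []  # file paths; the front of the list is processed first

        def add(self, request):
            if isinstance(request, SlaRequest):
                # Drop duplicates so repeated SLA misses cannot flood the queue,
                # and queue the file at the back so SLAs never starve other files.
                if request in self._callbacks[request.full_filepath]:
                    return
                self._callbacks[request.full_filepath].append(request)
                if request.full_filepath not in self._file_path_queue:
                    self._file_path_queue.append(request.full_filepath)
            else:
                # Non-SLA callbacks keep their priority: move the file to the front.
                self._callbacks[request.full_filepath].append(request)
                self._file_path_queue = [p for p in self._file_path_queue if p != request.full_filepath]
                self._file_path_queue.insert(0, request.full_filepath)


    q = SketchQueue()
    q.add(DagRequest("/f1.py", "dag1", "run1"))
    q.add(SlaRequest("/f1.py", "dag1"))
    q.add(DagRequest("/f2.py", "dag2", "run1"))
    q.add(SlaRequest("/f1.py", "dag1"))  # duplicate SLA: ignored, /f1.py stays at the back
    assert q._file_path_queue == ["/f2.py", "/f1.py"]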