From a51f9f0b1e54351be5db0eb36e18a0f1125c8a13 Mon Sep 17 00:00:00 2001 From: Andrew Gibbs Date: Tue, 19 Jul 2022 08:40:11 +0100 Subject: [PATCH 1/3] Stop SLA callbacks gazumping other callbacks and effectively DOS'ing the queue --- airflow/dag_processing/manager.py | 43 ++++++++++++++----- tests/dag_processing/test_manager.py | 62 ++++++++++++++++++++++++++++ 2 files changed, 94 insertions(+), 11 deletions(-) diff --git a/airflow/dag_processing/manager.py b/airflow/dag_processing/manager.py index b1d23034bf460..56b537b0160f6 100644 --- a/airflow/dag_processing/manager.py +++ b/airflow/dag_processing/manager.py @@ -38,7 +38,7 @@ from tabulate import tabulate import airflow.models -from airflow.callbacks.callback_requests import CallbackRequest +from airflow.callbacks.callback_requests import CallbackRequest, SlaCallbackRequest from airflow.configuration import conf from airflow.dag_processing.processor import DagFileProcessorProcess from airflow.models import DagModel, DagWarning, DbCallbackRequest, errors @@ -679,16 +679,37 @@ def _fetch_callbacks(self, max_callbacks: int, session: Session = NEW_SESSION): guard.commit() def _add_callback_to_queue(self, request: CallbackRequest): - self._callback_to_execute[request.full_filepath].append(request) - # Callback has a higher priority over DAG Run scheduling - if request.full_filepath in self._file_path_queue: - # Remove file paths matching request.full_filepath from self._file_path_queue - # Since we are already going to use that filepath to run callback, - # there is no need to have same file path again in the queue - self._file_path_queue = [ - file_path for file_path in self._file_path_queue if file_path != request.full_filepath - ] - self._file_path_queue.insert(0, request.full_filepath) + + # requests are sent by dag processors. SLAs exist per-dag, but can be generated once per SLA-enabled + # task in the dag. 
If treated like other callbacks, SLAs can cause feedback where a SLA arrives, + # goes to the front of the queue, gets processed, triggers more SLAs from the same DAG, which go to + # the front of the queue, and we never get round to picking stuff off the back of the queue + if isinstance(request, SlaCallbackRequest): + if request in self._callback_to_execute[request.full_filepath]: + self.log.debug("Skipping already queued SlaCallbackRequest") + return + + # not already queued, queue the file _at the back_, and add the request to the file's callbacks + self.log.debug("Queuing SlaCallbackRequest for %s", request.dag_id) + self._callback_to_execute[request.full_filepath].append(request) + if request.full_filepath not in self._file_path_queue: + self._file_path_queue.append(request.full_filepath) + + # Other callbacks have a higher priority over DAG Run scheduling, so those callbacks gazump, even if + # already in the queue + else: + self.log.debug( + "Queuing %s CallbackRequest: %s", type(request).__name__.rsplit('.', 1)[-1], request + ) + self._callback_to_execute[request.full_filepath].append(request) + if request.full_filepath in self._file_path_queue: + # Remove file paths matching request.full_filepath from self._file_path_queue + # Since we are already going to use that filepath to run callback, + # there is no need to have same file path again in the queue + self._file_path_queue = [ + file_path for file_path in self._file_path_queue if file_path != request.full_filepath + ] + self._file_path_queue.insert(0, request.full_filepath) def _refresh_dag_dir(self): """Refresh file paths from dag dir if we haven't done it for too long.""" diff --git a/tests/dag_processing/test_manager.py b/tests/dag_processing/test_manager.py index 891df4ec641b3..bcca03d5598e5 100644 --- a/tests/dag_processing/test_manager.py +++ b/tests/dag_processing/test_manager.py @@ -867,6 +867,68 @@ def test_fetch_callbacks_from_database_not_standalone(self, tmpdir): with create_session() as 
session: assert session.query(DbCallbackRequest).count() == 1 + def test_callback_queue(self, tmpdir): + # given + manager = DagFileProcessorManager( + dag_directory=TEST_DAG_FOLDER, + max_runs=1, + processor_timeout=timedelta(days=365), + signal_conn=MagicMock(), + dag_ids=[], + pickle_dags=False, + async_mode=True, + ) + + dag1_req1 = DagCallbackRequest( + full_filepath="/green_eggs/ham/file1.py", + dag_id="dag1", + run_id="run1", + is_failure_callback=False, + msg=None, + ) + dag1_req2 = DagCallbackRequest( + full_filepath="/green_eggs/ham/file1.py", + dag_id="dag1", + run_id="run1", + is_failure_callback=False, + msg=None, + ) + dag1_sla1 = SlaCallbackRequest(full_filepath="/green_eggs/ham/file1.py", dag_id="dag1") + dag1_sla2 = SlaCallbackRequest(full_filepath="/green_eggs/ham/file1.py", dag_id="dag1") + + dag2_req1 = DagCallbackRequest( + full_filepath="/green_eggs/ham/file2.py", + dag_id="dag2", + run_id="run1", + is_failure_callback=False, + msg=None, + ) + + # when + manager._add_callback_to_queue(dag1_req1) + manager._add_callback_to_queue(dag1_sla1) + manager._add_callback_to_queue(dag2_req1) + + # then - requests should be in manager's queue, with dag2 ahead of dag1 (because it was added last) + assert manager._file_path_queue == [dag2_req1.full_filepath, dag1_req1.full_filepath] + assert set(manager._callback_to_execute.keys()) == {dag1_req1.full_filepath, dag2_req1.full_filepath} + assert manager._callback_to_execute[dag1_req1.full_filepath] == [dag1_req1, dag1_sla1] + assert manager._callback_to_execute[dag2_req1.full_filepath] == [dag2_req1] + + # when + manager._add_callback_to_queue(dag1_sla2) + + # then - since sla2 == sla1, should not have brought dag1 to the fore + assert manager._file_path_queue == [dag2_req1.full_filepath, dag1_req1.full_filepath] + assert manager._callback_to_execute[dag1_req1.full_filepath] == [dag1_req1, dag1_sla1] + + # when + manager._add_callback_to_queue(dag1_req2) + + # then - non-sla callback should have brought dag1 
to the fore + assert manager._file_path_queue == [dag1_req1.full_filepath, dag2_req1.full_filepath] + assert manager._callback_to_execute[dag1_req1.full_filepath] == [dag1_req1, dag1_sla1, dag1_req2] + class TestDagFileProcessorAgent(unittest.TestCase): def setUp(self): From c92b6439098c18a180490a65e0cb5c115d8a8eee Mon Sep 17 00:00:00 2001 From: Andrew Gibbs Date: Tue, 19 Jul 2022 09:17:57 +0100 Subject: [PATCH 2/3] Added newsfragment --- newsfragments/25147.bugfix.rst | 1 + 1 file changed, 1 insertion(+) create mode 100644 newsfragments/25147.bugfix.rst diff --git a/newsfragments/25147.bugfix.rst b/newsfragments/25147.bugfix.rst new file mode 100644 index 0000000000000..2d4523604c654 --- /dev/null +++ b/newsfragments/25147.bugfix.rst @@ -0,0 +1 @@ +``DagFileProcessorManager`` callback queue changed to queue SLAs at the back (stops DAG processing stalling due to SLAs) From b50f8c1126958281910ab9df63c8f0859013d2ba Mon Sep 17 00:00:00 2001 From: Andrew Gibbs Date: Thu, 21 Jul 2022 10:14:35 +0100 Subject: [PATCH 3/3] MR feedback - remove rsplit --- airflow/dag_processing/manager.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/airflow/dag_processing/manager.py b/airflow/dag_processing/manager.py index 56b537b0160f6..cc6a38cb1daca 100644 --- a/airflow/dag_processing/manager.py +++ b/airflow/dag_processing/manager.py @@ -698,9 +698,7 @@ def _add_callback_to_queue(self, request: CallbackRequest): # Other callbacks have a higher priority over DAG Run scheduling, so those callbacks gazump, even if # already in the queue else: - self.log.debug( - "Queuing %s CallbackRequest: %s", type(request).__name__.rsplit('.', 1)[-1], request - ) + self.log.debug("Queuing %s CallbackRequest: %s", type(request).__name__, request) self._callback_to_execute[request.full_filepath].append(request) if request.full_filepath in self._file_path_queue: # Remove file paths matching request.full_filepath from self._file_path_queue