Move callback_sink from executor to scheduler #24372

Closed · wants to merge 2 commits

14 changes: 0 additions & 14 deletions airflow/executors/base_executor.py
@@ -19,8 +19,6 @@
from collections import OrderedDict
from typing import Any, Counter, Dict, List, Optional, Set, Tuple, Union

from airflow.callbacks.base_callback_sink import BaseCallbackSink
from airflow.callbacks.callback_requests import CallbackRequest
from airflow.configuration import conf
from airflow.models.taskinstance import TaskInstance, TaskInstanceKey
from airflow.stats import Stats
@@ -63,7 +61,6 @@ class BaseExecutor(LoggingMixin):
"""

job_id: Union[None, int, str] = None
callback_sink: Optional[BaseCallbackSink] = None

def __init__(self, parallelism: int = PARALLELISM):
super().__init__()
@@ -350,14 +347,3 @@ def debug_dump(self):
len(self.event_buffer),
"\n\t".join(map(repr, self.event_buffer.items())),
)

def send_callback(self, request: CallbackRequest) -> None:
"""Sends callback for execution.

Provides a default implementation which sends the callback to the `callback_sink` object.

:param request: Callback request to be executed.
"""
if not self.callback_sink:
raise ValueError("Callback sink is not ready.")
self.callback_sink.send(request)
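
The `callback_sink` attribute removed here is any object implementing `BaseCallbackSink`, i.e. something exposing a single `send(CallbackRequest)` method; `PipeCallbackSink` and `DatabaseCallbackSink` are the two concrete implementations. A minimal illustrative sink, assuming the interface is just that one method (the `RecordingCallbackSink` class below is hypothetical and only shows the contract; it is not part of this PR):

from typing import List

from airflow.callbacks.base_callback_sink import BaseCallbackSink
from airflow.callbacks.callback_requests import CallbackRequest


class RecordingCallbackSink(BaseCallbackSink):
    """Illustrative sink: keeps every callback request it is asked to send."""

    def __init__(self) -> None:
        self.sent: List[CallbackRequest] = []

    def send(self, callback: CallbackRequest) -> None:
        # Real sinks forward the request to the DAG processor (pipe) or
        # persist it to the database; here we only record it.
        self.sent.append(callback)

After this change the scheduler, rather than the executor, is the object that holds and drives such a sink.
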
12 changes: 0 additions & 12 deletions airflow/executors/celery_kubernetes_executor.py
@@ -17,8 +17,6 @@
# under the License.
from typing import Dict, List, Optional, Set, Union

from airflow.callbacks.base_callback_sink import BaseCallbackSink
from airflow.callbacks.callback_requests import CallbackRequest
from airflow.configuration import conf
from airflow.executors.base_executor import CommandType, EventBufferValueType, QueuedTaskInstanceType
from airflow.executors.celery_executor import CeleryExecutor
@@ -37,7 +35,6 @@ class CeleryKubernetesExecutor(LoggingMixin):
"""

supports_ad_hoc_ti_run: bool = True
callback_sink: Optional[BaseCallbackSink] = None

KUBERNETES_QUEUE = conf.get('celery_kubernetes_executor', 'kubernetes_queue')

@@ -207,12 +204,3 @@ def debug_dump(self) -> None:
self.celery_executor.debug_dump()
self.log.info("Dumping KubernetesExecutor state")
self.kubernetes_executor.debug_dump()

def send_callback(self, request: CallbackRequest) -> None:
"""Sends callback for execution.

:param request: Callback request to be executed.
"""
if not self.callback_sink:
raise ValueError("Callback sink is not ready.")
self.callback_sink.send(request)
12 changes: 0 additions & 12 deletions airflow/executors/local_kubernetes_executor.py
@@ -17,8 +17,6 @@
# under the License.
from typing import Dict, List, Optional, Set, Union

from airflow.callbacks.base_callback_sink import BaseCallbackSink
from airflow.callbacks.callback_requests import CallbackRequest
from airflow.configuration import conf
from airflow.executors.base_executor import CommandType, EventBufferValueType, QueuedTaskInstanceType
from airflow.executors.kubernetes_executor import KubernetesExecutor
@@ -37,7 +35,6 @@ class LocalKubernetesExecutor(LoggingMixin):
"""

supports_ad_hoc_ti_run: bool = True
callback_sink: Optional[BaseCallbackSink] = None

KUBERNETES_QUEUE = conf.get('local_kubernetes_executor', 'kubernetes_queue')

@@ -206,12 +203,3 @@ def debug_dump(self) -> None:
self.local_executor.debug_dump()
self.log.info("Dumping KubernetesExecutor state")
self.kubernetes_executor.debug_dump()

def send_callback(self, request: CallbackRequest) -> None:
"""Sends callback for execution.

:param request: Callback request to be executed.
"""
if not self.callback_sink:
raise ValueError("Callback sink is not ready.")
self.callback_sink.send(request)
33 changes: 24 additions & 9 deletions airflow/jobs/scheduler_job.py
@@ -34,7 +34,13 @@
from sqlalchemy.orm.session import Session, make_transient

from airflow import models, settings
from airflow.callbacks.callback_requests import DagCallbackRequest, SlaCallbackRequest, TaskCallbackRequest
from airflow.callbacks.base_callback_sink import BaseCallbackSink
from airflow.callbacks.callback_requests import (
CallbackRequest,
DagCallbackRequest,
SlaCallbackRequest,
TaskCallbackRequest,
)
from airflow.callbacks.database_callback_sink import DatabaseCallbackSink
from airflow.callbacks.pipe_callback_sink import PipeCallbackSink
from airflow.configuration import conf
@@ -165,6 +171,8 @@ def __init__(
DeprecationWarning,
)

self._callback_sink: Optional[BaseCallbackSink] = None

def register_signals(self) -> None:
"""Register signals that stop child processes"""
signal.signal(signal.SIGINT, self._exit_gracefully)
@@ -687,7 +695,7 @@ def _process_executor_events(self, session: Session = None) -> int:
simple_task_instance=SimpleTaskInstance.from_ti(ti),
msg=msg % (ti, state, ti.state, info),
)
self.executor.send_callback(request)
self._send_callback(request)
else:
ti.handle_failure(error=msg % (ti, state, ti.state, info), session=session)

@@ -721,12 +729,10 @@ def _execute(self) -> None:
self.executor.job_id = self.id
if self.processor_agent:
self.log.debug("Using PipeCallbackSink as callback sink.")
self.executor.callback_sink = PipeCallbackSink(
get_sink_pipe=self.processor_agent.get_callbacks_pipe
)
self._callback_sink = PipeCallbackSink(get_sink_pipe=self.processor_agent.get_callbacks_pipe)
else:
self.log.debug("Using DatabaseCallbackSink as callback sink.")
self.executor.callback_sink = DatabaseCallbackSink()
self._callback_sink = DatabaseCallbackSink()

self.executor.start()

@@ -1203,7 +1209,7 @@ def _verify_integrity_if_dag_changed(self, dag_run: DagRun, session=None):
def _send_dag_callbacks_to_processor(self, dag: DAG, callback: Optional[DagCallbackRequest] = None):
self._send_sla_callbacks_to_processor(dag)
if callback:
self.executor.send_callback(callback)
self._send_callback(callback)
else:
self.log.debug("callback is empty")

@@ -1217,7 +1223,7 @@ def _send_sla_callbacks_to_processor(self, dag: DAG):
return

request = SlaCallbackRequest(full_filepath=dag.fileloc, dag_id=dag.dag_id)
self.executor.send_callback(request)
self._send_callback(request)

@provide_session
def _emit_pool_metrics(self, session: Session = None) -> None:
@@ -1378,5 +1384,14 @@ def _find_zombies(self, session):
msg=f"Detected {ti} as zombie",
)
self.log.error("Detected zombie job: %s", request)
self.executor.send_callback(request)
self._send_callback(request)
Stats.incr('zombies_killed')

def _send_callback(self, request: CallbackRequest) -> None:
"""Sends callback for execution.

:param request: Callback request to be executed.
"""
if not self._callback_sink:
raise ValueError("Callback sink is not ready.")
self._callback_sink.send(request)
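
Taken together, these hunks make the scheduler the sole owner of the callback sink: `_execute` picks `PipeCallbackSink` when a DAG-processor agent is attached and `DatabaseCallbackSink` otherwise, and the task-failure, DAG, SLA, and zombie callback requests all funnel through the new `_send_callback`. A condensed, illustrative sketch of that wiring (the `CallbackWiring` class is hypothetical; only the names shown in the diff are real):

from typing import Optional

from airflow.callbacks.base_callback_sink import BaseCallbackSink
from airflow.callbacks.callback_requests import CallbackRequest
from airflow.callbacks.database_callback_sink import DatabaseCallbackSink
from airflow.callbacks.pipe_callback_sink import PipeCallbackSink


class CallbackWiring:
    """Hypothetical condensation of the scheduler-side changes in this PR."""

    def __init__(self, processor_agent=None) -> None:
        self.processor_agent = processor_agent
        self._callback_sink: Optional[BaseCallbackSink] = None

    def setup_callback_sink(self) -> None:
        # Mirrors _execute(): pipe-backed sink when the scheduler runs its own
        # DAG processor, database-backed sink otherwise.
        if self.processor_agent:
            self._callback_sink = PipeCallbackSink(
                get_sink_pipe=self.processor_agent.get_callbacks_pipe
            )
        else:
            self._callback_sink = DatabaseCallbackSink()

    def _send_callback(self, request: CallbackRequest) -> None:
        # Same guard as in the diff: refuse to send before the sink exists.
        if not self._callback_sink:
            raise ValueError("Callback sink is not ready.")
        self._callback_sink.send(request)
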
12 changes: 0 additions & 12 deletions tests/executors/test_celery_kubernetes_executor.py
@@ -19,7 +19,6 @@

from parameterized import parameterized

from airflow.callbacks.callback_requests import CallbackRequest
from airflow.configuration import conf
from airflow.executors.celery_executor import CeleryExecutor
from airflow.executors.celery_kubernetes_executor import CeleryKubernetesExecutor
@@ -224,14 +223,3 @@ def test_kubernetes_executor_knows_its_queue(self):
assert k8s_executor_mock.kubernetes_queue == conf.get(
'celery_kubernetes_executor', 'kubernetes_queue'
)

def test_send_callback(self):
cel_exec = CeleryExecutor()
k8s_exec = KubernetesExecutor()
cel_k8s_exec = CeleryKubernetesExecutor(cel_exec, k8s_exec)
cel_k8s_exec.callback_sink = mock.MagicMock()

callback = CallbackRequest(full_filepath="fake")
cel_k8s_exec.send_callback(callback)

cel_k8s_exec.callback_sink.send.assert_called_once_with(callback)
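
With `send_callback` gone from the executors, the behaviour this deleted test covered now lives behind `SchedulerJob._send_callback`. A rough sketch of an equivalent scheduler-level test (the test name is made up and not part of this PR; it assumes `SchedulerJob` can be constructed with `subdir=os.devnull`, as in the existing scheduler tests):

import os
from unittest import mock

from airflow.callbacks.callback_requests import CallbackRequest
from airflow.jobs.scheduler_job import SchedulerJob


def test_send_callback_uses_configured_sink():
    # Stand in a mock for the sink and verify the request is forwarded to it.
    scheduler_job = SchedulerJob(subdir=os.devnull)
    scheduler_job._callback_sink = mock.MagicMock()

    callback = CallbackRequest(full_filepath="fake")
    scheduler_job._send_callback(callback)

    scheduler_job._callback_sink.send.assert_called_once_with(callback)
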
12 changes: 0 additions & 12 deletions tests/executors/test_local_kubernetes_executor.py
@@ -17,7 +17,6 @@
# under the License.
from unittest import mock

from airflow.callbacks.callback_requests import CallbackRequest
from airflow.configuration import conf
from airflow.executors.local_executor import LocalExecutor
from airflow.executors.local_kubernetes_executor import LocalKubernetesExecutor
@@ -68,14 +67,3 @@ def test_kubernetes_executor_knows_its_queue(self):
LocalKubernetesExecutor(local_executor_mock, k8s_executor_mock)

assert k8s_executor_mock.kubernetes_queue == conf.get('local_kubernetes_executor', 'kubernetes_queue')

def test_send_callback(self):
local_executor_mock = mock.MagicMock()
k8s_executor_mock = mock.MagicMock()
local_k8s_exec = LocalKubernetesExecutor(local_executor_mock, k8s_executor_mock)
local_k8s_exec.callback_sink = mock.MagicMock()

callback = CallbackRequest(full_filepath="fake")
local_k8s_exec.send_callback(callback)

local_k8s_exec.callback_sink.send.assert_called_once_with(callback)