Commit

Store callbacks in database if standalone_dag_processor config is True. (#21731)

mhenc committed Mar 8, 2022
1 parent 4014194 commit 5ace37a
Showing 21 changed files with 623 additions and 78 deletions.
30 changes: 26 additions & 4 deletions airflow/callbacks/callback_requests.py
@@ -15,6 +15,7 @@
# specific language governing permissions and limitations
# under the License.

import json
from typing import TYPE_CHECKING, Optional

if TYPE_CHECKING:
@@ -34,13 +35,21 @@ def __init__(self, full_filepath: str, msg: Optional[str] = None):
self.msg = msg

def __eq__(self, other):
if isinstance(other, CallbackRequest):
if isinstance(other, self.__class__):
return self.__dict__ == other.__dict__
return False
return NotImplemented

def __repr__(self):
return str(self.__dict__)

def to_json(self) -> str:
return json.dumps(self.__dict__)

@classmethod
def from_json(cls, json_str: str):
json_object = json.loads(json_str)
return cls(**json_object)


class TaskCallbackRequest(CallbackRequest):
"""
@@ -64,6 +73,19 @@ def __init__(
self.simple_task_instance = simple_task_instance
self.is_failure_callback = is_failure_callback

def to_json(self) -> str:
dict_obj = self.__dict__.copy()
dict_obj["simple_task_instance"] = dict_obj["simple_task_instance"].__dict__
return json.dumps(dict_obj)

@classmethod
def from_json(cls, json_str: str):
from airflow.models.taskinstance import SimpleTaskInstance

kwargs = json.loads(json_str)
simple_ti = SimpleTaskInstance.from_dict(obj_dict=kwargs.pop("simple_task_instance"))
return cls(simple_task_instance=simple_ti, **kwargs)


class DagCallbackRequest(CallbackRequest):
"""
@@ -98,6 +120,6 @@ class SlaCallbackRequest(CallbackRequest):
:param dag_id: DAG ID
"""

def __init__(self, full_filepath: str, dag_id: str):
super().__init__(full_filepath)
def __init__(self, full_filepath: str, dag_id: str, msg: Optional[str] = None):
super().__init__(full_filepath, msg)
self.dag_id = dag_id
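
A minimal sketch of the JSON round-trip added above, using only the CallbackRequest fields shown in this diff (full_filepath and msg); the file path and message are hypothetical:

from airflow.callbacks.callback_requests import CallbackRequest

request = CallbackRequest(full_filepath="/dags/example_dag.py", msg="task failed")
payload = request.to_json()  # '{"full_filepath": "/dags/example_dag.py", "msg": "task failed"}'
restored = CallbackRequest.from_json(payload)
assert restored == request  # __eq__ compares __dict__ for instances of the same class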
35 changes: 35 additions & 0 deletions airflow/callbacks/database_callback_sink.py
@@ -0,0 +1,35 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#

from sqlalchemy.orm import Session

from airflow.callbacks.base_callback_sink import BaseCallbackSink
from airflow.callbacks.callback_requests import CallbackRequest
from airflow.models import DbCallbackRequest
from airflow.utils.session import NEW_SESSION, provide_session


class DatabaseCallbackSink(BaseCallbackSink):
"""Sends callbacks to database."""

@provide_session
def send(self, callback: CallbackRequest, session: Session = NEW_SESSION) -> None:
"""Sends callback for execution."""
db_callback = DbCallbackRequest(callback=callback, priority_weight=10)
session.add(db_callback)
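
A hedged usage sketch for the new sink, assuming the session is supplied by the @provide_session decorator shown above; the DAG id and file path are hypothetical:

from airflow.callbacks.callback_requests import SlaCallbackRequest
from airflow.callbacks.database_callback_sink import DatabaseCallbackSink

sink = DatabaseCallbackSink()
# Adds a DbCallbackRequest row (priority_weight=10) to the session instead of writing to the processor pipe.
sink.send(SlaCallbackRequest(full_filepath="/dags/example_dag.py", dag_id="example_dag"))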
16 changes: 16 additions & 0 deletions airflow/config_templates/config.yml
@@ -2011,6 +2011,22 @@
type: string
example: ~
default: "modified_time"
- name: standalone_dag_processor
description: |
Whether the DAG processor runs as a standalone process or as a subprocess of a scheduler
job.
version_added: 2.3.0
type: boolean
example: ~
default: "False"
- name: max_callbacks_per_loop
description: |
Only applicable if `[scheduler]standalone_dag_processor` is true and callbacks are stored
in the database. Maximum number of callbacks fetched during a single loop.
version_added: 2.3.0
type: integer
example: ~
default: "20"
- name: use_job_schedule
description: |
Turn off scheduler use of cron intervals by setting this to False.
8 changes: 8 additions & 0 deletions airflow/config_templates/default_airflow.cfg
@@ -1015,6 +1015,14 @@ parsing_processes = 2
# * ``alphabetical``: Sort by filename
file_parsing_sort_mode = modified_time

# Whether the DAG processor runs as a standalone process or as a subprocess of a scheduler
# job.
standalone_dag_processor = False

# Only applicable if `[scheduler]standalone_dag_processor` is true and callbacks are stored
# in the database. Maximum number of callbacks fetched during a single loop.
max_callbacks_per_loop = 20

# Turn off scheduler use of cron intervals by setting this to False.
# DAGs submitted manually in the web UI or with trigger_dag will still run.
use_job_schedule = True
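
A short sketch of how the two new options are read at runtime, mirroring the conf calls that appear in the dag_processing/manager.py and jobs/scheduler_job.py hunks below; the values noted are the documented defaults:

from airflow.configuration import conf

standalone = conf.getboolean("scheduler", "standalone_dag_processor")  # False unless overridden
max_callbacks = conf.getint("scheduler", "max_callbacks_per_loop")     # 20 unless overridden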
32 changes: 29 additions & 3 deletions airflow/dag_processing/manager.py
@@ -34,13 +34,14 @@
from typing import TYPE_CHECKING, Any, Dict, List, NamedTuple, Optional, Union, cast

from setproctitle import setproctitle
from sqlalchemy.orm import Session
from tabulate import tabulate

import airflow.models
from airflow.callbacks.callback_requests import CallbackRequest
from airflow.configuration import conf
from airflow.dag_processing.processor import DagFileProcessorProcess
from airflow.models import DagModel, errors
from airflow.models import DagModel, DbCallbackRequest, errors
from airflow.models.serialized_dag import SerializedDagModel
from airflow.stats import Stats
from airflow.utils import timezone
@@ -49,7 +50,8 @@
from airflow.utils.mixins import MultiprocessingStartMethodMixin
from airflow.utils.net import get_hostname
from airflow.utils.process_utils import kill_child_processes_by_pids, reap_process_group
from airflow.utils.session import provide_session
from airflow.utils.session import NEW_SESSION, provide_session
from airflow.utils.sqlalchemy import prohibit_commit, skip_locked, with_row_locks

if TYPE_CHECKING:
import pathlib
@@ -479,16 +481,19 @@ def _run_parsing_loop(self):

self._refresh_dag_dir()
self.prepare_file_path_queue()
max_callbacks_per_loop = conf.getint("scheduler", "max_callbacks_per_loop")

if self._async_mode:
# If we're in async mode, we can start up straight away. If we're
# in sync mode we need to be told to start a "loop"
self.start_new_processes()
while True:
loop_start_time = time.monotonic()

ready = multiprocessing.connection.wait(self.waitables.keys(), timeout=poll_time)
if self._signal_conn in ready:
if conf.getboolean("scheduler", "standalone_dag_processor"):
# Nothing to do if callbacks are not stored in the database
self._fetch_callbacks(max_callbacks_per_loop)
agent_signal = self._signal_conn.recv()

self.log.debug("Received %s signal from DagFileProcessorAgent", agent_signal)
@@ -591,6 +596,27 @@ def _run_parsing_loop(self):
else:
poll_time = 0.0

@provide_session
def _fetch_callbacks(self, max_callbacks: int, session: Session = NEW_SESSION):
"""Fetches callbacks from database and add them to the internal pipe for execution."""
self.log.debug("Fetching callbacks from the database.")
with prohibit_commit(session) as guard:
query = (
session.query(DbCallbackRequest)
.order_by(DbCallbackRequest.priority_weight.asc())
.limit(max_callbacks)
)
callbacks = with_row_locks(
query, of=DbCallbackRequest, session=session, **skip_locked(session=session)
).all()
for callback in callbacks:
try:
self._signal_conn.send(callback.get_callback_request())
session.delete(callback)
except Exception as e:
self.log.warning("Error adding callback for execution: %s, %s", callback, e)
guard.commit()

def _add_callback_to_queue(self, request: CallbackRequest):
self._callback_to_execute[request.full_filepath].append(request)
# Callback has a higher priority over DAG Run scheduling
2 changes: 1 addition & 1 deletion airflow/executors/celery_kubernetes_executor.py
@@ -107,7 +107,7 @@ def queue_task_instance(
cfg_path: Optional[str] = None,
) -> None:
"""Queues task instance via celery or kubernetes executor"""
executor = self._router(SimpleTaskInstance(task_instance))
executor = self._router(SimpleTaskInstance.from_ti(task_instance))
self.log.debug(
"Using executor: %s to queue_task_instance for %s", executor.__class__.__name__, task_instance.key
)
2 changes: 1 addition & 1 deletion airflow/executors/local_kubernetes_executor.py
@@ -108,7 +108,7 @@ def queue_task_instance(
cfg_path: Optional[str] = None,
) -> None:
"""Queues task instance via local or kubernetes executor"""
executor = self._router(SimpleTaskInstance(task_instance))
executor = self._router(SimpleTaskInstance.from_ti(task_instance))
self.log.debug(
"Using executor: %s to queue_task_instance for %s", executor.__class__.__name__, task_instance.key
)
88 changes: 49 additions & 39 deletions airflow/jobs/scheduler_job.py
@@ -35,6 +35,7 @@

from airflow import models, settings
from airflow.callbacks.callback_requests import DagCallbackRequest, SlaCallbackRequest, TaskCallbackRequest
from airflow.callbacks.database_callback_sink import DatabaseCallbackSink
from airflow.callbacks.pipe_callback_sink import PipeCallbackSink
from airflow.configuration import conf
from airflow.dag_processing.manager import DagFileProcessorAgent
@@ -631,7 +632,7 @@ def _process_executor_events(self, session: Session = None) -> int:
if task.on_retry_callback or task.on_failure_callback:
request = TaskCallbackRequest(
full_filepath=ti.dag_model.fileloc,
simple_task_instance=SimpleTaskInstance(ti),
simple_task_instance=SimpleTaskInstance.from_ti(ti),
msg=msg % (ti, state, ti.state, info),
)
self.executor.send_callback(request)
@@ -665,9 +666,14 @@ def _execute(self) -> None:

try:
self.executor.job_id = self.id
self.executor.callback_sink = PipeCallbackSink(
get_sink_pipe=self.processor_agent.get_callbacks_pipe
)
if conf.getboolean("scheduler", "standalone_dag_processor"):
self.log.debug("Using DatabaseCallbackSink as callback sink.")
self.executor.callback_sink = DatabaseCallbackSink()
else:
self.log.debug("Using PipeCallbackSink as callback sink.")
self.executor.callback_sink = PipeCallbackSink(
get_sink_pipe=self.processor_agent.get_callbacks_pipe
)

self.executor.start()

@@ -834,7 +840,6 @@ def _do_scheduling(self, session) -> int:
"""
# Put a check in place to make sure we don't commit unexpectedly
with prohibit_commit(session) as guard:

if settings.USE_JOB_SCHEDULE:
self._create_dagruns_for_dags(guard, session)

@@ -851,46 +856,49 @@

guard.commit()

# Send the callbacks after we commit to ensure the context is up to date when it gets run
for dag_run, callback_to_run in callback_tuples:
dag = self.dagbag.get_dag(dag_run.dag_id, session=session)
if not dag:
self.log.error("DAG '%s' not found in serialized_dag table", dag_run.dag_id)
continue

self._send_dag_callbacks_to_processor(dag, callback_to_run)
# Send the callbacks after we commit to ensure the context is up to date when it gets run
for dag_run, callback_to_run in callback_tuples:
dag = self.dagbag.get_dag(dag_run.dag_id, session=session)
if not dag:
self.log.error("DAG '%s' not found in serialized_dag table", dag_run.dag_id)
continue
# Send the callbacks here: with standalone_dag_processor they are added to the database,
# so this must be done outside of prohibit_commit.
self._send_dag_callbacks_to_processor(dag, callback_to_run)

with prohibit_commit(session) as guard:
# Without this, the session has an invalid view of the DB
session.expunge_all()
# END: schedule TIs

try:
if self.executor.slots_available <= 0:
# We know we can't do anything here, so don't even try!
self.log.debug("Executor full, skipping critical section")
return 0

timer = Stats.timer('scheduler.critical_section_duration')
timer.start()

# Find any TIs in state SCHEDULED, try to QUEUE them (send them to the executor)
num_queued_tis = self._critical_section_execute_task_instances(session=session)

# Make sure we only sent this metric if we obtained the lock, otherwise we'll skew the
# metric, way down
timer.stop(send=True)
except OperationalError as e:
timer.stop(send=False)

if is_lock_not_available_error(error=e):
self.log.debug("Critical section lock held by another Scheduler")
Stats.incr('scheduler.critical_section_busy')
session.rollback()
return 0
raise
if self.executor.slots_available <= 0:
# We know we can't do anything here, so don't even try!
self.log.debug("Executor full, skipping critical section")
num_queued_tis = 0
else:
try:
timer = Stats.timer('scheduler.critical_section_duration')
timer.start()

# Find any TIs in state SCHEDULED, try to QUEUE them (send them to the executor)
num_queued_tis = self._critical_section_execute_task_instances(session=session)

# Make sure we only sent this metric if we obtained the lock, otherwise we'll skew the
# metric, way down
timer.stop(send=True)
except OperationalError as e:
timer.stop(send=False)

if is_lock_not_available_error(error=e):
self.log.debug("Critical section lock held by another Scheduler")
Stats.incr('scheduler.critical_section_busy')
session.rollback()
return 0
raise

guard.commit()
return num_queued_tis

return num_queued_tis
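
A minimal control-flow sketch (hypothetical names) of the restructuring above: the "executor full" case no longer returns early from inside the guarded block; it records zero queued task instances and falls through so guard.commit() still runs:

def _schedule_critical_section(slots_available, queue_task_instances):
    if slots_available <= 0:
        num_queued_tis = 0  # skip the critical section, but still fall through to the commit
    else:
        num_queued_tis = queue_task_instances()
    # guard.commit() happens here in the real method, after the if/else
    return num_queued_tis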

@retry_db_transaction
def _get_next_dagruns_to_examine(self, state: DagRunState, session: Session):
@@ -1128,6 +1136,8 @@ def _send_dag_callbacks_to_processor(self, dag: DAG, callback: Optional[DagCallb
self._send_sla_callbacks_to_processor(dag)
if callback:
self.executor.send_callback(callback)
else:
self.log.debug("callback is empty")

def _send_sla_callbacks_to_processor(self, dag: DAG):
"""Sends SLA Callbacks to DagFileProcessor if tasks have SLAs set and check_slas=True"""
@@ -1296,7 +1306,7 @@ def _find_zombies(self, session):
for ti, file_loc in zombies:
request = TaskCallbackRequest(
full_filepath=file_loc,
simple_task_instance=SimpleTaskInstance(ti),
simple_task_instance=SimpleTaskInstance.from_ti(ti),
msg=f"Detected {ti} as zombie",
)
self.log.error("Detected zombie job: %s", request)
