Store callbacks in database if standalone_dag_processor config is True. #21731

Merged: 6 commits, Mar 8, 2022
30 changes: 26 additions & 4 deletions airflow/callbacks/callback_requests.py
@@ -15,6 +15,7 @@
# specific language governing permissions and limitations
# under the License.

import json
from typing import TYPE_CHECKING, Optional

if TYPE_CHECKING:
@@ -34,13 +35,21 @@ def __init__(self, full_filepath: str, msg: Optional[str] = None):
self.msg = msg

def __eq__(self, other):
if isinstance(other, CallbackRequest):
if isinstance(other, self.__class__):
return self.__dict__ == other.__dict__
return False
return NotImplemented

def __repr__(self):
return str(self.__dict__)

def to_json(self) -> str:
return json.dumps(self.__dict__)

@classmethod
def from_json(cls, json_str: str):
json_object = json.loads(json_str)
return cls(**json_object)


class TaskCallbackRequest(CallbackRequest):
"""
@@ -64,6 +73,19 @@ def __init__(
self.simple_task_instance = simple_task_instance
self.is_failure_callback = is_failure_callback

def to_json(self) -> str:
dict_obj = self.__dict__.copy()
dict_obj["simple_task_instance"] = dict_obj["simple_task_instance"].__dict__
return json.dumps(dict_obj)

@classmethod
def from_json(cls, json_str: str):
from airflow.models.taskinstance import SimpleTaskInstance

kwargs = json.loads(json_str)
simple_ti = SimpleTaskInstance.from_dict(obj_dict=kwargs.pop("simple_task_instance"))
return cls(simple_task_instance=simple_ti, **kwargs)


class DagCallbackRequest(CallbackRequest):
"""
@@ -98,6 +120,6 @@ class SlaCallbackRequest(CallbackRequest):
:param dag_id: DAG ID
"""

def __init__(self, full_filepath: str, dag_id: str):
super().__init__(full_filepath)
def __init__(self, full_filepath: str, dag_id: str, msg: Optional[str] = None):
super().__init__(full_filepath, msg)
self.dag_id = dag_id
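For illustration, a minimal sketch of the round trip the new `to_json`/`from_json` methods enable; the file path and DAG id below are made up:

```python
from airflow.callbacks.callback_requests import SlaCallbackRequest

# Serialize a request so it can be stored in the metadata database ...
request = SlaCallbackRequest(full_filepath="/opt/airflow/dags/example.py", dag_id="example_dag")
payload = request.to_json()  # JSON dump of __dict__: full_filepath, msg, dag_id

# ... and later reconstruct an equivalent request in the standalone DAG processor.
restored = SlaCallbackRequest.from_json(payload)
assert restored == request  # __eq__ now compares __dict__ for matching classes
```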
35 changes: 35 additions & 0 deletions airflow/callbacks/database_callback_sink.py
@@ -0,0 +1,35 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#

from sqlalchemy.orm import Session

from airflow.callbacks.base_callback_sink import BaseCallbackSink
from airflow.callbacks.callback_requests import CallbackRequest
from airflow.models import DbCallbackRequest
from airflow.utils.session import NEW_SESSION, provide_session


class DatabaseCallbackSink(BaseCallbackSink):
"""Sends callbacks to database."""

@provide_session
def send(self, callback: CallbackRequest, session: Session = NEW_SESSION) -> None:
"""Sends callback for execution."""
db_callback = DbCallbackRequest(callback=callback, priority_weight=10)
session.add(db_callback)
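As a rough usage sketch (not part of the diff), the scheduler side amounts to:

```python
from airflow.callbacks.callback_requests import SlaCallbackRequest
from airflow.callbacks.database_callback_sink import DatabaseCallbackSink

# The scheduler assigns this sink to executor.callback_sink when
# [scheduler] standalone_dag_processor = True (see scheduler_job.py below).
sink = DatabaseCallbackSink()
sink.send(SlaCallbackRequest(full_filepath="/opt/airflow/dags/example.py", dag_id="example_dag"))
# @provide_session opens and commits a session around send(), so the callback row
# is persisted until a standalone DAG processor picks it up in _fetch_callbacks().
```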
16 changes: 16 additions & 0 deletions airflow/config_templates/config.yml
@@ -2011,6 +2011,22 @@
type: string
example: ~
default: "modified_time"
- name: standalone_dag_processor
description: |
Whether the DAG processor runs as a standalone process or as a subprocess of the
scheduler job.
version_added: 2.3.0
type: boolean
example: ~
default: "False"
- name: max_callbacks_per_loop
description: |
Only applicable if `[scheduler]standalone_dag_processor` is true and callbacks are stored
in the database. The maximum number of callbacks fetched during a single loop.
version_added: 2.3.0
type: integer
example: ~
default: "20"
- name: use_job_schedule
description: |
Turn off scheduler use of cron intervals by setting this to False.
8 changes: 8 additions & 0 deletions airflow/config_templates/default_airflow.cfg
@@ -1015,6 +1015,14 @@ parsing_processes = 2
# * ``alphabetical``: Sort by filename
file_parsing_sort_mode = modified_time

# Whether the DAG processor runs as a standalone process or as a subprocess of the
# scheduler job.
standalone_dag_processor = False

# Only applicable if `[scheduler]standalone_dag_processor` is true and callbacks are stored
# in the database. The maximum number of callbacks fetched during a single loop.
max_callbacks_per_loop = 20

# Turn off scheduler use of cron intervals by setting this to False.
# DAGs submitted manually in the web UI or with trigger_dag will still run.
use_job_schedule = True
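For reference, the two new options are consumed with the standard config accessors; these exact calls appear in the manager and scheduler diffs below:

```python
from airflow.configuration import conf

standalone = conf.getboolean("scheduler", "standalone_dag_processor")  # default: False
max_callbacks = conf.getint("scheduler", "max_callbacks_per_loop")     # default: 20
```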
32 changes: 29 additions & 3 deletions airflow/dag_processing/manager.py
@@ -34,13 +34,14 @@
from typing import TYPE_CHECKING, Any, Dict, List, NamedTuple, Optional, Union, cast

from setproctitle import setproctitle
from sqlalchemy.orm import Session
from tabulate import tabulate

import airflow.models
from airflow.callbacks.callback_requests import CallbackRequest
from airflow.configuration import conf
from airflow.dag_processing.processor import DagFileProcessorProcess
from airflow.models import DagModel, errors
from airflow.models import DagModel, DbCallbackRequest, errors
from airflow.models.serialized_dag import SerializedDagModel
from airflow.stats import Stats
from airflow.utils import timezone
@@ -49,7 +50,8 @@
from airflow.utils.mixins import MultiprocessingStartMethodMixin
from airflow.utils.net import get_hostname
from airflow.utils.process_utils import kill_child_processes_by_pids, reap_process_group
from airflow.utils.session import provide_session
from airflow.utils.session import NEW_SESSION, provide_session
from airflow.utils.sqlalchemy import prohibit_commit, skip_locked, with_row_locks

if TYPE_CHECKING:
import pathlib
@@ -479,16 +481,19 @@ def _run_parsing_loop(self):

self._refresh_dag_dir()
self.prepare_file_path_queue()
max_callbacks_per_loop = conf.getint("scheduler", "max_callbacks_per_loop")

if self._async_mode:
# If we're in async mode, we can start up straight away. If we're
# in sync mode we need to be told to start a "loop"
self.start_new_processes()
while True:
loop_start_time = time.monotonic()

ready = multiprocessing.connection.wait(self.waitables.keys(), timeout=poll_time)
if self._signal_conn in ready:
if conf.getboolean("scheduler", "standalone_dag_processor"):
# Callbacks are only stored in the database in standalone mode;
# otherwise there is nothing to fetch here.
self._fetch_callbacks(max_callbacks_per_loop)
agent_signal = self._signal_conn.recv()

self.log.debug("Received %s signal from DagFileProcessorAgent", agent_signal)
@@ -591,6 +596,27 @@ def _run_parsing_loop(self):
else:
poll_time = 0.0

@provide_session
def _fetch_callbacks(self, max_callbacks: int, session: Session = NEW_SESSION):
"""Fetches callbacks from database and add them to the internal pipe for execution."""
self.log.debug("Fetching callbacks from the database.")
with prohibit_commit(session) as guard:
query = (
session.query(DbCallbackRequest)
.order_by(DbCallbackRequest.priority_weight.asc())
.limit(max_callbacks)
)
callbacks = with_row_locks(
query, of=DbCallbackRequest, session=session, **skip_locked(session=session)
).all()
for callback in callbacks:
try:
self._signal_conn.send(callback.get_callback_request())
session.delete(callback)
except Exception as e:
self.log.warning("Error adding callback for execution: %s, %s", callback, e)
guard.commit()

def _add_callback_to_queue(self, request: CallbackRequest):
self._callback_to_execute[request.full_filepath].append(request)
# Callback has a higher priority over DAG Run scheduling
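The `with_row_locks(..., **skip_locked(...))` combination is what lets several standalone DAG processors poll the table concurrently without handing the same callback to two of them; on databases that support it, the query behaves like `SELECT ... FOR UPDATE SKIP LOCKED`. A standalone sketch of the same pattern, with an illustrative table that stands in for `DbCallbackRequest`:

```python
from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()

class CallbackRow(Base):  # illustrative stand-in for DbCallbackRequest
    __tablename__ = "callback_queue"
    id = Column(Integer, primary_key=True)
    priority_weight = Column(Integer, default=10)
    payload = Column(String)

# SKIP LOCKED needs PostgreSQL or MySQL 8+; the URL here is a placeholder.
engine = create_engine("postgresql+psycopg2://user:pass@localhost/db")

with Session(engine) as session:
    rows = (
        session.query(CallbackRow)
        .order_by(CallbackRow.priority_weight.asc())
        .limit(20)
        .with_for_update(skip_locked=True)  # rows locked by another worker are skipped
        .all()
    )
    for row in rows:
        # hand the payload off for execution, then delete the claimed row
        session.delete(row)
    session.commit()
```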
2 changes: 1 addition & 1 deletion airflow/executors/celery_kubernetes_executor.py
@@ -107,7 +107,7 @@ def queue_task_instance(
cfg_path: Optional[str] = None,
) -> None:
"""Queues task instance via celery or kubernetes executor"""
executor = self._router(SimpleTaskInstance(task_instance))
executor = self._router(SimpleTaskInstance.from_ti(task_instance))
self.log.debug(
"Using executor: %s to queue_task_instance for %s", executor.__class__.__name__, task_instance.key
)
2 changes: 1 addition & 1 deletion airflow/executors/local_kubernetes_executor.py
@@ -108,7 +108,7 @@ def queue_task_instance(
cfg_path: Optional[str] = None,
) -> None:
"""Queues task instance via local or kubernetes executor"""
executor = self._router(SimpleTaskInstance(task_instance))
executor = self._router(SimpleTaskInstance.from_ti(task_instance))
self.log.debug(
"Using executor: %s to queue_task_instance for %s", executor.__class__.__name__, task_instance.key
)
88 changes: 49 additions & 39 deletions airflow/jobs/scheduler_job.py
@@ -35,6 +35,7 @@

from airflow import models, settings
from airflow.callbacks.callback_requests import DagCallbackRequest, SlaCallbackRequest, TaskCallbackRequest
from airflow.callbacks.database_callback_sink import DatabaseCallbackSink
from airflow.callbacks.pipe_callback_sink import PipeCallbackSink
from airflow.configuration import conf
from airflow.dag_processing.manager import DagFileProcessorAgent
@@ -631,7 +632,7 @@ def _process_executor_events(self, session: Session = None) -> int:
if task.on_retry_callback or task.on_failure_callback:
request = TaskCallbackRequest(
full_filepath=ti.dag_model.fileloc,
simple_task_instance=SimpleTaskInstance(ti),
simple_task_instance=SimpleTaskInstance.from_ti(ti),
msg=msg % (ti, state, ti.state, info),
)
self.executor.send_callback(request)
@@ -665,9 +666,14 @@ def _execute(self) -> None:

try:
self.executor.job_id = self.id
self.executor.callback_sink = PipeCallbackSink(
get_sink_pipe=self.processor_agent.get_callbacks_pipe
)
if conf.getboolean("scheduler", "standalone_dag_processor"):
self.log.debug("Using DatabaseCallbackSink as callback sink.")
self.executor.callback_sink = DatabaseCallbackSink()
else:
self.log.debug("Using PipeCallbackSink as callback sink.")
self.executor.callback_sink = PipeCallbackSink(
get_sink_pipe=self.processor_agent.get_callbacks_pipe
)

self.executor.start()

@@ -834,7 +840,6 @@ def _do_scheduling(self, session) -> int:
"""
# Put a check in place to make sure we don't commit unexpectedly
with prohibit_commit(session) as guard:

if settings.USE_JOB_SCHEDULE:
self._create_dagruns_for_dags(guard, session)

@@ -851,46 +856,49 @@

guard.commit()

# Send the callbacks after we commit to ensure the context is up to date when it gets run
for dag_run, callback_to_run in callback_tuples:
dag = self.dagbag.get_dag(dag_run.dag_id, session=session)
if not dag:
self.log.error("DAG '%s' not found in serialized_dag table", dag_run.dag_id)
continue

self._send_dag_callbacks_to_processor(dag, callback_to_run)
# Send the callbacks after we commit to ensure the context is up to date when it gets run
for dag_run, callback_to_run in callback_tuples:
dag = self.dagbag.get_dag(dag_run.dag_id, session=session)
if not dag:
self.log.error("DAG '%s' not found in serialized_dag table", dag_run.dag_id)
continue
# Send the callbacks here: with standalone_dag_processor they are added to the database,
# so this must be done outside of prohibit_commit.
self._send_dag_callbacks_to_processor(dag, callback_to_run)

with prohibit_commit(session) as guard:
# Without this, the session has an invalid view of the DB
session.expunge_all()
# END: schedule TIs

try:
if self.executor.slots_available <= 0:
# We know we can't do anything here, so don't even try!
self.log.debug("Executor full, skipping critical section")
return 0

timer = Stats.timer('scheduler.critical_section_duration')
timer.start()

# Find any TIs in state SCHEDULED and try to QUEUE them (send them to the executor)
num_queued_tis = self._critical_section_execute_task_instances(session=session)

# Make sure we only send this metric if we obtained the lock; otherwise we'll skew the
# metric way down
timer.stop(send=True)
except OperationalError as e:
timer.stop(send=False)

if is_lock_not_available_error(error=e):
self.log.debug("Critical section lock held by another Scheduler")
Stats.incr('scheduler.critical_section_busy')
session.rollback()
return 0
raise
if self.executor.slots_available <= 0:
# We know we can't do anything here, so don't even try!
self.log.debug("Executor full, skipping critical section")
num_queued_tis = 0
else:
try:
timer = Stats.timer('scheduler.critical_section_duration')
timer.start()

# Find any TIs in state SCHEDULED and try to QUEUE them (send them to the executor)
num_queued_tis = self._critical_section_execute_task_instances(session=session)

# Make sure we only send this metric if we obtained the lock; otherwise we'll skew the
# metric way down
timer.stop(send=True)
except OperationalError as e:
timer.stop(send=False)

if is_lock_not_available_error(error=e):
self.log.debug("Critical section lock held by another Scheduler")
Stats.incr('scheduler.critical_section_busy')
session.rollback()
return 0
raise

guard.commit()
return num_queued_tis

return num_queued_tis

@retry_db_transaction
def _get_next_dagruns_to_examine(self, state: DagRunState, session: Session):
@@ -1128,6 +1136,8 @@ def _send_dag_callbacks_to_processor(self, dag: DAG, callback: Optional[DagCallb
self._send_sla_callbacks_to_processor(dag)
if callback:
self.executor.send_callback(callback)
else:
self.log.debug("callback is empty")

def _send_sla_callbacks_to_processor(self, dag: DAG):
"""Sends SLA Callbacks to DagFileProcessor if tasks have SLAs set and check_slas=True"""
@@ -1296,7 +1306,7 @@ def _find_zombies(self, session):
for ti, file_loc in zombies:
request = TaskCallbackRequest(
full_filepath=file_loc,
simple_task_instance=SimpleTaskInstance(ti),
simple_task_instance=SimpleTaskInstance.from_ti(ti),
msg=f"Detected {ti} as zombie",
)
self.log.error("Detected zombie job: %s", request)
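Putting the scheduler-side change in one place: the callback sink is chosen from config, roughly as below. The helper name is illustrative, not part of the PR; the imports and constructor calls mirror the scheduler_job.py diff above.

```python
from airflow.callbacks.database_callback_sink import DatabaseCallbackSink
from airflow.callbacks.pipe_callback_sink import PipeCallbackSink
from airflow.configuration import conf

def make_callback_sink(processor_agent):
    # With a standalone DAG processor there is no pipe to a parsing subprocess,
    # so callbacks are persisted to the metadata database instead and fetched by
    # DagFileProcessorManager._fetch_callbacks() on its next loop.
    if conf.getboolean("scheduler", "standalone_dag_processor"):
        return DatabaseCallbackSink()
    return PipeCallbackSink(get_sink_pipe=processor_agent.get_callbacks_pipe)
```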