hookimpl on_dag_run_failed goes to infinite loop #39146

Open
1 of 2 tasks
vilozio opened this issue Apr 20, 2024 · 1 comment
Labels
affected_version:2.6 Issues Reported for 2.6 area:core area:Listeners kind:bug This is a clearly a bug needs-triage label for new issues that we didn't triage yet

vilozio commented Apr 20, 2024

Apache Airflow version

Other Airflow 2 version (please specify below)

If "Other Airflow 2 version" selected, which one?

2.6.3

What happened?

We have our own small Airflow plugin that notifies a Slack channel when a DAG run fails. It uses the hookimpl listeners.
Twice now, the plugin has gone into an infinite loop and spammed the channel with a large number of messages. To stop it, we had to remove the plugin and restart the Airflow instances. We didn't see any anomalies in the logs, except that the scheduler detected a zombie job. So we think something may be wrong either with the way we wrote the plugin or with Airflow itself.

What you think should happen instead?

No response

How to reproduce

Unfortunately, we don't know how to reproduce it, but the issue has happened twice.

Operating System

Ubuntu 20.04.6 LTS

Versions of Apache Airflow Providers

No response

Deployment

Google Cloud Composer

Deployment details

Composer version: composer-2.5.1-airflow-2.6.3
Number of schedulers: 2

Anything else?

This log row, for the affected DAG id, appeared around the time the spam started.

Detected zombie job: {'full_filepath': '/home/airflow/gcs/dags/myfolder/dag_player_value_model.py', 'processor_subdir': '/home/airflow/gcs/dags', 'msg': "{'DAG Id': 'player_value_model', 'Task Id': 'main', 'Run Id': 'scheduled__2024-04-19T08:00:00+00:00', 'Hostname': 'airflow-worker-48ssg', 'External Executor Id': '6c81b467-3ba0-430f-9fe1-fe48f3aca8a1'}", 'simple_task_instance': <airflow.models.taskinstance.SimpleTaskInstance object at 0x7f4050d6e760>, 'is_failure_callback': True}
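
To correlate rows like the one above with the DAG run whose notifications repeated, the identifying fields can be pulled out of the log text. The following is a minimal sketch, assuming the row keeps the 'Key': 'value' layout shown above; parse_zombie_log is a hypothetical helper name, not an Airflow API.

```python
import re
from typing import Dict

# Field names as they appear inside the 'msg' part of the log row above.
_FIELDS = ("DAG Id", "Task Id", "Run Id", "Hostname")


def parse_zombie_log(line: str) -> Dict[str, str]:
    """Extract selected 'Key': 'value' pairs from a zombie-job log row.

    Hypothetical helper: it relies only on the textual layout of the row,
    so it returns an empty dict when none of the fields are present.
    """
    found: Dict[str, str] = {}
    for field in _FIELDS:
        # Match e.g. 'DAG Id': 'player_value_model' and capture the value.
        match = re.search(rf"'{re.escape(field)}': '([^']*)'", line)
        if match:
            found[field] = match.group(1)
    return found
```

The returned DAG id and run id could then be compared with the values that the plugin logs in notify_slack_on_dag.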

This is our plugin code

PLUGIN CODE
"""Airflow plugin to notify channels on DagRun failure."""

import logging
from datetime import datetime
from typing import List

import requests
from airflow import settings
from airflow.listeners import hookimpl
from airflow.models import DagModel, DagRun, DagTag, TaskInstance, Variable
from airflow.plugins_manager import AirflowPlugin
from airflow.utils.state import DagRunState, TaskInstanceState
from sqlalchemy.orm import joinedload
from sqlalchemy.orm.session import Session

logger = logging.getLogger(__name__)


# Tag to disable alert for a dag.
NO_ALERT_TAG = "no alert"


def _is_alert_enabled(dag_id: str, session: Session) -> bool:
    """Return True if the alert is enabled for the given dag id."""
    dag_tags: List[DagTag] = (
        session.query(DagModel)
        .options(joinedload(DagModel.tags, innerjoin=False))
        .filter(DagModel.dag_id == dag_id)
        .first()
        .tags
    )
    for tag in dag_tags:
        if tag.name.lower() == NO_ALERT_TAG:
            return False
    return True


def notify_slack_on_dag(dag_run: DagRun, msg: str, session: Session) -> None:
    logger.info(
        f"Sending notification to slack channel... dag id {dag_run.dag_id} dag state {dag_run.state}"
    )
    now = datetime.utcnow()
    dt_string = now.strftime("%d/%m/%Y %H:%M:%S")

    if not _is_alert_enabled(dag_run.dag_id, session):
        logger.info(
            "Alert is disabled for the dag. "
            "Skip sending notification to slack channel."
        )
        return
    # For some reason, variable is not accessible in scheduler.
    # So, we are using session to get the variable.
    variable = session.query(Variable).filter(Variable.key == "GCP_PROJECT_ID").first()
    project_id = variable.get_val()

    failed_tasks = (
        session.query(TaskInstance)
        .filter(
            TaskInstance.dag_run == dag_run,
            TaskInstance.state == TaskInstanceState.FAILED,
        )
        .all()
    )

    failed_tasks_str = "\n".join(
        f"{task.task_id} - {task.log_url}" for task in failed_tasks
    )

    request_variables = {
        "notificationType": "Airflow Task Fail",
        "projectId": project_id,
        "processName": dag_run.dag_id,
        "timeStamp": dt_string,
        "errorMessage": msg + "\n" + failed_tasks_str,
    }
    variable = (
        session.query(Variable)
        .filter(Variable.key == "url_notifications_to_slack_channels")
        .first()
    )
    url = variable.get_val()
    response = requests.post(
        url=url,
        json=request_variables,
        timeout=10,  # Avoid blocking the caller indefinitely if the endpoint hangs.
    )
    logger.info(
        f"Response from slack channel is {response.status_code} {response.text}"
    )


class NotifyOnFailurePlugin(AirflowPlugin):
    """Airflow plugin to notify channels on DagRun failure.

    For more information, see https://airflow.apache.org/docs/apache-airflow/stable/authoring-and-scheduling/plugins.html
    """

    class Listener:
        @hookimpl
        def on_dag_run_failed(
            self,
            dag_run: DagRun,
            msg: str,
        ) -> None:
            if dag_run.state != DagRunState.FAILED:
                return
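            # Create the session before the try block so that `finally`
            # never references an unbound name if Session() itself raises.
            session = settings.Session()
            try:
                notify_slack_on_dag(dag_run, msg, session)
            except Exception as e:
                logger.error(f"Error in on_dag_run_failed: {e}")
            finally:
                session.close()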

    # Name of the plugin.
    name = "NotifyOnFailurePlugin"
    # A list of Listeners that plugin provides. Listeners can register to
    # listen to particular events that happen in Airflow, like
    # TaskInstance state changes. Listeners are python classes or modules.
    listeners = [Listener()]
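
Since the root cause is unknown, one possible mitigation would be to de-duplicate notifications per DAG run inside the plugin. The following is a minimal sketch, assuming the spam comes from on_dag_run_failed being invoked repeatedly for the same run (this has not been confirmed). NotificationGuard is a hypothetical helper, not part of Airflow, and it keeps its state in process memory, so with two schedulers each process could still send one message per run.

```python
import threading
import time
from typing import Callable, Dict, Tuple


class NotificationGuard:
    """Remember which (dag_id, run_id) pairs were already notified.

    Hypothetical helper, not an Airflow API. State lives in process
    memory only, so it does not de-duplicate across several schedulers.
    """

    def __init__(
        self,
        ttl_seconds: float = 3600.0,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self._ttl = ttl_seconds
        self._clock = clock
        self._seen: Dict[Tuple[str, str], float] = {}
        self._lock = threading.Lock()

    def should_notify(self, dag_id: str, run_id: str) -> bool:
        """Return True only the first time a run is seen within the TTL."""
        now = self._clock()
        key = (dag_id, run_id)
        with self._lock:
            # Drop expired entries so the dict does not grow without bound.
            expired = [k for k, t in self._seen.items() if now - t >= self._ttl]
            for k in expired:
                del self._seen[k]
            if key in self._seen:
                return False
            self._seen[key] = now
            return True
```

In the listener, a module-level guard could be checked with guard.should_notify(dag_run.dag_id, dag_run.run_id) before calling notify_slack_on_dag, returning early when it yields False. A guard shared between schedulers would need external storage, which this sketch does not cover.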

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

@vilozio vilozio added area:core kind:bug This is a clearly a bug needs-triage label for new issues that we didn't triage yet labels Apr 20, 2024
@eladkal eladkal added affected_version:2.6 Issues Reported for 2.6 area:Listeners labels May 7, 2024

eladkal commented May 7, 2024

Not sure we can investigate without a reproducible example.
