openlineage, celery: scheduler hanging when emitting lots of OL events via HTTP #39232

Open

JDarDagran opened this issue Apr 24, 2024 · 2 comments · May be fixed by #39235
Comments

@JDarDagran
Contributor

Apache Airflow Provider(s)

celery, openlineage

Versions of Apache Airflow Providers

No response

Apache Airflow version

2.9.0 but happens on 2.7+ too

Operating System

Darwin MacBook-Pro.local 23.4.0 Darwin Kernel Version 23.4.0: Fri Mar 15 00:10:42 PDT 2024; root:xnu-10063.101.17~1/RELEASE_ARM64_T6000 arm64

Deployment

Other

Deployment details

The issue can be reproduced in all environments, both locally with breeze and in cloud deployments, e.g. Astro Cloud.

What happened

The OpenLineage listener hooks into DagRun state changes via on_dag_run_running/failed/success. When OL events are emitted via HTTP at a large scale, the scheduler hangs and needs a restart. The issue appears to happen only with CeleryExecutor.

This could not be reproduced when disabling OpenLineage (with [openlineage] disabled = True) or with any other OpenLineage transport that does not use HTTP. I also experimented with raw urllib3 and httpx as alternatives to requests. All of these experiments produced the same bug, resulting in the scheduler hanging.
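For illustration only, below is a minimal sketch of a listener plugin that makes an HTTP call from the DagRun state-change hooks. This is not the actual OpenLineage listener code; the file name, class names and URL are made up, it just shows the pattern of an HTTP request being made from the scheduler process on every DagRun state change:

# my_listener.py - hypothetical minimal listener plugin, for illustration only
from __future__ import annotations

import requests

from airflow.listeners import hookimpl
from airflow.plugins_manager import AirflowPlugin


class HttpEmittingListener:
    @hookimpl
    def on_dag_run_running(self, dag_run, msg):
        # The real OpenLineage listener builds and emits an OL RunEvent here;
        # the relevant part is only that an HTTP request happens inside the
        # scheduler process when a DagRun changes state.
        requests.post(
            "http://host.docker.internal:5000/api/v1/lineage",  # assumed endpoint
            json={"run_id": dag_run.run_id, "state": "running"},
            timeout=5,
        )


class MyListenerPlugin(AirflowPlugin):
    name = "my_listener_plugin"
    listeners = [HttpEmittingListener()]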

What you think should happen instead

When reproducing with a local breeze setup with CeleryExecutor, the following strange behaviour can be observed:

htop output: [screenshot]

lsof | grep CLOSE_WAIT output: [screenshot]

Stack trace from the main loop of the scheduler:

Traceback for thread 152 (airflow) [] (most recent call last):
    (Python) File "/usr/local/bin/airflow", line 8, in <module>
        sys.exit(main())
    (Python) File "/opt/airflow/airflow/__main__.py", line 58, in main
        args.func(args)
    (Python) File "/opt/airflow/airflow/cli/cli_config.py", line 49, in command
        return func(*args, **kwargs)
    (Python) File "/opt/airflow/airflow/utils/cli.py", line 115, in wrapper
        return f(*args, **kwargs)
    (Python) File "/opt/airflow/airflow/utils/providers_configuration_loader.py", line 55, in wrapped_function
        return func(*args, **kwargs)
    (Python) File "/opt/airflow/airflow/cli/commands/scheduler_command.py", line 58, in scheduler
        run_command_with_daemon_option(
    (Python) File "/opt/airflow/airflow/cli/commands/daemon_utils.py", line 85, in run_command_with_daemon_option
        callback()
    (Python) File "/opt/airflow/airflow/cli/commands/scheduler_command.py", line 61, in <lambda>
        callback=lambda: _run_scheduler_job(args),
    (Python) File "/opt/airflow/airflow/cli/commands/scheduler_command.py", line 49, in _run_scheduler_job
        run_job(job=job_runner.job, execute_callable=job_runner._execute)
    (Python) File "/opt/airflow/airflow/utils/session.py", line 84, in wrapper
        return func(*args, session=session, **kwargs)
    (Python) File "/opt/airflow/airflow/jobs/job.py", line 410, in run_job
        return execute_job(job, execute_callable=execute_callable)
    (Python) File "/opt/airflow/airflow/jobs/job.py", line 439, in execute_job
        ret = execute_callable()
    (Python) File "/opt/airflow/airflow/jobs/scheduler_job_runner.py", line 847, in _execute
        self._run_scheduler_loop()
    (Python) File "/opt/airflow/airflow/jobs/scheduler_job_runner.py", line 982, in _run_scheduler_loop
        self.job.executor.heartbeat()
    (Python) File "/opt/airflow/airflow/executors/base_executor.py", line 240, in heartbeat
        self.trigger_tasks(open_slots)
    (Python) File "/opt/airflow/airflow/executors/base_executor.py", line 298, in trigger_tasks
        self._process_tasks(task_tuples)
    (Python) File "/opt/airflow/airflow/providers/celery/executors/celery_executor.py", line 292, in _process_tasks
        key_and_async_results = self._send_tasks_to_celery(task_tuples_to_send)
    (Python) File "/opt/airflow/airflow/providers/celery/executors/celery_executor.py", line 342, in _send_tasks_to_celery
        key_and_async_results = list(
    (Python) File "/usr/local/lib/python3.8/concurrent/futures/process.py", line 484, in _chain_from_iterable_of_lists
        for element in iterable:
    (Python) File "/usr/local/lib/python3.8/concurrent/futures/_base.py", line 619, in result_iterator
        yield fs.pop().result()
    (Python) File "/usr/local/lib/python3.8/concurrent/futures/_base.py", line 439, in result
        self._condition.wait(timeout)
    (Python) File "/usr/local/lib/python3.8/threading.py", line 302, in wait
        waiter.acquire()

Stack trace from one of the spawned scheduler child processes:

Traceback for thread 6363 (airflow) [] (most recent call last):
    (Python) File "/usr/local/bin/airflow", line 8, in <module>
        sys.exit(main())
    (Python) File "/opt/airflow/airflow/__main__.py", line 58, in main
        args.func(args)
    (Python) File "/opt/airflow/airflow/cli/cli_config.py", line 49, in command
        return func(*args, **kwargs)
    (Python) File "/opt/airflow/airflow/utils/cli.py", line 115, in wrapper
        return f(*args, **kwargs)
    (Python) File "/opt/airflow/airflow/utils/providers_configuration_loader.py", line 55, in wrapped_function
        return func(*args, **kwargs)
    (Python) File "/opt/airflow/airflow/cli/commands/scheduler_command.py", line 58, in scheduler
        run_command_with_daemon_option(
    (Python) File "/opt/airflow/airflow/cli/commands/daemon_utils.py", line 85, in run_command_with_daemon_option
        callback()
    (Python) File "/opt/airflow/airflow/cli/commands/scheduler_command.py", line 61, in <lambda>
        callback=lambda: _run_scheduler_job(args),
    (Python) File "/opt/airflow/airflow/cli/commands/scheduler_command.py", line 49, in _run_scheduler_job
        run_job(job=job_runner.job, execute_callable=job_runner._execute)
    (Python) File "/opt/airflow/airflow/utils/session.py", line 84, in wrapper
        return func(*args, session=session, **kwargs)
    (Python) File "/opt/airflow/airflow/jobs/job.py", line 410, in run_job
        return execute_job(job, execute_callable=execute_callable)
    (Python) File "/opt/airflow/airflow/jobs/job.py", line 439, in execute_job
        ret = execute_callable()
    (Python) File "/opt/airflow/airflow/jobs/scheduler_job_runner.py", line 847, in _execute
        self._run_scheduler_loop()
    (Python) File "/opt/airflow/airflow/jobs/scheduler_job_runner.py", line 982, in _run_scheduler_loop
        self.job.executor.heartbeat()
    (Python) File "/opt/airflow/airflow/executors/base_executor.py", line 240, in heartbeat
        self.trigger_tasks(open_slots)
    (Python) File "/opt/airflow/airflow/executors/base_executor.py", line 298, in trigger_tasks
        self._process_tasks(task_tuples)
    (Python) File "/opt/airflow/airflow/providers/celery/executors/celery_executor.py", line 292, in _process_tasks
        key_and_async_results = self._send_tasks_to_celery(task_tuples_to_send)
    (Python) File "/opt/airflow/airflow/providers/celery/executors/celery_executor.py", line 343, in _send_tasks_to_celery
        send_pool.map(send_task_to_executor, task_tuples_to_send, chunksize=chunksize)
    (Python) File "/usr/local/lib/python3.8/concurrent/futures/process.py", line 674, in map
        results = super().map(partial(_process_chunk, fn),
    (Python) File "/usr/local/lib/python3.8/concurrent/futures/_base.py", line 608, in map
        fs = [self.submit(fn, *args) for args in zip(*iterables)]
    (Python) File "/usr/local/lib/python3.8/concurrent/futures/_base.py", line 608, in <listcomp>
        fs = [self.submit(fn, *args) for args in zip(*iterables)]
    (Python) File "/usr/local/lib/python3.8/concurrent/futures/process.py", line 645, in submit
        self._start_queue_management_thread()
    (Python) File "/usr/local/lib/python3.8/concurrent/futures/process.py", line 584, in _start_queue_management_thread
        self._adjust_process_count()
    (Python) File "/usr/local/lib/python3.8/concurrent/futures/process.py", line 608, in _adjust_process_count
        p.start()
    (Python) File "/usr/local/lib/python3.8/multiprocessing/process.py", line 121, in start
        self._popen = self._Popen(self)
    (Python) File "/usr/local/lib/python3.8/multiprocessing/context.py", line 277, in _Popen
        return Popen(process_obj)
    (Python) File "/usr/local/lib/python3.8/multiprocessing/popen_fork.py", line 19, in __init__
        self._launch(process_obj)
    (Python) File "/usr/local/lib/python3.8/multiprocessing/popen_fork.py", line 75, in _launch
        code = process_obj._bootstrap(parent_sentinel=child_r)
    (Python) File "/usr/local/lib/python3.8/multiprocessing/process.py", line 315, in _bootstrap
        self.run()
    (Python) File "/usr/local/lib/python3.8/multiprocessing/process.py", line 108, in run
        self._target(*self._args, **self._kwargs)
    (Python) File "/usr/local/lib/python3.8/concurrent/futures/process.py", line 233, in _process_worker
        call_item = call_queue.get(block=True)
    (Python) File "/usr/local/lib/python3.8/multiprocessing/queues.py", line 97, in get
        res = self._recv_bytes()
    (Python) File "/usr/local/lib/python3.8/multiprocessing/connection.py", line 216, in recv_bytes
        buf = self._recv_bytes(maxlength)
    (Python) File "/usr/local/lib/python3.8/multiprocessing/connection.py", line 414, in _recv_bytes
        buf = self._recv(4)
    (Python) File "/usr/local/lib/python3.8/multiprocessing/connection.py", line 379, in _recv
        chunk = read(handle, remaining)

This points to the fact that the bug probably has something to do with file descriptors not being closed properly. I did not follow the whole logic of how the scheduler spawns child processes, but before the scheduler gets stuck it spawns some child processes that close properly and do not cause the issue.
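To make the CLOSE_WAIT observation above easier to repeat, here is a small helper sketch (assuming psutil is installed; the script name and polling interval are arbitrary) that counts sockets stuck in CLOSE_WAIT for the scheduler process and its children:

# watch_close_wait.py - hypothetical helper, not part of Airflow
from __future__ import annotations

import sys
import time

import psutil


def count_close_wait(pid: int) -> int:
    """Count inet sockets in CLOSE_WAIT for the given process and all of its children."""
    procs = [psutil.Process(pid), *psutil.Process(pid).children(recursive=True)]
    total = 0
    for proc in procs:
        try:
            total += sum(
                1 for conn in proc.connections(kind="inet")
                if conn.status == psutil.CONN_CLOSE_WAIT
            )
        except (psutil.NoSuchProcess, psutil.AccessDenied):
            continue
    return total


if __name__ == "__main__":
    scheduler_pid = int(sys.argv[1])  # pass the scheduler PID as the first argument
    while True:
        print(f"CLOSE_WAIT sockets (scheduler + children): {count_close_wait(scheduler_pid)}")
        time.sleep(5)

Run it as python watch_close_wait.py <scheduler_pid> while triggering the DAGs described below.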

How to reproduce

  1. Set up breeze.
  2. Create a dynamic_dag.jinja2 template:
from __future__ import annotations

from pendulum import datetime

from airflow.decorators import dag, task

@task
def return_string(path):
    return path

@dag(
    start_date=datetime(2023, 1, 1),
    max_active_runs={{ max_active_runs }},
    schedule=None,
    catchup=False,
)
def dynamic_dag_{{ number }}():
    return_string("whatever")

dynamic_dag_{{ number }}()
  3. Create DAGs in the files/dags Airflow directory, e.g. with the following script:
from __future__ import annotations

import pathlib

import jinja2

DAGS_LOCATION = pathlib.Path.home() / "airflow" / "files" / "dags" # change if airflow is another location
TEMPLATE_LOCATION = pathlib.Path(__file__).resolve().parent / "dynamic_dag.jinja2" # this should point to template file
NO_OF_DAGS = 10
MAX_ACTIVE_RUNS = 100

if __name__ == '__main__':
    jinja_env = jinja2.Environment()
    template = jinja_env.from_string(TEMPLATE_LOCATION.read_text())
    for i in range(NO_OF_DAGS):
        with open(DAGS_LOCATION / f"dynamic_dag_{i}.py", 'w') as f:
            f.write(template.render(number=i, max_active_runs=MAX_ACTIVE_RUNS))
  4. Create a trigger_dag DAG in files/dags:
from __future__ import annotations

from datetime import datetime, timedelta

from airflow import DAG
from airflow.api.common.trigger_dag import trigger_dag
from airflow.models.dag import DagModel
from airflow.models.dagrun import DagRun
from airflow.operators.empty import EmptyOperator
from airflow.operators.python import PythonOperator
from airflow.utils import timezone
from airflow.utils.types import DagRunType

default_args = {
    "owner": "airflow",
    "start_date": datetime(2023, 1, 1),
    "depends_on_past": False,
}

NO_OF_DAGS = 10
NO_OF_TRIGGERS_PER_DAG = 100

def trigger(**kwargs):
    dag_runs = []
    def trigger_dags(x):
        for i in range(NO_OF_DAGS):
            trigger_dag_id = f"dynamic_dag_{i}"
            parsed_execution_date = timezone.utcnow()
            dag_model = DagModel.get_dagmodel(trigger_dag_id)
            if not dag_model:
                continue
            dag_model.set_is_paused(False)
            run_id = DagRun.generate_run_id(DagRunType.MANUAL, parsed_execution_date)
            dag_run = trigger_dag(
                dag_id=trigger_dag_id,
                run_id=run_id + str(x),
                execution_date=parsed_execution_date,
                replace_microseconds=False,
            )
            dag_runs.append(dag_run)
    for i in range(NO_OF_TRIGGERS_PER_DAG):
        trigger_dags(i)

# Main DAG that will trigger DAGs
with DAG(
    "trigger_dag",
    default_args=default_args,
    schedule_interval=timedelta(days=1),  # Adjust as needed
    catchup=False,
) as dag:
    start = EmptyOperator(task_id="start")

    trigger_dags = PythonOperator(task_id="trigger_dags", python_callable=trigger)

    start >> trigger_dags
  5. Host a local server (skip this step if you want to use another one, e.g. httpbin.org):

    1. Flask server in app.py:

      from __future__ import annotations
      
      import sys
      import time
      
      from flask import Flask, Response, request
      
      app = Flask(__name__)
      
      @app.route('/', defaults={'path': ''})
      @app.route('/<path:path>', methods=['GET', 'HEAD', 'POST', 'PUT', 'DELETE'])
      def catch_all(path):
          # time.sleep(4) # uncomment to delay response
          if request.is_json:
              print(request.json, file=sys.stderr)
          
          return Response('You want path: %s' % path, status=502) # change the status if needed
      
      if __name__ == '__main__':
          app.run()
      
    2. requirements.txt

      flask==2.3.2
      python-dateutil==2.8.2
      gunicorn
      
    3. Dockerfile

      FROM python:3-alpine AS builder
       
      WORKDIR /app
       
      RUN python3 -m venv venv
      ENV VIRTUAL_ENV=/app/venv
      ENV PATH="$VIRTUAL_ENV/bin:$PATH"
       
      COPY requirements.txt .
      RUN pip install -r requirements.txt
       
      # Stage 2
      FROM python:3-alpine AS runner
       
      WORKDIR /app
       
      COPY --from=builder /app/venv venv
      COPY app.py app.py
       
      ENV VIRTUAL_ENV=/app/venv
      ENV PATH="$VIRTUAL_ENV/bin:$PATH"
      ENV FLASK_APP=app/app.py
       
      EXPOSE 8080
       
      CMD ["gunicorn", "--bind" , ":8080", "--workers", "2", "app:app"]
      
    4. Build image with: docker build . -t test-server

    5. Run with docker run -eFLASK_RUN_PORT=5000 -eFLASK_APP=app.py --rm -p 5000:8080 test-server

  6. In files/airflow-breeze-config/variables.env put the following entries:

    AIRFLOW__CORE__LAZY_DISCOVER_PROVIDERS=False
    OPENLINEAGE_URL=http://host.docker.internal:5000
    AIRFLOW__CORE__LOGGING_LEVEL=DEBUG # if you want DEBUG logs
    

An example for httpbin.org would be to set the following instead of OPENLINEAGE_URL:

AIRFLOW__OPENLINEAGE__TRANSPORT={"type":"http","url":"https://httpbin.org/status/401","endpoint":""}
  7. Run breeze with breeze start-airflow --executor CeleryExecutor and trigger trigger_dag. Wait until the scheduler hangs and DAGs get stuck in the queued/running state.

Anything else

The issue is reproducible with a considerable number of concurrent DagRuns (100+ in my case; it may depend on the setup). I also ran some tests with raw threading only and/or without the OpenLineage dependencies at all (sending requests directly from the on_dag_run_... hooks).

I am willing to submit a PR that potentially fixes the bug - the change is to use a ProcessPoolExecutor instead of a ThreadPoolExecutor. This seems to fix the issue completely without breaking anything else.
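To illustrate the direction of that change (a sketch only, under the assumption that events are currently handed off to a thread pool inside the listener; the names below are made up and this is not the actual diff in #39235):

# Sketch of the idea only - not the actual patch in #39235.
from __future__ import annotations

from concurrent.futures import ProcessPoolExecutor  # instead of ThreadPoolExecutor

import requests


def emit_event(payload: dict) -> None:
    # In the real listener this would build and emit an OpenLineage RunEvent;
    # here it is just an HTTP POST to a made-up endpoint.
    requests.post("http://host.docker.internal:5000/api/v1/lineage", json=payload, timeout=5)


class ExampleEmitter:
    def __init__(self) -> None:
        # Suspicion only, not a confirmed root cause: emitting from separate
        # worker processes keeps the HTTP sockets (and their file descriptors)
        # out of the scheduler process, so the fork done while sending tasks
        # to Celery does not inherit half-open connections.
        self._executor = ProcessPoolExecutor(max_workers=2)

    def on_dag_run_state_change(self, payload: dict) -> None:
        # Callables submitted to a ProcessPoolExecutor must be picklable,
        # hence emit_event is a module-level function.
        self._executor.submit(emit_event, payload)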

I could not find the root cause eventually. My gut says it has something to do with threading in Python plus some Celery dependency on file descriptors that produces the bug.

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

@JDarDagran added the area:providers, kind:bug and needs-triage labels on Apr 24, 2024
@RNHTTR
Collaborator

RNHTTR commented Apr 24, 2024

@mobuchowski can you take a look?

@mobuchowski
Contributor

Hey @RNHTTR - this is the result of our internal investigation. We think someone knowledgeable with Celery might be able to help - if not, then we'd love to go with #39235 as this solves the issue in our tests.

@RNHTTR added the provider:celery label and removed the needs-triage label on Apr 25, 2024