Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

openlineage: use ProcessPoolExecutor over ThreadPoolExecutor. #39235

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
7 changes: 7 additions & 0 deletions airflow/providers/openlineage/conf.py
Expand Up @@ -104,3 +104,10 @@ def is_disabled() -> bool:
# Check if both 'transport' and 'config_path' are not present and also
# if the legacy 'OPENLINEAGE_URL' environment variable is not set
return transport() == {} and config_path(True) == "" and os.getenv("OPENLINEAGE_URL", "") == ""


@cache
def dag_state_change_process_pool_size() -> int:
    """Return the ``[openlineage] dag_state_change_process_pool_size`` option.

    The value is read as an integer from the Airflow configuration, with
    ``1`` supplied as the fallback. Because of ``@cache`` the lookup result
    is memoized for subsequent calls.
    """
    return conf.getint(_CONFIG_SECTION, "dag_state_change_process_pool_size", fallback=1)
5 changes: 3 additions & 2 deletions airflow/providers/openlineage/plugins/listener.py
Expand Up @@ -17,14 +17,15 @@
from __future__ import annotations

import logging
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import ProcessPoolExecutor
from datetime import datetime
from typing import TYPE_CHECKING

from openlineage.client.serde import Serde

from airflow import __version__ as airflow_version
from airflow.listeners import hookimpl
from airflow.providers.openlineage import conf
from airflow.providers.openlineage.extractors import ExtractorManager
from airflow.providers.openlineage.plugins.adapter import OpenLineageAdapter, RunState
from airflow.providers.openlineage.utils.utils import (
Expand Down Expand Up @@ -281,7 +282,7 @@ def on_failure():
@property
def executor(self):
    """Return the executor used by this listener, creating it on first access.

    The pool is a ``ProcessPoolExecutor`` whose ``max_workers`` comes from
    ``conf.dag_state_change_process_pool_size()``. Once created it is stored
    on ``self._executor`` and returned unchanged on later accesses.
    """
    if not self._executor:
        # Create exactly one pool; it is cached on the instance for reuse.
        self._executor = ProcessPoolExecutor(max_workers=conf.dag_state_change_process_pool_size())
    return self._executor

@hookimpl
Expand Down
21 changes: 21 additions & 0 deletions tests/providers/openlineage/plugins/test_listener.py
Expand Up @@ -526,6 +526,27 @@ def test_listener_on_task_instance_success_do_not_call_adapter_when_disabled_ope
listener.adapter.complete_task.assert_not_called()


@pytest.mark.parametrize(
    "max_workers,expected",
    [
        (None, 1),
        ("8", 8),
    ],
)
@mock.patch("airflow.providers.openlineage.plugins.listener.ProcessPoolExecutor", autospec=True)
def test_listener_on_dag_run_state_changes_configure_process_pool_size(mock_executor, max_workers, expected):
    """Check that ``conf.dag_state_change_process_pool_size`` is applied to ``max_workers``.

    ``ProcessPoolExecutor`` is patched in the listener module, so no real
    worker processes are started; the test only inspects how the mock was
    constructed and that one task was submitted to it.
    """
    # The getter is memoized, so drop any value cached by an earlier test;
    # otherwise the option set through conf_vars below might never be read.
    conf.dag_state_change_process_pool_size.cache_clear()
    listener = OpenLineageListener()
    try:
        with conf_vars({("openlineage", "dag_state_change_process_pool_size"): max_workers}):
            listener.on_dag_run_running(mock.MagicMock(), None)
        mock_executor.assert_called_once_with(max_workers=expected)
        mock_executor.return_value.submit.assert_called_once()
    finally:
        # Leave no cached value behind for tests that run afterwards.
        conf.dag_state_change_process_pool_size.cache_clear()


class TestOpenLineageSelectiveEnable:
def setup_method(self):
self.dag = DAG(
Expand Down