From 06f8022b37f121b17512ac58ef55132b22252b18 Mon Sep 17 00:00:00 2001
From: Kaxil Naik
Date: Tue, 29 Jun 2021 23:39:34 +0100
Subject: [PATCH] Fix ``CeleryKubernetesExecutor`` (#16700)

closes https://github.com/apache/airflow/issues/16326

Currently, when running Celery tasks with ``CeleryKubernetesExecutor``, we see
the following error. It occurs because ``BaseJob`` (via ``LocalTaskJob``)
needlessly instantiates a ``KubernetesExecutor``, which in turn tries to create
a multiprocessing process/``Manager`` and fails.

```
[2021-06-29 00:23:45,301: ERROR/ForkPoolWorker-16] Failed to execute task daemonic processes are not allowed to have children.
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/executors/celery_executor.py", line 116, in _execute_in_fork
    args.func(args)
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/cli/cli_parser.py", line 48, in command
    return func(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/utils/cli.py", line 91, in wrapper
    return f(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/cli/commands/task_command.py", line 237, in task_run
    _run_task_by_selected_method(args, dag, ti)
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/cli/commands/task_command.py", line 64, in _run_task_by_selected_method
    _run_task_by_local_task_job(args, ti)
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/cli/commands/task_command.py", line 117, in _run_task_by_local_task_job
    pool=args.pool,
  File "", line 4, in __init__
  File "/home/airflow/.local/lib/python3.6/site-packages/sqlalchemy/orm/state.py", line 433, in _initialize_instance
    manager.dispatch.init_failure(self, args, kwargs)
  File "/home/airflow/.local/lib/python3.6/site-packages/sqlalchemy/util/langhelpers.py", line 70, in __exit__
    with_traceback=exc_tb,
  File "/home/airflow/.local/lib/python3.6/site-packages/sqlalchemy/util/compat.py", line 182, in raise_
    raise exception
  File "/home/airflow/.local/lib/python3.6/site-packages/sqlalchemy/orm/state.py", line 430, in _initialize_instance
    return manager.original_init(*mixed[1:], **kwargs)
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/jobs/local_task_job.py", line 76, in __init__
    super().__init__(*args, **kwargs)
  File "", line 6, in __init__
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/jobs/base_job.py", line 97, in __init__
    self.executor = executor or ExecutorLoader.get_default_executor()
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/executors/executor_loader.py", line 62, in get_default_executor
    cls._default_executor = cls.load_executor(executor_name)
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/executors/executor_loader.py", line 79, in load_executor
    return cls.__load_celery_kubernetes_executor()
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/executors/executor_loader.py", line 116, in __load_celery_kubernetes_executor
    kubernetes_executor = import_string(cls.executors[KUBERNETES_EXECUTOR])()
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/executors/kubernetes_executor.py", line 421, in __init__
    self._manager = multiprocessing.Manager()
  File "/usr/local/lib/python3.6/multiprocessing/context.py", line 56, in Manager
    m.start()
  File "/usr/local/lib/python3.6/multiprocessing/managers.py", line 513, in start
    self._process.start()
  File "/usr/local/lib/python3.6/multiprocessing/process.py", line 103, in start
    'daemonic processes are not allowed to have children'
AssertionError: daemonic processes are not allowed to have children
```
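The failure is a generic ``multiprocessing`` restriction rather than anything Airflow-specific:
a daemonic process (which is what a Celery ``ForkPoolWorker`` is) may not start child processes,
and ``multiprocessing.Manager()`` has to start one. A minimal standalone sketch of that
restriction (illustrative only, not part of this patch; names are made up):

```
import multiprocessing


def create_manager():
    # Manager() starts a server child process; inside a daemonic worker this
    # trips the "daemonic processes are not allowed to have children"
    # assertion seen in the traceback above.
    try:
        multiprocessing.Manager()
    except AssertionError as err:
        print(f"Failed to create manager: {err}")


if __name__ == "__main__":
    # Mimic a Celery ForkPoolWorker by running the code in a daemonic process.
    worker = multiprocessing.Process(target=create_manager, daemon=True)
    worker.start()
    worker.join()
```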
We don't need to instantiate an executor when running ``LocalTaskJob``, as the
executor isn't used there.

(cherry picked from commit 7857a9bde2e189881f87fe4dc0cdce7503895c03)
---
 airflow/jobs/base_job.py    | 12 ++++++++++--
 tests/jobs/test_base_job.py |  7 ++++++-
 2 files changed, 16 insertions(+), 3 deletions(-)

diff --git a/airflow/jobs/base_job.py b/airflow/jobs/base_job.py
index 1837c27992963..4edb692251c6b 100644
--- a/airflow/jobs/base_job.py
+++ b/airflow/jobs/base_job.py
@@ -25,6 +25,7 @@
 from sqlalchemy.orm import backref, foreign, relationship
 from sqlalchemy.orm.session import make_transient
 
+from airflow.compat.functools import cached_property
 from airflow.configuration import conf
 from airflow.exceptions import AirflowException
 from airflow.executors.executor_loader import ExecutorLoader
@@ -94,8 +95,11 @@ class BaseJob(Base, LoggingMixin):
 
     def __init__(self, executor=None, heartrate=None, *args, **kwargs):
         self.hostname = get_hostname()
-        self.executor = executor or ExecutorLoader.get_default_executor()
-        self.executor_class = self.executor.__class__.__name__
+        if executor:
+            self.executor = executor
+            self.executor_class = executor.__class__.__name__
+        else:
+            self.executor_class = conf.get('core', 'EXECUTOR')
         self.start_date = timezone.utcnow()
         self.latest_heartbeat = timezone.utcnow()
         if heartrate is not None:
@@ -104,6 +108,10 @@ def __init__(self, executor=None, heartrate=None, *args, **kwargs):
         self.max_tis_per_query = conf.getint('scheduler', 'max_tis_per_query')
         super().__init__(*args, **kwargs)
 
+    @cached_property
+    def executor(self):
+        return ExecutorLoader.get_default_executor()
+
     @classmethod
     @provide_session
     def most_recent_job(cls, session=None) -> Optional['BaseJob']:
diff --git a/tests/jobs/test_base_job.py b/tests/jobs/test_base_job.py
index 093386b3021cd..93f26309700e0 100644
--- a/tests/jobs/test_base_job.py
+++ b/tests/jobs/test_base_job.py
@@ -118,7 +118,12 @@ def test_heartbeat_failed(self, mock_create_session):
 
         assert job.latest_heartbeat == when, "attribute not updated when heartbeat fails"
 
-    @conf_vars({('scheduler', 'max_tis_per_query'): '100'})
+    @conf_vars(
+        {
+            ('scheduler', 'max_tis_per_query'): '100',
+            ('core', 'executor'): 'SequentialExecutor',
+        }
+    )
     @patch('airflow.jobs.base_job.ExecutorLoader.get_default_executor')
     @patch('airflow.jobs.base_job.get_hostname')
     @patch('airflow.jobs.base_job.getuser')
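Note (not part of the patch): the change works because ``cached_property`` is a
non-data descriptor, so assigning ``self.executor`` directly in ``__init__``
simply shadows the property, and the default executor is otherwise only built on
first attribute access. A rough sketch of that pattern with a hypothetical
``Job`` class and placeholder values:

```
from functools import cached_property  # the patch imports this via airflow.compat.functools


class Job:
    """Hypothetical stand-in for BaseJob, only to show the lazy-loading pattern."""

    def __init__(self, executor=None):
        if executor:
            # An explicit executor shadows the cached_property below.
            self.executor = executor
            self.executor_class = executor.__class__.__name__
        else:
            # Record only the configured name; nothing is instantiated yet.
            self.executor_class = "CeleryKubernetesExecutor"  # stand-in for conf.get('core', 'EXECUTOR')

    @cached_property
    def executor(self):
        # Built and cached only when first accessed, e.g. by the scheduler,
        # never inside a LocalTaskJob running in a Celery worker.
        print("loading default executor")
        return object()  # stand-in for ExecutorLoader.get_default_executor()


job = Job()
print(job.executor_class)  # prints the class name; no executor has been created yet
_ = job.executor           # first access triggers "loading default executor"
```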