Fix CeleryKubernetesExecutor (#16700)
closes #16326

Currently, when running Celery tasks with ``CeleryKubernetesExecutor``, we see the following error. It occurs because ``BaseJob`` (via ``LocalTaskJob``) needlessly instantiates a ``KubernetesExecutor``, which in turn tries to create a ``multiprocessing`` Manager; that fails because Celery's forked pool workers are daemonic processes, which may not spawn children.

```
[2021-06-29 00:23:45,301: ERROR/ForkPoolWorker-16] Failed to execute task daemonic processes are not allowed to have children.
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/executors/celery_executor.py", line 116, in _execute_in_fork
    args.func(args)
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/cli/cli_parser.py", line 48, in command
    return func(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/utils/cli.py", line 91, in wrapper
    return f(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/cli/commands/task_command.py", line 237, in task_run
    _run_task_by_selected_method(args, dag, ti)
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/cli/commands/task_command.py", line 64, in _run_task_by_selected_method
    _run_task_by_local_task_job(args, ti)
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/cli/commands/task_command.py", line 117, in _run_task_by_local_task_job
    pool=args.pool,
  File "<string>", line 4, in __init__
  File "/home/airflow/.local/lib/python3.6/site-packages/sqlalchemy/orm/state.py", line 433, in _initialize_instance
    manager.dispatch.init_failure(self, args, kwargs)
  File "/home/airflow/.local/lib/python3.6/site-packages/sqlalchemy/util/langhelpers.py", line 70, in __exit__
    with_traceback=exc_tb,
  File "/home/airflow/.local/lib/python3.6/site-packages/sqlalchemy/util/compat.py", line 182, in raise_
    raise exception
  File "/home/airflow/.local/lib/python3.6/site-packages/sqlalchemy/orm/state.py", line 430, in _initialize_instance
    return manager.original_init(*mixed[1:], **kwargs)
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/jobs/local_task_job.py", line 76, in __init__
    super().__init__(*args, **kwargs)
  File "<string>", line 6, in __init__
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/jobs/base_job.py", line 97, in __init__
    self.executor = executor or ExecutorLoader.get_default_executor()
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/executors/executor_loader.py", line 62, in get_default_executor
    cls._default_executor = cls.load_executor(executor_name)
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/executors/executor_loader.py", line 79, in load_executor
    return cls.__load_celery_kubernetes_executor()
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/executors/executor_loader.py", line 116, in __load_celery_kubernetes_executor
    kubernetes_executor = import_string(cls.executors[KUBERNETES_EXECUTOR])()
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/executors/kubernetes_executor.py", line 421, in __init__
    self._manager = multiprocessing.Manager()
  File "/usr/local/lib/python3.6/multiprocessing/context.py", line 56, in Manager
    m.start()
  File "/usr/local/lib/python3.6/multiprocessing/managers.py", line 513, in start
    self._process.start()
  File "/usr/local/lib/python3.6/multiprocessing/process.py", line 103, in start
    'daemonic processes are not allowed to have children'
AssertionError: daemonic processes are not allowed to have children
```
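
For context, the failure is a general Python ``multiprocessing`` restriction rather than anything Airflow-specific. A minimal sketch (not from the commit) that reproduces it:

```python
import multiprocessing


def init_like_kubernetes_executor():
    # KubernetesExecutor.__init__ effectively did this; Manager() must
    # start a server child process, which daemonic processes may not do.
    multiprocessing.Manager()


if __name__ == '__main__':
    # Celery's ForkPoolWorker processes are daemonic, like this one.
    worker = multiprocessing.Process(target=init_like_kubernetes_executor, daemon=True)
    worker.start()
    worker.join()
    # The child dies with:
    #   AssertionError: daemonic processes are not allowed to have children
    print(worker.exitcode)  # non-zero, since the worker crashed
```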

We don't need to instantiate an executor when running ``LocalTaskJob``, as the executor is never used there. The fix below therefore defers executor creation to a lazily evaluated ``cached_property``, so it only happens on first access.
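
A minimal sketch of that lazy-initialization pattern with a stand-in class (the names here are illustrative, not the real ``BaseJob``); ``cached_property`` is ``functools.cached_property`` on Python 3.8+, which Airflow backports via ``airflow.compat.functools`` for older interpreters:

```python
from functools import cached_property


class Job:
    """Stand-in for BaseJob, illustrating the lazily built executor."""

    def __init__(self, executor=None):
        if executor:
            # An explicitly passed executor still wins: the instance
            # attribute shadows the (non-data) cached_property descriptor.
            self.executor = executor

    @cached_property
    def executor(self):
        # Stand-in for ExecutorLoader.get_default_executor(); runs only
        # on first attribute access, and the result is cached.
        print('building executor...')
        return object()


job = Job()            # nothing built yet, so safe inside a daemonic worker
first = job.executor   # 'building executor...' printed once
second = job.executor  # cached; no rebuild
assert first is second
```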

(cherry picked from commit 7857a9b)
kaxil authored and ashb committed Jul 7, 2021
1 parent 5b3b7d4 commit 06f8022
Showing 2 changed files with 16 additions and 3 deletions.
12 changes: 10 additions & 2 deletions airflow/jobs/base_job.py

```diff
@@ -25,6 +25,7 @@
 from sqlalchemy.orm import backref, foreign, relationship
 from sqlalchemy.orm.session import make_transient
 
+from airflow.compat.functools import cached_property
 from airflow.configuration import conf
 from airflow.exceptions import AirflowException
 from airflow.executors.executor_loader import ExecutorLoader
@@ -94,8 +95,11 @@ class BaseJob(Base, LoggingMixin):
 
     def __init__(self, executor=None, heartrate=None, *args, **kwargs):
         self.hostname = get_hostname()
-        self.executor = executor or ExecutorLoader.get_default_executor()
-        self.executor_class = self.executor.__class__.__name__
+        if executor:
+            self.executor = executor
+            self.executor_class = executor.__class__.__name__
+        else:
+            self.executor_class = conf.get('core', 'EXECUTOR')
         self.start_date = timezone.utcnow()
         self.latest_heartbeat = timezone.utcnow()
         if heartrate is not None:
@@ -104,6 +108,10 @@ def __init__(self, executor=None, heartrate=None, *args, **kwargs):
         self.max_tis_per_query = conf.getint('scheduler', 'max_tis_per_query')
         super().__init__(*args, **kwargs)
 
+    @cached_property
+    def executor(self):
+        return ExecutorLoader.get_default_executor()
+
     @classmethod
     @provide_session
     def most_recent_job(cls, session=None) -> Optional['BaseJob']:
```
7 changes: 6 additions & 1 deletion tests/jobs/test_base_job.py

```diff
@@ -118,7 +118,12 @@ def test_heartbeat_failed(self, mock_create_session):
 
         assert job.latest_heartbeat == when, "attribute not updated when heartbeat fails"
 
-    @conf_vars({('scheduler', 'max_tis_per_query'): '100'})
+    @conf_vars(
+        {
+            ('scheduler', 'max_tis_per_query'): '100',
+            ('core', 'executor'): 'SequentialExecutor',
+        }
+    )
     @patch('airflow.jobs.base_job.ExecutorLoader.get_default_executor')
     @patch('airflow.jobs.base_job.get_hostname')
     @patch('airflow.jobs.base_job.getuser')
```
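
A note on the test change (my reading, not stated in the commit message): because ``BaseJob.__init__`` now reads ``conf.get('core', 'EXECUTOR')`` to populate ``executor_class``, the test has to pin a concrete executor name in config rather than relying only on the mocked ``ExecutorLoader.get_default_executor``. A sketch of the resulting behavior, assuming Airflow's ``conf_vars`` helper from ``tests.test_utils.config``:

```python
# Sketch, not part of the commit: conf_vars temporarily overrides Airflow
# config keys for the duration of the decorated test function.
from tests.test_utils.config import conf_vars

from airflow.jobs.base_job import BaseJob


@conf_vars({('core', 'executor'): 'SequentialExecutor'})
def test_executor_class_read_from_config():
    job = BaseJob()
    # executor_class mirrors the configured name; no executor object is
    # created until job.executor is first accessed.
    assert job.executor_class == 'SequentialExecutor'
```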
