Fix CeleryKubernetesExecutor #16700

Merged
merged 1 commit into from Jun 29, 2021

Conversation

Member

@kaxil kaxil commented Jun 29, 2021

closes #16326

Currently, Celery tasks run under CeleryKubernetesExecutor fail with the following error.
The error occurs because BaseJob (via LocalTaskJob) needlessly instantiates a
KubernetesExecutor, which in turn tries to start a multiprocessing Manager process.
That fails because the Celery worker process is daemonic.

[2021-06-29 00:23:45,301: ERROR/ForkPoolWorker-16] Failed to execute task daemonic processes are not allowed to have children.
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/executors/celery_executor.py", line 116, in _execute_in_fork
    args.func(args)
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/cli/cli_parser.py", line 48, in command
    return func(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/utils/cli.py", line 91, in wrapper
    return f(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/cli/commands/task_command.py", line 237, in task_run
    _run_task_by_selected_method(args, dag, ti)
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/cli/commands/task_command.py", line 64, in _run_task_by_selected_method
    _run_task_by_local_task_job(args, ti)
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/cli/commands/task_command.py", line 117, in _run_task_by_local_task_job
    pool=args.pool,
  File "<string>", line 4, in __init__
  File "/home/airflow/.local/lib/python3.6/site-packages/sqlalchemy/orm/state.py", line 433, in _initialize_instance
    manager.dispatch.init_failure(self, args, kwargs)
  File "/home/airflow/.local/lib/python3.6/site-packages/sqlalchemy/util/langhelpers.py", line 70, in __exit__
    with_traceback=exc_tb,
  File "/home/airflow/.local/lib/python3.6/site-packages/sqlalchemy/util/compat.py", line 182, in raise_
    raise exception
  File "/home/airflow/.local/lib/python3.6/site-packages/sqlalchemy/orm/state.py", line 430, in _initialize_instance
    return manager.original_init(*mixed[1:], **kwargs)
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/jobs/local_task_job.py", line 76, in __init__
    super().__init__(*args, **kwargs)
  File "<string>", line 6, in __init__
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/jobs/base_job.py", line 97, in __init__
    self.executor = executor or ExecutorLoader.get_default_executor()
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/executors/executor_loader.py", line 62, in get_default_executor
    cls._default_executor = cls.load_executor(executor_name)
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/executors/executor_loader.py", line 79, in load_executor
    return cls.__load_celery_kubernetes_executor()
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/executors/executor_loader.py", line 116, in __load_celery_kubernetes_executor
    kubernetes_executor = import_string(cls.executors[KUBERNETES_EXECUTOR])()
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/executors/kubernetes_executor.py", line 421, in __init__
    self._manager = multiprocessing.Manager()
  File "/usr/local/lib/python3.6/multiprocessing/context.py", line 56, in Manager
    m.start()
  File "/usr/local/lib/python3.6/multiprocessing/managers.py", line 513, in start
    self._process.start()
  File "/usr/local/lib/python3.6/multiprocessing/process.py", line 103, in start
    'daemonic processes are not allowed to have children'
AssertionError: daemonic processes are not allowed to have children

We don't need to instantiate an executor when running LocalTaskJob, as the executor isn't used in it.
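
The failure mode described above can be reproduced without Airflow or Celery. The following is a minimal sketch, assuming the "fork" start method is available (Linux/macOS); the helper names `start_manager` and `run_child` are hypothetical and exist only for this illustration. It shows that a daemonic process cannot start a `multiprocessing.Manager`, while a non-daemonic one can.

```python
import multiprocessing

# Assumption: the "fork" start method keeps this sketch self-contained.
CTX = multiprocessing.get_context("fork")


def start_manager(conn):
    """Try to start a multiprocessing Manager and report the outcome to the parent."""
    try:
        manager = CTX.Manager()  # starts a helper server process
        manager.shutdown()
        conn.send("manager started")
    except AssertionError as exc:
        # Raised by Process.start() when the current process is daemonic.
        conn.send(f"AssertionError: {exc}")
    finally:
        conn.close()


def run_child(daemon):
    """Run start_manager in a child process and return the message it sent back."""
    parent_conn, child_conn = CTX.Pipe()
    child = CTX.Process(target=start_manager, args=(child_conn,), daemon=daemon)
    child.start()
    message = parent_conn.recv() if parent_conn.poll(30) else "no reply"
    child.join(30)
    return message
```

Calling `run_child(daemon=True)` should report the same assertion message as the traceback, whereas `run_child(daemon=False)` should report that the Manager started.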


@kaxil kaxil added this to the Airflow 2.1.2 milestone Jun 29, 2021
@kaxil kaxil requested review from ashb and XD-DENG as code owners June 29, 2021 00:35
@boring-cyborg boring-cyborg bot added the area:Scheduler Scheduler or dag parsing Issues label Jun 29, 2021
Comment on lines 97 to 98
if self.__class__.__name__ != "LocalTaskJob":
    self.executor = executor or ExecutorLoader.get_default_executor()
Member Author

The other option is to keep it None in BaseJob and override it in SchedulerJob and BackfillJob
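
A rough sketch of that alternative, for comparison. The classes below are simplified stand-ins (the real jobs are SQLAlchemy models with many more fields), and `StubExecutor` and `get_default_executor` are hypothetical placeholders for a real executor and for `ExecutorLoader.get_default_executor()`.

```python
class StubExecutor:
    """Hypothetical stand-in for a real executor."""


def get_default_executor():
    """Hypothetical stand-in for ExecutorLoader.get_default_executor()."""
    return StubExecutor()


class BaseJob:
    def __init__(self, executor=None):
        # The base class no longer loads a default executor.
        self.executor = executor


class LocalTaskJob(BaseJob):
    """Inherits the base behaviour, so executor stays None unless one is passed."""


class SchedulerJob(BaseJob):
    def __init__(self, executor=None):
        # Only jobs that actually need an executor load the default one.
        super().__init__(executor=executor or get_default_executor())


class BackfillJob(BaseJob):
    def __init__(self, executor=None):
        super().__init__(executor=executor or get_default_executor())
```

The trade-off in this variant is that every job type that needs an executor has to repeat the override.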

Member Author

a7f6efb vs 376ffd1

Thoughts?

Contributor

@dstandish dstandish Jun 29, 2021

another option would be to make executor a cached property and executor_class a property. this would simply get them out of __init__, which i think would be enough if they are not accessed in LocalTaskJob anyway (though if one wanted to be explicit, they could override those properties in LocalTaskJob to raise NotImplementedError)

@cached_property
def executor(self):
    return self._executor or  ExecutorLoader.get_default_executor()  # store init param `executor` as private attr `_executor` 

this has two benefits: one is you don't have to reference the subclass name here (as in your first approach), and the other is you don't have to make as many changes re executor in init params
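
To make the lazy-loading idea concrete, here is a minimal sketch with simplified stand-in classes. It assumes `functools.cached_property` (Python 3.8+); `StubExecutor`, `get_default_executor`, and `heartbeat_executor` are hypothetical names introduced only for this illustration, and the real `BaseJob` is a SQLAlchemy model, which this sketch ignores.

```python
from functools import cached_property


class StubExecutor:
    """Hypothetical stand-in for an executor that is costly to construct."""

    created = 0  # counts how many executors have been built

    def __init__(self):
        StubExecutor.created += 1


def get_default_executor():
    """Hypothetical stand-in for ExecutorLoader.get_default_executor()."""
    return StubExecutor()


class BaseJob:
    def __init__(self, executor=None):
        # Store the init param privately; nothing is instantiated here.
        self._executor = executor

    @cached_property
    def executor(self):
        # Runs on first access only; the result is cached on the instance.
        return self._executor or get_default_executor()


class LocalTaskJob(BaseJob):
    """Never accesses self.executor, so no executor is ever built."""


class SchedulerJob(BaseJob):
    def heartbeat_executor(self):
        # First access triggers the lazy load.
        return self.executor
```

With this shape, constructing a `LocalTaskJob` never builds an executor, while a `SchedulerJob` builds one on first access and reuses it afterwards.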

Contributor

i do also wonder.... why is it that we only get this issue with CKE? is there perhaps something about the way in which CKE is designed that causes this problem?

Member Author

> i do also wonder.... why is it that we only get this issue with CKE? is there perhaps something about the way in which CKE is designed that causes this problem?

The issue is that it tries to create an instance of KubernetesExecutor inside a CeleryExecutor worker process, and KubernetesExecutor creates a multiprocessing Manager & Queue in its __init__, which causes problems with Celery, as explained in celery/celery#4525

Contributor

@dstandish dstandish left a comment

Don't know this part of the code that well but looks like when celery runs a task it runs using LocalTaskJob and in this case we don't need an executor. I offer a third option above. Small notes in this review.

Looking forward to trying this executor out at long last :)

Member

@ashb ashb left a comment

As dstandish mentioned.

Member Author

kaxil commented Jun 29, 2021

Updated the PR with the cached_property approach.

@kaxil kaxil requested review from ashb and dstandish June 29, 2021 10:17
@github-actions github-actions bot added the full tests needed We need to run full set of tests for this PR to merge label Jun 29, 2021
@github-actions

The PR most likely needs to run full matrix of tests because it modifies parts of the core of Airflow. However, committers might decide to merge it quickly and take the risk. If they don't merge it quickly - please rebase it to the latest main at your convenience, or amend the last commit of the PR, and push it with --force-with-lease.

@kaxil kaxil removed the full tests needed We need to run full set of tests for this PR to merge label Jun 29, 2021
@kaxil kaxil merged commit 7857a9b into apache:main Jun 29, 2021
@kaxil kaxil deleted the fix-cke branch June 29, 2021 22:39
@ashb ashb removed this from the Airflow 2.1.2 milestone Jul 7, 2021
@ashb ashb added this to the Airflow 2.1.3 milestone Jul 7, 2021
@kaxil kaxil modified the milestones: Airflow 2.1.3, Airflow 2.1.2 Jul 7, 2021
ashb pushed a commit that referenced this pull request Jul 7, 2021
(cherry picked from commit 7857a9b)
@potiuk potiuk mentioned this pull request Jul 7, 2021
jhtimmins pushed a commit to astronomer/airflow that referenced this pull request Jul 9, 2021
(cherry picked from commit 7857a9b)
jhtimmins pushed a commit that referenced this pull request Jul 9, 2021
(cherry picked from commit 7857a9b)
kaxil added a commit to astronomer/airflow that referenced this pull request Jul 13, 2021
(cherry picked from commit 7857a9b)
(cherry picked from commit e264ef1)
Labels
area:Scheduler Scheduler or dag parsing Issues
Projects
None yet
Development

Successfully merging this pull request may close these issues.

CeleryKubernetesExecutor is broken in 2.1.0
4 participants