CeleryKubernetesExecutor is broken in 2.1.0 #16326

Closed
dstandish opened this issue Jun 8, 2021 · 7 comments · Fixed by #16700
Labels
affected_version:2.1 (Issues Reported for 2.1), area:Scheduler (Scheduler or dag parsing Issues), kind:bug (This is clearly a bug), priority:high (High priority bug that should be patched quickly but does not require immediate new release)
Milestone
Airflow 2.1.3
Comments

@dstandish
Contributor

dstandish commented Jun 8, 2021

Tested with both chart 1.1.0rc1 (i.e. the current main branch) and 1.0.0.

In Airflow 2.1.0, the scheduler does not exit immediately (that was an issue in versions < 2.1.0), but all tasks fail like this:

[2021-06-08 15:30:17,167] {scheduler_job.py:1241} ERROR - Executor reports task instance <TaskInstance: sqoop_acquisition.terminate_job_flow 2021-06-08 13:00:00+00:00 [queued]> finished (failed) although the task says its queued. (Info: None) Was the task killed externally?
[2021-06-08 15:30:17,170] {scheduler_job.py:1241} ERROR - Executor reports task instance <TaskInstance: gsheets.state_mapping.to_s3 2021-06-08 14:00:00+00:00 [queued]> finished (failed) although the task says its queued. (Info: None) Was the task killed externally?
[2021-06-08 15:30:17,171] {scheduler_job.py:1241} ERROR - Executor reports task instance <TaskInstance: gsheets.app_event_taxonomy.to_s3 2021-06-08 14:00:00+00:00 [queued]> finished (failed) although the task says its queued. (Info: None) Was the task killed externally?
[2021-06-08 15:30:17,172] {scheduler_job.py:1241} ERROR - Executor reports task instance <TaskInstance: gsheets.strain_flavors.to_s3 2021-06-08 14:00:00+00:00 [queued]> finished (failed) although the task says its queued. (Info: None) Was the task killed externally?
[2021-06-08 15:30:19,053] {scheduler_job.py:1205} INFO - Executor reports execution of reporting_8hr.dev.cannalytics.feature_duration.sql execution_date=2021-06-08 07:00:00+00:00 exited with status failed for try_number 1
[2021-06-08 15:30:19,125] {scheduler_job.py:1241} ERROR - Executor reports task instance <TaskInstance: reporting_8hr.dev.cannalytics.feature_duration.sql 2021-06-08 07:00:00+00:00 [queued]> finished (failed) although the task says its queued. (Info: None) Was the task killed externally?
[2021-06-08 15:30:23,842] {dagrun.py:429} ERROR - Marking run <DagRun gsheets @ 2021-06-08 14:00:00+00:00: scheduled__2021-06-08T14:00:00+00:00, externally triggered: False> failed

@kaxil @jedcunningham do you see this when you run CKE? Any suggestions?

@dstandish dstandish added the kind:bug label Jun 8, 2021
@dstandish dstandish changed the title CeleryKubernetes executor is broken CeleryKubernetes executor is broken in 2.0.2 Jun 8, 2021
@dstandish dstandish changed the title CeleryKubernetes executor is broken in 2.0.2 CeleryKubernetes executor is broken in 2.0.2 and 2.1 Jun 8, 2021
@dstandish dstandish changed the title CeleryKubernetes executor is broken in 2.0.2 and 2.1 CeleryKubernetes executor is broken in 2.0.2 and 2.1.0 Jun 8, 2021
@jedcunningham
Member

Might be related to #13247? That is still only in v2-0-test, not stable.

@dstandish
Contributor Author

> That is still only in v2-0-test, not stable.

Hmm, so I think that would be the issue in 2.0.2 (I forgot about that cherry-pick issue).

But it does not explain 2.1... I'm pretty sure the code for that fix made it in one way or another. The scheduler_job_id issue is gone, but instead tasks always fail like this:

[2021-06-08 15:30:17,171] {scheduler_job.py:1241} ERROR - Executor reports task instance <TaskInstance: gsheets.app_event_taxonomy.to_s3 2021-06-08 14:00:00+00:00 [queued]> finished (failed) although the task says its queued. (Info: None) Was the task killed externally?

Let me try to update the issue to focus on 2.1.

@dstandish dstandish changed the title CeleryKubernetes executor is broken in 2.0.2 and 2.1.0 CeleryKubernetes executor is broken in 2.1.0 Jun 8, 2021
@dstandish dstandish changed the title CeleryKubernetes executor is broken in 2.1.0 CeleryKubernetesExecutor is broken in 2.1.0 Jun 8, 2021
@kaxil
Member

kaxil commented Jun 8, 2021

Oh interesting :( -- the code definitely exists in 2.1, but I haven't tested it yet.

Marking it for 2.1.1 if it is broken

@kaxil kaxil added this to the Airflow 2.1.1 milestone Jun 8, 2021
@jedcunningham
Member

Cool. I was able to reproduce this with 2.1.0 also.

@jedcunningham jedcunningham added the area:Scheduler label Jun 8, 2021
@eladkal eladkal added the affected_version:2.1 label Jun 9, 2021
@potiuk
Member

potiuk commented Jun 28, 2021

Hey @jedcunningham @kaxil - is this one already fixed in 2.1.1? It has the 2.1.1 milestone :)

@ashb ashb modified the milestones: Airflow 2.1.1, Airflow 2.1.2 Jun 28, 2021
@ashb ashb added the priority:high label Jun 28, 2021
@kaxil
Member

kaxil commented Jun 29, 2021

I was able to reproduce it and #16700 should fix it.

The workaround, until the PR is merged and released, is to set PYTHONOPTIMIZE=1 as an environment variable for the worker.
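
A minimal sketch of why that workaround helps (plain Python, not Airflow or Celery code, and assuming a CPython where this guard is still an ``assert``): the "daemonic processes are not allowed to have children" check in ``multiprocessing`` is an assert statement, and ``PYTHONOPTIMIZE`` / ``-O`` strips asserts, so the ``Manager()`` that ``KubernetesExecutor.__init__`` starts inside the daemonic Celery fork worker no longer raises:

```
# Minimal sketch (not Airflow code): reproduce the failure mode and show why
# PYTHONOPTIMIZE=1 sidesteps it. The daemonic-child check in multiprocessing
# is an assert, and -O / PYTHONOPTIMIZE=1 strips asserts.
import multiprocessing


def fork_worker():
    # A Celery prefork worker process is daemonic. Creating a Manager() here
    # (as KubernetesExecutor.__init__ does) forks another child process,
    # which a daemonic process is not allowed to do.
    multiprocessing.Manager()


if __name__ == "__main__":
    p = multiprocessing.Process(target=fork_worker, daemon=True)
    p.start()  # child raises AssertionError unless asserts are optimized away
    p.join()
```

Run it plainly and the child prints the ``daemonic processes are not allowed to have children`` traceback; run it as ``PYTHONOPTIMIZE=1 python repro.py`` (hypothetical filename) and the Manager starts without the assertion error -- which is why the env var works as a stopgap, not a fix.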

@damon09273
Contributor

> I was able to reproduce it and #16700 should fix it.
>
> The workaround, until the PR is merged and released, is to set PYTHONOPTIMIZE=1 as an environment variable for the worker.

Got it, thank you for providing information.

kaxil added a commit to astronomer/airflow that referenced this issue Jun 29, 2021
closes apache#16326

Currently, when running Celery tasks with ``CeleryKubernetesExecutor``, we see the following error. It occurs because ``BaseJob`` (via ``LocalTaskJob``) needlessly
instantiates a ``KubernetesExecutor``, which in turn tries to create a multiprocessing Manager process and fails.

```
[2021-06-29 00:23:45,301: ERROR/ForkPoolWorker-16] Failed to execute task daemonic processes are not allowed to have children.
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/executors/celery_executor.py", line 116, in _execute_in_fork
    args.func(args)
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/cli/cli_parser.py", line 48, in command
    return func(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/utils/cli.py", line 91, in wrapper
    return f(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/cli/commands/task_command.py", line 237, in task_run
    _run_task_by_selected_method(args, dag, ti)
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/cli/commands/task_command.py", line 64, in _run_task_by_selected_method
    _run_task_by_local_task_job(args, ti)
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/cli/commands/task_command.py", line 117, in _run_task_by_local_task_job
    pool=args.pool,
  File "<string>", line 4, in __init__
  File "/home/airflow/.local/lib/python3.6/site-packages/sqlalchemy/orm/state.py", line 433, in _initialize_instance
    manager.dispatch.init_failure(self, args, kwargs)
  File "/home/airflow/.local/lib/python3.6/site-packages/sqlalchemy/util/langhelpers.py", line 70, in __exit__
    with_traceback=exc_tb,
  File "/home/airflow/.local/lib/python3.6/site-packages/sqlalchemy/util/compat.py", line 182, in raise_
    raise exception
  File "/home/airflow/.local/lib/python3.6/site-packages/sqlalchemy/orm/state.py", line 430, in _initialize_instance
    return manager.original_init(*mixed[1:], **kwargs)
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/jobs/local_task_job.py", line 76, in __init__
    super().__init__(*args, **kwargs)
  File "<string>", line 6, in __init__
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/jobs/base_job.py", line 97, in __init__
    self.executor = executor or ExecutorLoader.get_default_executor()
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/executors/executor_loader.py", line 62, in get_default_executor
    cls._default_executor = cls.load_executor(executor_name)
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/executors/executor_loader.py", line 79, in load_executor
    return cls.__load_celery_kubernetes_executor()
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/executors/executor_loader.py", line 116, in __load_celery_kubernetes_executor
    kubernetes_executor = import_string(cls.executors[KUBERNETES_EXECUTOR])()
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/executors/kubernetes_executor.py", line 421, in __init__
    self._manager = multiprocessing.Manager()
  File "/usr/local/lib/python3.6/multiprocessing/context.py", line 56, in Manager
    m.start()
  File "/usr/local/lib/python3.6/multiprocessing/managers.py", line 513, in start
    self._process.start()
  File "/usr/local/lib/python3.6/multiprocessing/process.py", line 103, in start
    'daemonic processes are not allowed to have children'
AssertionError: daemonic processes are not allowed to have children
```

We don't need to instantiate an executor when running ``LocalTaskJob``, as the executor isn't used there.
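
A minimal sketch of the direction of the fix (an assumption about the approach; the actual diff in #16700 may differ): make ``BaseJob`` load the executor lazily, so jobs that never touch it, such as ``LocalTaskJob`` running inside a Celery worker, never construct a ``KubernetesExecutor`` or its ``multiprocessing.Manager()``:

```
# Minimal sketch, not the actual #16700 change: defer executor creation so that
# LocalTaskJob (which never uses the executor) never triggers
# KubernetesExecutor.__init__ and its multiprocessing.Manager().
class BaseJob:
    def __init__(self, executor=None):
        # Keep whatever the caller passed; do NOT resolve a default here.
        self._executor = executor

    @property
    def executor(self):
        # Resolved only on first access, i.e. never for LocalTaskJob.
        if self._executor is None:
            from airflow.executors.executor_loader import ExecutorLoader
            self._executor = ExecutorLoader.get_default_executor()
        return self._executor
```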
kaxil added a commit that referenced this issue Jun 29, 2021
@ashb ashb modified the milestones: Airflow 2.1.2, Airflow 2.1.3 Jul 7, 2021
ashb pushed a commit that referenced this issue Jul 7, 2021
jhtimmins pushed a commit to astronomer/airflow that referenced this issue Jul 9, 2021
jhtimmins pushed a commit that referenced this issue Jul 9, 2021
kaxil added a commit to astronomer/airflow that referenced this issue Jul 13, 2021
leahecole pushed a commit to GoogleCloudPlatform/composer-airflow that referenced this issue Sep 23, 2021
leahecole pushed a commit to GoogleCloudPlatform/composer-airflow that referenced this issue Nov 27, 2021
leahecole pushed a commit to GoogleCloudPlatform/composer-airflow that referenced this issue Mar 10, 2022
leahecole pushed a commit to GoogleCloudPlatform/composer-airflow that referenced this issue Jun 4, 2022
kosteev pushed a commit to GoogleCloudPlatform/composer-airflow that referenced this issue Jul 10, 2022
leahecole pushed a commit to GoogleCloudPlatform/composer-airflow that referenced this issue Aug 27, 2022
leahecole pushed a commit to GoogleCloudPlatform/composer-airflow that referenced this issue Oct 4, 2022
aglipska pushed a commit to GoogleCloudPlatform/composer-airflow that referenced this issue Oct 7, 2022
leahecole pushed a commit to GoogleCloudPlatform/composer-airflow that referenced this issue Dec 7, 2022
leahecole pushed a commit to GoogleCloudPlatform/composer-airflow that referenced this issue Jan 27, 2023