
Error: 'daemonic processes are not allowed to have children' after upgrading to airflow:2.0.1 #14896

Closed
ivanrezic-maistra opened this issue Mar 19, 2021 · 25 comments
Labels
affected_version:2.0 Issues Reported for 2.0 area:core kind:bug This is clearly a bug

Comments

@ivanrezic-maistra

Apache Airflow version: 2.0.1

Kubernetes version (if you are using kubernetes) (use kubectl version): /

Environment:

What happened:
I am using LocalExecutor, and I was using it the same way on Apache Airflow 1.10.12: I have one PythonOperator that runs a Python method which launches a multiprocessing job using ProcessPoolExecutor (concurrent.futures). On the earlier version it ran successfully without any problems, but now I get this error:

[2021-03-18 15:38:37,552] {taskinstance.py:1455} ERROR - daemonic processes are not allowed to have children
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/models/taskinstance.py", line 1112, in _run_raw_task
    self._prepare_and_execute_task_with_callbacks(context, task)
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/models/taskinstance.py", line 1285, in _prepare_and_execute_task_with_callbacks
    result = self._execute_task(context, task_copy)
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/models/taskinstance.py", line 1315, in _execute_task
    result = task_copy.execute(context=context)
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/operators/python.py", line 117, in execute
    return_value = self.execute_callable()
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/operators/python.py", line 128, in execute_callable
    return self.python_callable(*self.op_args, **self.op_kwargs)
  File "/home/airflow/Edmond/edmond/backend/main_parts/run_algorithm_o_book.py", line 15, in run_algorithm_o_book
    alg_o_output = run_o(k_output, capacity, OModelBook, config)
  File "/home/airflow/Edmond/edmond/models/O/model.py", line 388, in run_o
    for mid_result in executor.map(_run, args):
  File "/usr/local/lib/python3.6/concurrent/futures/process.py", line 496, in map
    timeout=timeout)
  File "/usr/local/lib/python3.6/concurrent/futures/_base.py", line 575, in map
    fs = [self.submit(fn, *args) for args in zip(*iterables)]
  File "/usr/local/lib/python3.6/concurrent/futures/_base.py", line 575, in <listcomp>
    fs = [self.submit(fn, *args) for args in zip(*iterables)]
  File "/usr/local/lib/python3.6/concurrent/futures/process.py", line 466, in submit
    self._start_queue_management_thread()
  File "/usr/local/lib/python3.6/concurrent/futures/process.py", line 427, in _start_queue_management_thread
    self._adjust_process_count()
  File "/usr/local/lib/python3.6/concurrent/futures/process.py", line 446, in _adjust_process_count
    p.start()
  File "/usr/local/lib/python3.6/multiprocessing/process.py", line 103, in start
    'daemonic processes are not allowed to have children'
AssertionError: daemonic processes are not allowed to have children

What you expected to happen:
I expected it to run as it was running on Airflow 1.10.12

How to reproduce it:
Run airflow using docker-compose like this:

version: '3.8'
x-airflow-common:
  &airflow-common
  image: edmond_image
  env_file:
    - compose-services.env
  volumes:
    - ./dags:/opt/airflow/dags
    - ./logs:/opt/airflow/logs
    - ./scripts:/opt/airflow/scripts
    - ./notebooks:/home/airflow/Edmond/notebooks
    - ./data:/home/airflow/Edmond/data
  depends_on:
    - postgres
  restart: always

services:
  postgres:
    image: postgres:13
    environment:
      POSTGRES_USER: airflow
      POSTGRES_PASSWORD: airflow
      POSTGRES_DB: airflow

  airflow-webserver:
    <<: *airflow-common
    command: webserver
    ports:
      - 8090:8080

  airflow-scheduler:
    <<: *airflow-common
    command: scheduler

  airflow-init:
    <<: *airflow-common
    restart: on-failure
    environment:
      _AIRFLOW_DB_UPGRADE: 'true'
      _AIRFLOW_WWW_USER_CREATE: 'true'
      _AIRFLOW_WWW_USER_USERNAME: 'Admin'
      _AIRFLOW_WWW_USER_PASSWORD: 'Admin'

And inside it, run a PythonOperator whose callable uses ProcessPoolExecutor from concurrent.futures.
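For example, a minimal DAG along these lines (hypothetical task name, worker function and pool size - not the actual model code from the traceback above) is enough to trigger the error:

# dags/repro_daemonic_error.py - minimal sketch that reproduces the error
from concurrent.futures import ProcessPoolExecutor
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator


def _square(x):
    return x * x


def run_multiprocessing_job():
    # Under Airflow 2 the callable ends up running in a daemon-flagged child
    # process, so starting worker processes here raises the AssertionError above.
    with ProcessPoolExecutor(max_workers=4) as executor:
        return list(executor.map(_square, range(10)))


with DAG(
    dag_id="repro_daemonic_error",
    start_date=datetime(2021, 1, 1),
    schedule_interval=None,
    catchup=False,
) as dag:
    PythonOperator(
        task_id="run_multiprocessing_job",
        python_callable=run_multiprocessing_job,
    )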

Anything else we need to know:
This problem occurs every time I run a PythonOperator that uses multiprocessing. I have searched everywhere without any luck. There seems to be a similar error when using CeleryExecutor, but that doesn't help (as I am using LocalExecutor), and there are no import collisions.

@ivanrezic-maistra ivanrezic-maistra added the kind:bug This is clearly a bug label Mar 19, 2021
@boring-cyborg

boring-cyborg bot commented Mar 19, 2021

Thanks for opening your first issue here! Be sure to follow the issue template!

@raphaelauv
Contributor

same as -> #13298

@ivanrezic-maistra
Author

OK, it works now, but what if I need assertions in my code? Why did it work in earlier versions but not in the current one? Is it going to be fixed?

@kdorr

kdorr commented Apr 28, 2021

I’m running into this same issue (with CeleryExecutor, after upgrading to Airflow 2.0.1). I’m concerned about the line in the Stack Overflow explanation of the PYTHONOPTIMIZE fix that says:

so setting the environment variable only suppresses the assertion. The problem itself doesn't go away.

What are the possible consequences of ignoring this particular assertion? Are there any updates on why this suddenly broke in 2.0?

@potiuk
Member

potiuk commented Apr 28, 2021

You simply should not use multiprocessing inside LocalExecutor tasks. This is likely going to break. The assertion is correct, and you should likely change your approach.

@potiuk
Member

potiuk commented Apr 28, 2021

It broke because of optimisations implemented in Airflow that make use of multiprocessing. The best way for you to proceed will be to turn your multiprocessing jobs into separate Airflow tasks.
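A rough sketch of what that fan-out could look like (hypothetical chunking and task names, shown only to illustrate the idea):

# Hypothetical sketch: fan the work out as separate Airflow tasks instead of
# spawning child processes inside a single task.
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator


def process_chunk(chunk_id):
    # whatever one worker process used to do now becomes one task
    print(f"processing chunk {chunk_id}")


with DAG(
    dag_id="fan_out_example",
    start_date=datetime(2021, 1, 1),
    schedule_interval=None,
    catchup=False,
) as dag:
    for chunk_id in range(4):
        PythonOperator(
            task_id=f"process_chunk_{chunk_id}",
            python_callable=process_chunk,
            op_args=[chunk_id],
        )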

@potiuk
Member

potiuk commented Apr 28, 2021

You can also run your Python code using the Python operator with virtualenv - that launches a new Python interpreter, and you can use multiprocessing there.
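In Airflow 2 that is the PythonVirtualenvOperator; a minimal sketch (hypothetical task - the callable keeps its imports inside itself because it is serialised and executed in the freshly created interpreter):

# Sketch of the "new interpreter" route via PythonVirtualenvOperator.
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonVirtualenvOperator


def heavy_multiprocessing_job():
    from concurrent.futures import ProcessPoolExecutor
    from math import factorial

    # the worker function must be importable/picklable, hence math.factorial here
    with ProcessPoolExecutor(max_workers=4) as executor:
        return list(executor.map(factorial, range(10)))


with DAG(
    dag_id="virtualenv_multiprocessing_example",
    start_date=datetime(2021, 1, 1),
    schedule_interval=None,
    catchup=False,
) as dag:
    PythonVirtualenvOperator(
        task_id="heavy_multiprocessing_job",
        python_callable=heavy_multiprocessing_job,
        system_site_packages=True,
    )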

@kdorr

kdorr commented Apr 28, 2021

Thanks!

@Frietziek

Frietziek commented Jun 9, 2021

We have the same problem; this worked fine in Airflow 1.

I think it is important to allow concurrency inside a task, because there are cases where, from a design perspective, you don't want to clutter your DAG with a lot of tiny tasks that are not necessary to check and control independently from the web UI, so you want the concurrency inside the task. Is there anything that can be done to fix this? We are using LocalExecutor.

@damon09273
Contributor

damon09273 commented Jun 18, 2021

I got the same error, daemonic processes are not allowed to have children, from the Celery worker when CeleryKubernetesExecutor was used, but the error went away when CeleryExecutor was used.
I did some searching; there are two options for fixing this:
Workaround: set PYTHONOPTIMIZE=1 to skip the assert.
Possible solution: run celery worker -P threads. But there's a problem: I can't pass the -P threads argument to the airflow command airflow celery worker.

Context:

  • apache-airflow==2.0.1
  • apache-airflow-providers-celery==1.0.1
  • celery==4.4.7
  • Python==3.7.10
[2021-06-18 08:09:46,393: ERROR/ForkPoolWorker-15] Failed to execute task daemonic processes are not allowed to have children.
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/airflow/executors/celery_executor.py", line 116, in _execute_in_fork
    args.func(args)
  File "/usr/local/lib/python3.7/site-packages/airflow/cli/cli_parser.py", line 48, in command
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/airflow/utils/cli.py", line 91, in wrapper
    return f(*args, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/airflow/cli/commands/task_command.py", line 237, in task_run
    _run_task_by_selected_method(args, dag, ti)
  File "/usr/local/lib/python3.7/site-packages/airflow/cli/commands/task_command.py", line 64, in _run_task_by_selected_method
    _run_task_by_local_task_job(args, ti)
  File "/usr/local/lib/python3.7/site-packages/airflow/cli/commands/task_command.py", line 117, in _run_task_by_local_task_job
    pool=args.pool,
  File "<string>", line 4, in __init__
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/state.py", line 433, in _initialize_instance
    manager.dispatch.init_failure(self, args, kwargs)
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/util/langhelpers.py", line 70, in __exit__
    with_traceback=exc_tb,
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/util/compat.py", line 182, in raise_
    raise exception
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/state.py", line 430, in _initialize_instance
    return manager.original_init(*mixed[1:], **kwargs)
  File "/usr/local/lib/python3.7/site-packages/airflow/jobs/local_task_job.py", line 68, in __init__
    super().__init__(*args, **kwargs)
  File "<string>", line 6, in __init__
  File "/usr/local/lib/python3.7/site-packages/airflow/jobs/base_job.py", line 97, in __init__
    self.executor = executor or ExecutorLoader.get_default_executor()
  File "/usr/local/lib/python3.7/site-packages/airflow/executors/executor_loader.py", line 62, in get_default_executor
    cls._default_executor = cls.load_executor(executor_name)
  File "/usr/local/lib/python3.7/site-packages/airflow/executors/executor_loader.py", line 79, in load_executor
    return cls.__load_celery_kubernetes_executor()
  File "/usr/local/lib/python3.7/site-packages/airflow/executors/executor_loader.py", line 116, in __load_celery_kubernetes_executor
    kubernetes_executor = import_string(cls.executors[KUBERNETES_EXECUTOR])()
  File "/usr/local/lib/python3.7/site-packages/airflow/executors/kubernetes_executor.py", line 421, in __init__
    self._manager = multiprocessing.Manager()
  File "/usr/local/lib/python3.7/multiprocessing/context.py", line 56, in Manager
    m.start()
  File "/usr/local/lib/python3.7/multiprocessing/managers.py", line 563, in start
    self._process.start()
  File "/usr/local/lib/python3.7/multiprocessing/process.py", line 110, in start
    'daemonic processes are not allowed to have children'
AssertionError: daemonic processes are not allowed to have children
[2021-06-18 08:09:46,413: ERROR/ForkPoolWorker-15] Task airflow.executors.celery_executor.execute_command[0b6b839e-87cd-4ac4-a61d-d001e848665a] raised unexpected: AirflowException('Celery command failed on host: airflow-default-worker-3.airflow-worker.default.svc.cluster.local')
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/celery/app/trace.py", line 412, in trace_task
    R = retval = fun(*args, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/celery/app/trace.py", line 704, in __protected_call__
    return self.run(*args, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/airflow/executors/celery_executor.py", line 87, in execute_command
    _execute_in_fork(command_to_exec)
  File "/usr/local/lib/python3.7/site-packages/airflow/executors/celery_executor.py", line 98, in _execute_in_fork
    raise AirflowException('Celery command failed on host: ' + get_hostname())
airflow.exceptions.AirflowException: Celery command failed on host: airflow-default-worker-3.airflow-worker.default.svc.cluster.local

@ahazeemi


@damon09273 Were you able to resolve this issue?

@kaxil
Member

kaxil commented Jun 29, 2021

@damon09273 @ahazeemi I posted the workaround and solution for CeleryKubernetesExecutor in #16326 (comment)

@pmlewis

pmlewis commented Jul 13, 2021

I had a similar issue using CeleryExecutor and multiprocessing. This may be a side effect of Airflow 2 forking by default to execute tasks versus creating a new Python process, which seems to be one thing that changed from Airflow 1 in CeleryExecutor and LocalExecutor. There's a config value that can control that behavior. I got around the issue by adding

execute_tasks_new_python_interpreter = True

to my airflow.cfg under [core], and without using PYTHONOPTIMIZE. It looks like CeleryExecutor and LocalExecutor will use this value to decide whether to fork or to run a new subprocess, so maybe that will work for LocalExecutor too.

@kaxil
Member

kaxil commented Jul 14, 2021

@pmlewis Because execute_tasks_new_python_interpreter = True will create a new Python interpreter for each task, it will have performance implications -- a few seconds / ms at least.

@potiuk
Member

potiuk commented Aug 30, 2021

There is a known solution to this problem. Please use billiard https://pypi.org/project/billiard/ instead of multiprocessing. It is a fork of multiprocessing used internally by Celery, and it was specifically forked by the Celery developers to handle exactly these kinds of problems and limitations of the original multiprocessing library. Since it is used by Celery, you should have it installed already, and it is a 1-1 replacement for multiprocessing (literally you can do import billiard as multiprocessing).

I have already seen quite a number of confirmations that it works for people with similar problems, and I am considering adding an entry to the Airflow Best Practices to cover it (@ashb @kaxil - you might also be interested in that :). This is an interesting finding I had from following up a number of issues and trying to help people.

@pmlewis, @damon09273 @ahazeemi @ivanrezic-maistra - if you can also confirm that this solution works, I'd be more confident in adding a best practice for it.
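For concreteness, a minimal sketch of that swap inside custom task code (hypothetical worker function; billiard ships as a Celery dependency, so it is usually already installed):

# Minimal sketch of the billiard swap in custom task code.
# billiard mirrors the multiprocessing API, so only the import changes.
import billiard as multiprocessing


def _square(x):
    return x * x


def run_parallel_job():
    # billiard was forked by the Celery developers to lift the
    # "daemonic processes are not allowed to have children" restriction,
    # so this works even when the task runs in a forked worker process.
    pool = multiprocessing.Pool(processes=4)
    try:
        return pool.map(_square, range(10))
    finally:
        pool.close()
        pool.join()

run_parallel_job can then be passed to a PythonOperator as python_callable, exactly as before.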

@damon09273
Contributor

damon09273 commented Sep 2, 2021

@potiuk How can I try this?
Is it just a matter of replacing the multiprocessing package in the Celery executor with billiard?

Also, I want to report that this issue doesn't happen in Airflow 2.1.3. I'm not sure why, because I didn't set PYTHONOPTIMIZE to 1.

@potiuk
Member

potiuk commented Sep 2, 2021

Well, I think we are talking about two different issues with "daemonic processes", and this is the source of the confusion here.

  1. The original stack trace here (by @ivanrezic-maistra ) was about a PythonOperator running "multiprocessing" under LocalExecutor - and this is the one I wrote about in the billiard context. The issue was about using multiprocessing inside your own customised Python operators - which I saw several people solve by simply using import billiard as multiprocessing in their own code.

  2. The second issue, added to the same thread by @damon09273 and @ahazeemi and also mentioned by @kaxil, has been fixed and released in Airflow 2.1.2 (see the milestone at Fix CeleryKubernetesExecutor #16700) - but that one was about the CeleryKubernetesExecutor, and it was not "custom" code - it was Airflow itself that failed in this case.

Even if the error message is the same, those two issues have very different root causes, and while they got somewhat mixed up here, error 1) still requires the billiard import (in the custom code) to be solved.

So, answering @damon09273's question: it's fine for you now. But if someone uses LocalExecutor and does multiprocessing in their custom code within a PythonOperator (or writes a custom operator), then using billiard instead of multiprocessing in the custom code should solve the problem. No need to change anything in the Celery executor.

@chonyy

chonyy commented Dec 16, 2021

@potiuk Thanks!
import billiard as multiprocessing works like a charm.

@eladkal
Contributor

eladkal commented Mar 16, 2022

@potiuk I believe there is no task left to do on this issue? Can we close?

@potiuk
Member

potiuk commented Mar 16, 2022

Agree.

@JonnyWaffles
Contributor

Hi team, apologies for adding to a closed thread. I encountered the same problem when using CeleryExecutor, when a third-party package (Okera in my use case) attempts multiprocessing:

File "/app/.local/lib/python3.9/site-packages/okera/concurrency.py", line 16, in init
self.manager = multiprocessing.Manager()
...

File "/usr/lib/python3.9/multiprocessing/process.py", line 118, in start
assert not _current_process._config.get('daemon'),
AssertionError: daemonic processes are not allowed to have children

I am using Airflow 2.3.3 and Celery 5.2.7, running on the default airflow celery worker entrypoint. Where in the stack is the daemonic subprocess being created? When I comb through the code, I see that by default the workers execute tasks in a fork. Are the forked Celery workers daemons by default? Is there a way I can resolve the issue without monkey patching a third-party library's use of multiprocessing, per @potiuk's suggestion above? I just want to better understand the problem before mucking around. Thanks for any insight you can provide!

@potiuk
Member

potiuk commented Aug 12, 2022

This is a question of how you are deploying Airflow and whether you run the worker with the daemon flag: https://github.com/apache/airflow/blob/main/airflow/cli/commands/celery_command.py#L180

I believe you can run the airflow worker as non-daemon - with all the caveats you then have to take care of yourself, like handling stdin/out, signals, switching the working directory and everything else that "daemonizing" does: https://manpages.ubuntu.com/manpages/bionic/man1/daemonize.1.html

@JonnyWaffles
Contributor

Thanks for your response! Unfortunately, I believe the issue is related to how the Celery worker itself handles concurrency, regardless of the daemon flag on the worker command itself. Looking more into it, I think my choices are:

  1. Monkey patch the third-party lib to use billiard; or
  2. Change Celery's concurrency setting to use something other than multiprocessing, like eventlet.

@potiuk
Member

potiuk commented Aug 12, 2022

Thanks for your response! Unfortunately, I believe the issue is related to how the Celery worker itself handles concurrency, regardless of the daemon flag on the worker command itself. Looking more into it, I think my choices are:

Yeah. This is actually quite likely.

  1. Monkey patch the third-party lib to use billiard; or
  2. Change Celery's concurrency setting to use something other than multiprocessing, like eventlet.
  3. You can also vendor-in the 3rd-party library and replace its use of multiprocessing with billiard (a rough sketch of this route follows below).
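A rough, untested sketch of that monkey-patch/vendor-in route (the okera import itself is only indicated in a comment, since only its traceback appears above):

# Rough, untested sketch: alias billiard in place of multiprocessing *before*
# the third-party library is imported, so its own "import multiprocessing"
# resolves to billiard instead.
import sys

import billiard

sys.modules["multiprocessing"] = billiard

# Only import the library that spawns children (e.g. the okera modules from
# the traceback above) after this point, so it picks up billiard. This only
# affects modules imported after the patch and is a global, heavy-handed hack -
# vendoring the library and replacing the import directly is the cleaner fix.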

@JonnyWaffles
Contributor

You can also vendor-in the 3rd-party library and replace its use of multiprocessing with billiard.
I really like this idea too! Thanks a bunch.
