
PythonOperator failed with multiprocessing python callable #15501

Closed
SelamaAshalanore opened this issue Apr 23, 2021 · 3 comments
Labels
kind:bug This is a clearly a bug

@SelamaAshalanore

Apache Airflow version:
2.0.2

Kubernetes version (if you are using kubernetes) (use kubectl version):

Environment:

What happened:

Error log as follows:

*** Reading local file: /opt/airflow/logs/mp_task/mp_task/2021-04-23T09:30:12.412763+00:00/1.log
[2021-04-23 09:30:13,505] {taskinstance.py:877} INFO - Dependencies all met for <TaskInstance: mp_task.mp_task 2021-04-23T09:30:12.412763+00:00 [queued]>
[2021-04-23 09:30:13,587] {taskinstance.py:877} INFO - Dependencies all met for <TaskInstance: mp_task.mp_task 2021-04-23T09:30:12.412763+00:00 [queued]>
[2021-04-23 09:30:13,587] {taskinstance.py:1068} INFO - 
--------------------------------------------------------------------------------
[2021-04-23 09:30:13,588] {taskinstance.py:1069} INFO - Starting attempt 1 of 1
[2021-04-23 09:30:13,588] {taskinstance.py:1070} INFO - 
--------------------------------------------------------------------------------
[2021-04-23 09:30:13,641] {taskinstance.py:1089} INFO - Executing <Task(PythonOperator): mp_task> on 2021-04-23T09:30:12.412763+00:00
[2021-04-23 09:30:13,650] {standard_task_runner.py:52} INFO - Started process 1931 to run task
[2021-04-23 09:30:13,663] {standard_task_runner.py:76} INFO - Running: ['airflow', 'tasks', 'run', 'mp_task', 'mp_task', '2021-04-23T09:30:12.412763+00:00', '--job-id', '16', '--pool', 'default_pool', '--raw', '--subdir', 'DAGS_FOLDER/test.py', '--cfg-path', '/tmp/tmpqsd8ybxy', '--error-file', '/tmp/tmpjxjr7k0d']
[2021-04-23 09:30:13,672] {standard_task_runner.py:77} INFO - Job 16: Subtask mp_task
[2021-04-23 09:30:13,826] {logging_mixin.py:104} INFO - Running <TaskInstance: mp_task.mp_task 2021-04-23T09:30:12.412763+00:00 [running]> on host bd0f9e35dd82
[2021-04-23 09:30:13,965] {taskinstance.py:1283} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_OWNER=airflow
AIRFLOW_CTX_DAG_ID=mp_task
AIRFLOW_CTX_TASK_ID=mp_task
AIRFLOW_CTX_EXECUTION_DATE=2021-04-23T09:30:12.412763+00:00
AIRFLOW_CTX_DAG_RUN_ID=manual__2021-04-23T09:30:12.412763+00:00
[2021-04-23 09:30:13,989] {taskinstance.py:1482} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/models/taskinstance.py", line 1138, in _run_raw_task
    self._prepare_and_execute_task_with_callbacks(context, task)
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/models/taskinstance.py", line 1311, in _prepare_and_execute_task_with_callbacks
    result = self._execute_task(context, task_copy)
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/models/taskinstance.py", line 1341, in _execute_task
    result = task_copy.execute(context=context)
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/operators/python.py", line 117, in execute
    return_value = self.execute_callable()
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/operators/python.py", line 128, in execute_callable
    return self.python_callable(*self.op_args, **self.op_kwargs)
  File "/opt/airflow/dags/test.py", line 23, in mp_task
    with mp.Pool(processes=mp.cpu_count()) as pool:
  File "/usr/local/lib/python3.6/multiprocessing/context.py", line 119, in Pool
    context=self.get_context())
  File "/usr/local/lib/python3.6/multiprocessing/pool.py", line 174, in __init__
    self._repopulate_pool()
  File "/usr/local/lib/python3.6/multiprocessing/pool.py", line 239, in _repopulate_pool
    w.start()
  File "/usr/local/lib/python3.6/multiprocessing/process.py", line 103, in start
    'daemonic processes are not allowed to have children'
AssertionError: daemonic processes are not allowed to have children
[2021-04-23 09:30:13,996] {taskinstance.py:1532} INFO - Marking task as FAILED. dag_id=mp_task, task_id=mp_task, execution_date=20210423T093012, start_date=20210423T093013, end_date=20210423T093013
[2021-04-23 09:30:14,119] {local_task_job.py:146} INFO - Task exited with return code 1
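For context, here is a minimal standalone sketch (plain Python, no Airflow) of the check behind this traceback: `Process.start()` asserts that the *calling* process is not daemonic, so a `Pool` opened from a daemonic process fails before any worker starts. The helpers `use_pool` and `run_in_child` are hypothetical, and the explicit `fork` start method is an assumption chosen to match the Linux hosts in these logs.

```python
import multiprocessing as mp

# Assumption: POSIX "fork" start method (Linux hosts, as in the logs).
# One explicit context keeps the demo independent of the platform default.
ctx = mp.get_context("fork")


def use_pool(results):
    """Open a Pool from the current process and report what happened."""
    try:
        with ctx.Pool(processes=2) as pool:
            results.put(("ok", pool.map(abs, [-1, -2, -3])))
    except AssertionError as exc:
        # Raised by Process.start() because the *calling* process is daemonic.
        results.put(("error", str(exc)))


def run_in_child(daemon):
    """Run use_pool() in a child process with the given daemon flag."""
    results = ctx.Queue()
    child = ctx.Process(target=use_pool, args=(results,), daemon=daemon)
    child.start()
    outcome = results.get()  # read before join() so the child can flush its queue
    child.join()
    return outcome


if __name__ == "__main__":
    print(run_in_child(daemon=False))
    print(run_in_child(daemon=True))
```

Run as a script, it should print `('ok', [1, 2, 3])` and then `('error', 'daemonic processes are not allowed to have children')`, the same message as in the Airflow 2.0.2 log.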

What you expected to happen:

The task runs successfully, as it does with Airflow 1.10.10. Log from 1.10.10:

*** Reading local file: /usr/local/airflow/logs/mp_task/mp_task/2021-04-23T09:53:23.166977+00:00/1.log
[2021-04-23 09:53:41,693] {{taskinstance.py:669}} INFO - Dependencies all met for <TaskInstance: mp_task.mp_task 2021-04-23T09:53:23.166977+00:00 [queued]>
[2021-04-23 09:53:41,754] {{taskinstance.py:669}} INFO - Dependencies all met for <TaskInstance: mp_task.mp_task 2021-04-23T09:53:23.166977+00:00 [queued]>
[2021-04-23 09:53:41,755] {{taskinstance.py:879}} INFO - 
--------------------------------------------------------------------------------
[2021-04-23 09:53:41,755] {{taskinstance.py:880}} INFO - Starting attempt 1 of 1
[2021-04-23 09:53:41,755] {{taskinstance.py:881}} INFO - 
--------------------------------------------------------------------------------
[2021-04-23 09:53:41,806] {{taskinstance.py:900}} INFO - Executing <Task(PythonOperator): mp_task> on 2021-04-23T09:53:23.166977+00:00
[2021-04-23 09:53:41,812] {{standard_task_runner.py:53}} INFO - Started process 969 to run task
[2021-04-23 09:53:42,031] {{logging_mixin.py:112}} INFO - Running %s on host %s <TaskInstance: mp_task.mp_task 2021-04-23T09:53:23.166977+00:00 [running]> dev_02.localdomain
[2021-04-23 09:53:42,197] {{logging_mixin.py:112}} INFO - 0
[2021-04-23 09:53:42,198] {{logging_mixin.py:112}} INFO - 1
[2021-04-23 09:53:42,199] {{logging_mixin.py:112}} INFO - 2
[2021-04-23 09:53:42,201] {{logging_mixin.py:112}} INFO - 4
[2021-04-23 09:53:42,200] {{logging_mixin.py:112}} INFO - 3
[2021-04-23 09:53:42,201] {{logging_mixin.py:112}} INFO - 5
[2021-04-23 09:53:42,202] {{logging_mixin.py:112}} INFO - 6
[2021-04-23 09:53:42,202] {{logging_mixin.py:112}} INFO - 7
[2021-04-23 09:53:42,202] {{logging_mixin.py:112}} INFO - 8
[2021-04-23 09:53:42,203] {{logging_mixin.py:112}} INFO - 9
[2021-04-23 09:53:42,312] {{python_operator.py:114}} INFO - Done. Returned value was: None
[2021-04-23 09:53:42,337] {{taskinstance.py:1065}} INFO - Marking task as SUCCESS.dag_id=mp_task, task_id=mp_task, execution_date=20210423T095323, start_date=20210423T095341, end_date=20210423T095342

How to reproduce it:
Test code as follows (saved as test.py in the DAGs folder):

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
import multiprocessing as mp

default_args = {
    'owner': 'airflow',
    'depends_on_past': True,
    'start_date': '2021-01-01'
}


dag = DAG(
    'mp_task',
    default_args=default_args,
    catchup=False,
    schedule_interval=None,
    max_active_runs=1
)


def mp_task():
    num_list = range(10)
    with mp.Pool(processes=mp.cpu_count()) as pool:  # raises AssertionError on 2.0.2 (see log above)
        pool.map(print, num_list)


def normal_task():
    num_list = range(10)
    for n in num_list:
        print(n)


# Operators get their own names so they do not shadow the callables above.
mp_task_op = PythonOperator(
    task_id='mp_task',
    python_callable=mp_task,
    dag=dag
)


normal_task_op = PythonOperator(
    task_id='normal_task',
    python_callable=normal_task,
    dag=dag
)

Anything else we need to know:

@SelamaAshalanore SelamaAshalanore added the kind:bug This is a clearly a bug label Apr 23, 2021
@boring-cyborg

boring-cyborg bot commented Apr 23, 2021

Thanks for opening your first issue here! Be sure to follow the issue template!

@raphaelauv
Contributor

Same as #14896.

@SelamaAshalanore
Author

Got it. Thanks.
