
Airflow 2.0 + Celery + Dask #7707

Closed
Nihhaar opened this issue May 26, 2021 · 3 comments
Nihhaar commented May 26, 2021

What happened:
I'm not sure whether to raise this on the Airflow GitHub or here. I've been using Dask with Airflow 1.10 + Celery and it was working fine. But after migrating to Airflow 2.0 it stopped working (error and stacktrace below).

What you expected to happen:
To work normally as in Airflow 1.10, or at least have a way of enabling some backward compatibility.

Minimal Complete Verifiable Example:
Airflow DAG:

import logging
from datetime import datetime, timedelta
import dask.bag as db

from airflow import DAG
from airflow.decorators import task

@task
def test_dask():
    bag = db.from_sequence([1, 2, 3, 4], partition_size=2)
    logging.info(bag.compute())

# Global variables / Config
default_args = {
    'owner': 'nihhaar',
    'depends_on_past': False,
    'retries': 0,
}

dag = DAG(
    'demo-dask-dag',
    default_args=default_args,
    schedule_interval='@once',
    start_date=datetime.strptime('2021-05-18', '%Y-%m-%d'),
    catchup=False
)

with dag:
    test_dask_task = test_dask()

Error and Stacktrace:

--------------------------------------------------------------------------------
[2021-05-26 11:49:00,678] {taskinstance.py:1043} INFO - Starting attempt 1 of 1
[2021-05-26 11:49:00,678] {taskinstance.py:1044} INFO - 
--------------------------------------------------------------------------------
[2021-05-26 11:49:00,686] {taskinstance.py:1063} INFO - Executing <Task(_PythonDecoratedOperator): test_dask> on 2021-05-18T00:00:00+00:00
[2021-05-26 11:49:00,689] {standard_task_runner.py:52} INFO - Started process 32198 to run task
[2021-05-26 11:49:00,691] {standard_task_runner.py:76} INFO - Running: ['airflow', 'tasks', 'run', 'demo-dask-dag', 'test_dask', '2021-05-18T00:00:00+00:00', '--job-id', '1071', '--pool', 'default_pool', '--raw', '--subdir', 'DAGS_FOLDER/dask_dag.py', '--cfg-path', '/tmp/tmp9ii13deg', '--error-file', '/tmp/tmpvs7k5h5y']
[2021-05-26 11:49:00,692] {standard_task_runner.py:77} INFO - Job 1071: Subtask test_dask
[2021-05-26 11:49:00,739] {logging_mixin.py:104} INFO - Running <TaskInstance: demo-dask-dag.test_dask 2021-05-18T00:00:00+00:00 [running]> on host ip-xxxx
[2021-05-26 11:49:00,774] {taskinstance.py:1257} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_OWNER=nihhaar
AIRFLOW_CTX_DAG_ID=demo-dask-dag
AIRFLOW_CTX_TASK_ID=test_dask
AIRFLOW_CTX_EXECUTION_DATE=2021-05-18T00:00:00+00:00
AIRFLOW_CTX_DAG_RUN_ID=scheduled__2021-05-18T00:00:00+00:00
[2021-05-26 11:49:00,791] {taskinstance.py:1455} ERROR - Pickling an AuthenticationString object is disallowed for security reasons
Traceback (most recent call last):
  File "/efs/env/lib/python3.6/site-packages/airflow/models/taskinstance.py", line 1112, in _run_raw_task
    self._prepare_and_execute_task_with_callbacks(context, task)
  File "/efs/env/lib/python3.6/site-packages/airflow/models/taskinstance.py", line 1285, in _prepare_and_execute_task_with_callbacks
    result = self._execute_task(context, task_copy)
  File "/efs/env/lib/python3.6/site-packages/airflow/models/taskinstance.py", line 1315, in _execute_task
    result = task_copy.execute(context=context)
  File "/efs/env/lib/python3.6/site-packages/airflow/operators/python.py", line 233, in execute
    return_value = self.python_callable(*self.op_args, **self.op_kwargs)
  File "/efs/airflow/dags/dask_dag.py", line 11, in test_dask
    logging.info(bag.compute())
  File "/efs/env/lib/python3.6/site-packages/dask/base.py", line 281, in compute
    (result,) = compute(self, traverse=False, **kwargs)
  File "/efs/env/lib/python3.6/site-packages/dask/base.py", line 563, in compute
    results = schedule(dsk, keys, **kwargs)
  File "/efs/env/lib/python3.6/site-packages/dask/multiprocessing.py", line 196, in get
    pool = context.Pool(num_workers, initializer=initialize_worker_process)
  File "/usr/lib/python3.6/multiprocessing/context.py", line 119, in Pool
    context=self.get_context())
  File "/usr/lib/python3.6/multiprocessing/pool.py", line 174, in __init__
    self._repopulate_pool()
  File "/usr/lib/python3.6/multiprocessing/pool.py", line 239, in _repopulate_pool
    w.start()
  File "/usr/lib/python3.6/multiprocessing/process.py", line 105, in start
    self._popen = self._Popen(self)
  File "/usr/lib/python3.6/multiprocessing/context.py", line 284, in _Popen
    return Popen(process_obj)
  File "/usr/lib/python3.6/multiprocessing/popen_spawn_posix.py", line 32, in __init__
    super().__init__(process_obj)
  File "/usr/lib/python3.6/multiprocessing/popen_fork.py", line 19, in __init__
    self._launch(process_obj)
  File "/usr/lib/python3.6/multiprocessing/popen_spawn_posix.py", line 46, in _launch
    reduction.dump(prep_data, fp)
  File "/usr/lib/python3.6/multiprocessing/reduction.py", line 60, in dump
    ForkingPickler(file, protocol).dump(obj)
  File "/efs/env/lib/python3.6/site-packages/billiard/process.py", line 366, in __reduce__
    'Pickling an AuthenticationString object is '
TypeError: Pickling an AuthenticationString object is disallowed for security reasons
[2021-05-26 11:49:00,813] {taskinstance.py:1503} INFO - Marking task as FAILED. dag_id=demo-dask-dag, task_id=test_dask, execution_date=20210518T000000, start_date=20210526T114900, end_date=20210526T114900
[2021-05-26 11:49:00,862] {local_task_job.py:146} INFO - Task exited with return code 1

Anything else we need to know?:
Airflow 2.0 + Celery setup ran with PYTHONOPTIMIZE=1 as per advice in apache/airflow#14896

Environment:

  • Dask version: 2021.2.0
  • Python version: 3.6
  • Operating System: Ubuntu
  • Install method (conda, pip, source): pip

Nihhaar commented May 26, 2021

I know it's not recommended to use Celery and Dask together, since each of them forks its own processes. But is there a way to make this work for legacy purposes?
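(One workaround I'm considering, not an official fix: `dask.bag` uses the process-based multiprocessing scheduler by default, which is what tries to spawn and pickle worker processes inside the Celery worker. Forcing the threaded scheduler keeps the computation in-process. A minimal sketch of the task body:)

```python
import logging
import dask.bag as db

def test_dask():
    bag = db.from_sequence([1, 2, 3, 4], partition_size=2)
    # dask.bag defaults to the process-based scheduler; scheduler="threads"
    # keeps all work inside the current (Celery-managed) process, so no
    # worker process ever needs to be pickled and spawned.
    result = bag.map(lambda x: x * 2).compute(scheduler="threads")
    logging.info(result)
    return result
```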

quasiben (Member) commented

Seems like this is an Airflow issue related to auth and pickling?

Pickling an AuthenticationString object is disallowed for security reasons


Nihhaar commented May 26, 2021

I think it is trying to pickle the process (from here) but I'm not sure why.
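(The error itself is reproducible with just the standard library, which suggests the mechanism: every process object carries an `authkey` of type `AuthenticationString`, and that type deliberately refuses ordinary pickling. billiard, which Celery uses in place of `multiprocessing`, raises the same error, as seen in the stacktrace. A small sketch:)

```python
import pickle
import multiprocessing

# Every process has an authkey of type AuthenticationString; pickling it
# outside of a legitimate process-spawning context raises TypeError
# "Pickling an AuthenticationString object is disallowed for security reasons".
authkey = multiprocessing.current_process().authkey

try:
    pickle.dumps(authkey)
except TypeError as exc:
    print(exc)
```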

Nihhaar closed this as completed May 27, 2021