Fix CeleryKubernetesExecutor #16700

Merged
merged 1 commit into from Jun 29, 2021

Commits on Jun 29, 2021

  1. Fix CeleryKubernetesExecutor

    closes apache#16326
    
    Currently, when running Celery tasks with the ``CeleryKubernetesExecutor``, we see the error below.
    It occurs because ``BaseJob`` (via ``LocalTaskJob``) needlessly instantiates a ``KubernetesExecutor``,
    which in turn tries to start a ``multiprocessing.Manager``; that fails because the Celery
    fork-pool worker is a daemonic process and may not spawn children.
    
    ```
    [2021-06-29 00:23:45,301: ERROR/ForkPoolWorker-16] Failed to execute task daemonic processes are not allowed to have children.
    Traceback (most recent call last):
      File "/home/airflow/.local/lib/python3.6/site-packages/airflow/executors/celery_executor.py", line 116, in _execute_in_fork
        args.func(args)
      File "/home/airflow/.local/lib/python3.6/site-packages/airflow/cli/cli_parser.py", line 48, in command
        return func(*args, **kwargs)
      File "/home/airflow/.local/lib/python3.6/site-packages/airflow/utils/cli.py", line 91, in wrapper
        return f(*args, **kwargs)
      File "/home/airflow/.local/lib/python3.6/site-packages/airflow/cli/commands/task_command.py", line 237, in task_run
        _run_task_by_selected_method(args, dag, ti)
      File "/home/airflow/.local/lib/python3.6/site-packages/airflow/cli/commands/task_command.py", line 64, in _run_task_by_selected_method
        _run_task_by_local_task_job(args, ti)
      File "/home/airflow/.local/lib/python3.6/site-packages/airflow/cli/commands/task_command.py", line 117, in _run_task_by_local_task_job
        pool=args.pool,
      File "<string>", line 4, in __init__
      File "/home/airflow/.local/lib/python3.6/site-packages/sqlalchemy/orm/state.py", line 433, in _initialize_instance
        manager.dispatch.init_failure(self, args, kwargs)
      File "/home/airflow/.local/lib/python3.6/site-packages/sqlalchemy/util/langhelpers.py", line 70, in __exit__
        with_traceback=exc_tb,
      File "/home/airflow/.local/lib/python3.6/site-packages/sqlalchemy/util/compat.py", line 182, in raise_
        raise exception
      File "/home/airflow/.local/lib/python3.6/site-packages/sqlalchemy/orm/state.py", line 430, in _initialize_instance
        return manager.original_init(*mixed[1:], **kwargs)
      File "/home/airflow/.local/lib/python3.6/site-packages/airflow/jobs/local_task_job.py", line 76, in __init__
        super().__init__(*args, **kwargs)
      File "<string>", line 6, in __init__
      File "/home/airflow/.local/lib/python3.6/site-packages/airflow/jobs/base_job.py", line 97, in __init__
        self.executor = executor or ExecutorLoader.get_default_executor()
      File "/home/airflow/.local/lib/python3.6/site-packages/airflow/executors/executor_loader.py", line 62, in get_default_executor
        cls._default_executor = cls.load_executor(executor_name)
      File "/home/airflow/.local/lib/python3.6/site-packages/airflow/executors/executor_loader.py", line 79, in load_executor
        return cls.__load_celery_kubernetes_executor()
      File "/home/airflow/.local/lib/python3.6/site-packages/airflow/executors/executor_loader.py", line 116, in __load_celery_kubernetes_executor
        kubernetes_executor = import_string(cls.executors[KUBERNETES_EXECUTOR])()
      File "/home/airflow/.local/lib/python3.6/site-packages/airflow/executors/kubernetes_executor.py", line 421, in __init__
        self._manager = multiprocessing.Manager()
      File "/usr/local/lib/python3.6/multiprocessing/context.py", line 56, in Manager
        m.start()
      File "/usr/local/lib/python3.6/multiprocessing/managers.py", line 513, in start
        self._process.start()
      File "/usr/local/lib/python3.6/multiprocessing/process.py", line 103, in start
        'daemonic processes are not allowed to have children'
    AssertionError: daemonic processes are not allowed to have children
    ```
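    For context, the failure is inherent to ``multiprocessing``: a daemonic process may not start
    children of its own, and ``multiprocessing.Manager()`` starts a server process under the hood.
    A minimal standalone reproduction of the same ``AssertionError`` (plain Python, not Airflow code):
    
    ```
    import multiprocessing
    
    def child():
        # This mirrors what KubernetesExecutor.__init__ does: Manager()
        # starts a server process, i.e. a child of this (daemonic) process.
        multiprocessing.Manager()
    
    if __name__ == "__main__":
        # Celery's fork-pool workers are daemonic, like this process.
        p = multiprocessing.Process(target=child, daemon=True)
        p.start()
        p.join()
        # The child dies with:
        # AssertionError: daemonic processes are not allowed to have children
    ```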
    
    We don't need to instantiate an executor when running a ``LocalTaskJob``, as the executor is never used there.
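    
    One way to achieve this (a sketch of the idea only, not the exact patch; the private attribute
    name is illustrative) is to resolve ``BaseJob.executor`` lazily instead of in ``__init__``:
    
    ```
    from functools import cached_property  # on older Pythons, Airflow ships a backport
    
    from airflow.executors.executor_loader import ExecutorLoader
    
    class BaseJob:
        def __init__(self, executor=None):
            # Remember what was passed in, but don't resolve the default yet:
            # resolving it would instantiate KubernetesExecutor (and its
            # multiprocessing.Manager) inside the daemonic Celery worker.
            self._executor_passed_in = executor
    
        @cached_property
        def executor(self):
            # Resolved only on first access. LocalTaskJob never accesses it,
            # so no executor is ever built in the worker process.
            return self._executor_passed_in or ExecutorLoader.get_default_executor()
    ```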
    kaxil committed Jun 29, 2021 (eaccf25)