
CeleryExecutor adopts any task with an external_executor_id #15457

Closed
leonsmith opened this issue Apr 20, 2021 · 6 comments
Labels
affected_version:2.0 (Issues Reported for 2.0) · area:Scheduler (Scheduler or DAG parsing issues) · kind:bug (This is clearly a bug) · pending-response

Comments

@leonsmith (Contributor) commented Apr 20, 2021

Apache Airflow version: 2.0.1
Kubernetes version (if you are using kubernetes) (use kubectl version): N/A
Environment:

  • Cloud provider or hardware configuration: AWS
  • OS (e.g. from /etc/os-release): Debian
  • Kernel (e.g. uname -a): Linux 4.14.225-168.357.amzn2.x86_64
  • Install tools:
  • Others:

What happened:

Switching to the CeleryExecutor from another executor (one that makes use of external_executor_id) results in Celery adopting the task, which then stays in the queued state indefinitely.

What you expected to happen:

Celery to correctly return the TaskInstances it can't adopt, allowing them to be re-submitted by Airflow.

How to reproduce it:

Queue a TaskInstance in some manner, but make sure it stays in the queued state and doesn't progress to running.
(You can do this by using the CeleryExecutor without starting any workers.)

Kill the executor and adjust the external_executor_id of the queued TaskInstance to some random string
(to simulate a task queued by a different executor, one that Celery knows nothing about), as sketched below.
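For illustration, a minimal sketch of that manipulation through Airflow's ORM (the dag_id/task_id below are placeholders, and direct access to the metadata database is assumed):

    import uuid

    from airflow.models import TaskInstance
    from airflow.utils.session import create_session
    from airflow.utils.state import State

    with create_session() as session:
        # Placeholder ids -- point this at your stuck, queued TaskInstance.
        ti = (
            session.query(TaskInstance)
            .filter(
                TaskInstance.dag_id == "my_dag",
                TaskInstance.task_id == "my_task",
                TaskInstance.state == State.QUEUED,
            )
            .one()
        )
        # A random UUID Celery has never seen, mimicking an id written by a
        # different executor; create_session commits when the block exits.
        ti.external_executor_id = str(uuid.uuid4())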

Start the CeleryExecutor & corresponding workers.
Observe that the task gets "adopted" by the CeleryExecutor, since it takes the first branch in the following snippet:

        # From CeleryExecutor.try_adopt_task_instances: any non-null
        # external_executor_id is treated as a Celery task id and adopted,
        # with no check that Celery has ever seen it.
        for ti in tis:
            if ti.external_executor_id is not None:
                celery_tasks[ti.external_executor_id] = (AsyncResult(ti.external_executor_id), ti)
            else:
                not_adopted_tis.append(ti)
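For context, here is a hedged sketch of the more defensive loop the report implies (an illustration only, not the change that eventually landed):

    for ti in tis:
        if ti.external_executor_id is None:
            not_adopted_tis.append(ti)
            continue
        result = AsyncResult(ti.external_executor_id)
        # Caveat: Celery reports PENDING both for ids it has never seen and
        # for genuinely queued-but-unstarted tasks, so this check cannot
        # cleanly separate a foreign id from a not-yet-running Celery task,
        # which is part of why the bug is awkward to fix at adoption time.
        if result.state == "PENDING":
            not_adopted_tis.append(ti)
        else:
            celery_tasks[ti.external_executor_id] = (result, ti)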

Observe that the task instance stays in the queued state indefinitely, because the following call returns an empty mapping:

        states_by_celery_task_id = self.bulk_state_fetcher.get_many(...)
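A small illustration of why nothing useful comes back (this assumes the executor's own Celery app, which Airflow 2.0/2.1 exposes as airflow.executors.celery_executor.app): asking Celery about an id it has never seen doesn't raise, it just reports PENDING, and with a database result backend an id that was never written presumably has no row to fetch, which would be consistent with the empty mapping observed here.

    import uuid

    from celery.result import AsyncResult

    from airflow.executors.celery_executor import app

    # An id Celery has never seen: no exception, just PENDING.
    unknown = AsyncResult(str(uuid.uuid4()), app=app)
    print(unknown.state)  # "PENDING"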

Anything else we need to know:
N/A

@leonsmith added the kind:bug label Apr 20, 2021
@leonsmith changed the title from "Celery Executor Adopts any task with an external_executor_id" to "CeleryExecutor adopts any task with an external_executor_id" Apr 20, 2021
@jedcunningham added the area:Scheduler and affected_version:2.0 labels Apr 20, 2021
@kaxil added this to the Airflow 2.0.3 milestone Apr 20, 2021
@ashb modified the milestones: Airflow 2.0.3 → Airflow 2.1.1 May 7, 2021
@jhtimmins (Contributor) commented:

@leonsmith are you interested in fixing this? We can target the next release with the fix.

@leonsmith (Contributor, Author) commented:

I don't mind having a go, though it's pretty low on my list, as we worked around it by cancelling running tasks when switching executors.

@ephraimbuddy (Contributor) commented Jun 17, 2021:

I was not able to reproduce this on main; I suspect it has been resolved. Can you try it on main or 2.1.0, @leonsmith?

@ephraimbuddy (Contributor) commented Jun 17, 2021:

I changed the external_executor_id as you said and made sure that the task was queued before changing it.

It adopted the tasks and ran them. It wasn't stuck in queued.

@Jorricks (Contributor) commented:

It seems that part of the issue here, specifically the following part:

"Observe that the task instance stays in the queued state indefinitely, because the following call returns an empty mapping"

will be fixed by PR #16550.

@kaxil modified the milestones: Airflow 2.1.1 → Airflow 2.1.2 Jun 22, 2021
@ashb modified the milestones: Airflow 2.1.2 → Airflow 2.1.3 Jul 7, 2021
@kaxil removed this from the Airflow 2.1.3 milestone Jul 27, 2021
@kaxil (Member) commented Jul 27, 2021:

Closed by #16550, which was already released in Airflow 2.1.1.

@kaxil kaxil closed this as completed Jul 27, 2021