From 83fb4bf9e4f43a4c5b052d6188bef363eac26144 Mon Sep 17 00:00:00 2001 From: Jorrick Sleijster Date: Tue, 22 Jun 2021 10:08:00 +0200 Subject: [PATCH] Fix Orphaned tasks stuck in CeleryExecutor as running (#16550) (cherry picked from commit 90f0088c5752b56177597725cc716f707f2f8456) --- airflow/executors/celery_executor.py | 4 +--- tests/executors/test_celery_executor.py | 2 ++ 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/airflow/executors/celery_executor.py b/airflow/executors/celery_executor.py index bc321c665dbcf..ce383ef277ebe 100644 --- a/airflow/executors/celery_executor.py +++ b/airflow/executors/celery_executor.py @@ -377,9 +377,7 @@ def _check_for_stalled_adopted_tasks(self): "\n\t".join([repr(x) for x in timedout_keys]), ) for key in timedout_keys: - self.event_buffer[key] = (State.FAILED, None) - del self.tasks[key] - del self.adopted_task_timeouts[key] + self.change_state(key, State.FAILED) def debug_dump(self) -> None: """Called in response to SIGUSR2 by the scheduler""" diff --git a/tests/executors/test_celery_executor.py b/tests/executors/test_celery_executor.py index f454c5a419f23..9c94b6406ae21 100644 --- a/tests/executors/test_celery_executor.py +++ b/tests/executors/test_celery_executor.py @@ -370,10 +370,12 @@ def test_check_for_stalled_adopted_tasks(self): key_1: queued_dttm + executor.task_adoption_timeout, key_2: queued_dttm + executor.task_adoption_timeout, } + executor.running = {key_1, key_2} executor.tasks = {key_1: AsyncResult("231"), key_2: AsyncResult("232")} executor.sync() assert executor.event_buffer == {key_1: (State.FAILED, None), key_2: (State.FAILED, None)} assert executor.tasks == {} + assert executor.running == set() assert executor.adopted_task_timeouts == {}