From 55a4645044a46d33dd5934a43856af0e46ba4757 Mon Sep 17 00:00:00 2001 From: Jorrick Sleijster Date: Sat, 3 Jul 2021 00:02:01 +0200 Subject: [PATCH] Fix slow (cleared) tasks being be adopted by Celery worker. (#16718) Celery executor is currently adopting anything that has ever run before and has been cleared since then. **Example of the issue:** We have a DAG that runs over 150 sensor tasks and 50 ETL tasks while having a concurrency of 3 and max_active_runs of 16. This setup is required because we want to divide the resources and we don't want this DAG to take up all the resources. What will happen is that many tasks will be in scheduled for a bit as it can't queue them due to the concurrency of 3. However, because of the current implementations, if these tasks ever run before, they would get adopted by the schedulers executor instance and become stuck forever [without this PR](https://github.com/apache/airflow/pull/16550). However, they should have never been adopted in the first place. **Contents of the PR**: 1. Tasks that are in scheduled should never have arrived at an executor. Hence, we remove the task state scheduled from the option to be adopted. 2. Given this task instance `external_executor_id` is quite important in deciding whether it is adopted, we will also reset this when we reset the state of the TaskInstance. (cherry picked from commit 554a23928efb4ff1d87d115ae2664edec3a9408c) --- airflow/jobs/scheduler_job.py | 2 +- airflow/models/taskinstance.py | 1 + tests/executors/test_celery_executor.py | 2 ++ tests/jobs/test_scheduler_job.py | 14 +++++++------- tests/models/test_cleartasks.py | 24 ++++++++++++++++++++++++ 5 files changed, 35 insertions(+), 8 deletions(-) diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py index 1758ae1e3d264..fe8e0b0cb64e8 100644 --- a/airflow/jobs/scheduler_job.py +++ b/airflow/jobs/scheduler_job.py @@ -1859,7 +1859,7 @@ def adopt_or_reset_orphaned_tasks(self, session: Session = None): self.log.info("Marked %d SchedulerJob instances as failed", num_failed) Stats.incr(self.__class__.__name__.lower() + '_end', num_failed) - resettable_states = [State.SCHEDULED, State.QUEUED, State.RUNNING] + resettable_states = [State.QUEUED, State.RUNNING] query = ( session.query(TI) .filter(TI.state.in_(resettable_states)) diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py index ea5a72ef92284..b99fa34ef8187 100644 --- a/airflow/models/taskinstance.py +++ b/airflow/models/taskinstance.py @@ -172,6 +172,7 @@ def clear_task_instances( # original max_tries or the last attempted try number. ti.max_tries = max(ti.max_tries, ti.prev_attempted_tries) ti.state = State.NONE + ti.external_executor_id = None session.merge(ti) task_id_by_key[ti.dag_id][ti.execution_date][ti.try_number].add(ti.task_id) diff --git a/tests/executors/test_celery_executor.py b/tests/executors/test_celery_executor.py index e24a3dd9779a9..88ea95c4d9a93 100644 --- a/tests/executors/test_celery_executor.py +++ b/tests/executors/test_celery_executor.py @@ -329,9 +329,11 @@ def test_try_adopt_task_instances(self): ti1 = TaskInstance(task=task_1, execution_date=exec_date) ti1.external_executor_id = '231' ti1.queued_dttm = queued_dttm + ti1.state = State.QUEUED ti2 = TaskInstance(task=task_2, execution_date=exec_date) ti2.external_executor_id = '232' ti2.queued_dttm = queued_dttm + ti2.state = State.QUEUED tis = [ti1, ti2] executor = celery_executor.CeleryExecutor() diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py index 37ae65b2ded35..fe0b257d1f5aa 100644 --- a/tests/jobs/test_scheduler_job.py +++ b/tests/jobs/test_scheduler_job.py @@ -2124,9 +2124,9 @@ def test_adopt_or_reset_orphaned_tasks(self): session=session, ) ti = dr.get_task_instance(task_id=op1.task_id, session=session) - ti.state = State.SCHEDULED + ti.state = State.QUEUED ti2 = dr2.get_task_instance(task_id=op1.task_id, session=session) - ti2.state = State.SCHEDULED + ti2.state = State.QUEUED session.commit() processor = mock.MagicMock() @@ -2140,7 +2140,7 @@ def test_adopt_or_reset_orphaned_tasks(self): assert ti.state == State.NONE ti2 = dr2.get_task_instance(task_id=op1.task_id, session=session) - assert ti2.state == State.SCHEDULED, "Tasks run by Backfill Jobs should not be reset" + assert ti2.state == State.QUEUED, "Tasks run by Backfill Jobs should not be reset" @parameterized.expand( [ @@ -3654,7 +3654,7 @@ def test_adopt_or_reset_orphaned_tasks_external_triggered_dag(self): session=session, ) ti = dr1.get_task_instances(session=session)[0] - ti.state = State.SCHEDULED + ti.state = State.QUEUED session.merge(ti) session.merge(dr1) session.commit() @@ -3799,12 +3799,12 @@ def test_adopt_or_reset_orphaned_tasks_stale_scheduler_jobs(self): ti1, ti2 = dr1.get_task_instances(session=session) dr1.state = State.RUNNING - ti1.state = State.SCHEDULED + ti1.state = State.QUEUED ti1.queued_by_job_id = old_job.id session.merge(dr1) session.merge(ti1) - ti2.state = State.SCHEDULED + ti2.state = State.QUEUED ti2.queued_by_job_id = self.scheduler_job.id session.merge(ti2) session.flush() @@ -3816,7 +3816,7 @@ def test_adopt_or_reset_orphaned_tasks_stale_scheduler_jobs(self): session.refresh(ti1) assert ti1.state is None session.refresh(ti2) - assert State.SCHEDULED == ti2.state + assert ti2.state == State.QUEUED session.rollback() if old_job.processor_agent: old_job.processor_agent.end() diff --git a/tests/models/test_cleartasks.py b/tests/models/test_cleartasks.py index 1c5606e66acd8..9b8fbd0330aed 100644 --- a/tests/models/test_cleartasks.py +++ b/tests/models/test_cleartasks.py @@ -56,6 +56,7 @@ def test_clear_task_instances(self): ti0.run() ti1.run() + with create_session() as session: qry = session.query(TI).filter(TI.dag_id == dag.dag_id).all() clear_task_instances(qry, session, dag=dag) @@ -68,6 +69,29 @@ def test_clear_task_instances(self): assert ti1.try_number == 2 assert ti1.max_tries == 3 + def test_clear_task_instances_external_executor_id(self): + dag = DAG( + 'test_clear_task_instances_external_executor_id', + start_date=DEFAULT_DATE, + end_date=DEFAULT_DATE + datetime.timedelta(days=10), + ) + task0 = DummyOperator(task_id='task0', owner='test', dag=dag) + ti0 = TI(task=task0, execution_date=DEFAULT_DATE) + ti0.state = State.SUCCESS + ti0.external_executor_id = "some_external_executor_id" + + with create_session() as session: + session.add(ti0) + session.commit() + + qry = session.query(TI).filter(TI.dag_id == dag.dag_id).all() + clear_task_instances(qry, session, dag=dag) + + ti0.refresh_from_db() + + assert ti0.state is None + assert ti0.external_executor_id is None + def test_clear_task_instances_without_task(self): dag = DAG( 'test_clear_task_instances_without_task',