From fd1255440670e6367fc35ea428624de4a287ec1f Mon Sep 17 00:00:00 2001 From: Tanel Kiis Date: Thu, 26 May 2022 12:10:01 +0300 Subject: [PATCH] Clear next method when clearing TIs --- airflow/models/taskinstance.py | 1 + tests/models/test_cleartasks.py | 23 +++++++++++++++++++++++ 2 files changed, 24 insertions(+) diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py index e160153af7678..82b9634c3a10d 100644 --- a/airflow/models/taskinstance.py +++ b/airflow/models/taskinstance.py @@ -202,6 +202,7 @@ def clear_task_instances( ti.max_tries = max(ti.max_tries, ti.prev_attempted_tries) ti.state = None ti.external_executor_id = None + ti.clear_next_method_args() session.merge(ti) task_id_by_key[ti.dag_id][ti.run_id][ti.map_index][ti.try_number].add(ti.task_id) diff --git a/tests/models/test_cleartasks.py b/tests/models/test_cleartasks.py index ba08692d4215b..05ff9df45809a 100644 --- a/tests/models/test_cleartasks.py +++ b/tests/models/test_cleartasks.py @@ -106,6 +106,29 @@ def test_clear_task_instances_external_executor_id(self, dag_maker): assert ti0.state is None assert ti0.external_executor_id is None + def test_clear_task_instances_next_method(self, dag_maker, session): + with dag_maker( + 'test_clear_task_instances_next_method', + start_date=DEFAULT_DATE, + end_date=DEFAULT_DATE + datetime.timedelta(days=10), + ) as dag: + EmptyOperator(task_id='task0') + + ti0 = dag_maker.create_dagrun().task_instances[0] + ti0.state = State.DEFERRED + ti0.next_method = "next_method" + ti0.next_kwargs = {} + + session.add(ti0) + session.commit() + + clear_task_instances([ti0], session, dag=dag) + + ti0.refresh_from_db() + + assert ti0.next_method is None + assert ti0.next_kwargs is None + @pytest.mark.parametrize( ["state", "last_scheduling"], [(State.QUEUED, None), (State.RUNNING, DEFAULT_DATE)] )