diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py index cade4c7ad4179..b02241abe0106 100644 --- a/airflow/models/taskinstance.py +++ b/airflow/models/taskinstance.py @@ -201,6 +201,7 @@ def clear_task_instances( ti.max_tries = max(ti.max_tries, ti.prev_attempted_tries) ti.state = None ti.external_executor_id = None + ti.clear_next_method_args() session.merge(ti) task_id_by_key[ti.dag_id][ti.run_id][ti.map_index][ti.try_number].add(ti.task_id) diff --git a/tests/models/test_cleartasks.py b/tests/models/test_cleartasks.py index ba08692d4215b..05ff9df45809a 100644 --- a/tests/models/test_cleartasks.py +++ b/tests/models/test_cleartasks.py @@ -106,6 +106,29 @@ def test_clear_task_instances_external_executor_id(self, dag_maker): assert ti0.state is None assert ti0.external_executor_id is None + def test_clear_task_instances_next_method(self, dag_maker, session): + with dag_maker( + 'test_clear_task_instances_next_method', + start_date=DEFAULT_DATE, + end_date=DEFAULT_DATE + datetime.timedelta(days=10), + ) as dag: + EmptyOperator(task_id='task0') + + ti0 = dag_maker.create_dagrun().task_instances[0] + ti0.state = State.DEFERRED + ti0.next_method = "next_method" + ti0.next_kwargs = {} + + session.add(ti0) + session.commit() + + clear_task_instances([ti0], session, dag=dag) + + ti0.refresh_from_db() + + assert ti0.next_method is None + assert ti0.next_kwargs is None + @pytest.mark.parametrize( ["state", "last_scheduling"], [(State.QUEUED, None), (State.RUNNING, DEFAULT_DATE)] )