Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Clear next method when clearing TIs #23929

Merged
merged 6 commits into from Jun 28, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions airflow/models/taskinstance.py
Expand Up @@ -202,6 +202,7 @@ def clear_task_instances(
ti.max_tries = max(ti.max_tries, ti.prev_attempted_tries)
ti.state = None
ti.external_executor_id = None
ti.clear_next_method_args()
session.merge(ti)

task_id_by_key[ti.dag_id][ti.run_id][ti.map_index][ti.try_number].add(ti.task_id)
Expand Down
23 changes: 23 additions & 0 deletions tests/models/test_cleartasks.py
Expand Up @@ -106,6 +106,29 @@ def test_clear_task_instances_external_executor_id(self, dag_maker):
assert ti0.state is None
assert ti0.external_executor_id is None

def test_clear_task_instances_next_method(self, dag_maker, session):
with dag_maker(
'test_clear_task_instances_next_method',
start_date=DEFAULT_DATE,
end_date=DEFAULT_DATE + datetime.timedelta(days=10),
) as dag:
EmptyOperator(task_id='task0')

ti0 = dag_maker.create_dagrun().task_instances[0]
ti0.state = State.DEFERRED
ti0.next_method = "next_method"
ti0.next_kwargs = {}

session.add(ti0)
session.commit()

clear_task_instances([ti0], session, dag=dag)

ti0.refresh_from_db()

assert ti0.next_method is None
assert ti0.next_kwargs is None

@pytest.mark.parametrize(
["state", "last_scheduling"], [(State.QUEUED, None), (State.RUNNING, DEFAULT_DATE)]
)
Expand Down