Skip to content

Commit

Permalink
Fix slow (cleared) tasks being be adopted by Celery worker.
Browse files Browse the repository at this point in the history
  • Loading branch information
Jorricks authored and kaxil committed Jul 1, 2021
1 parent 6b7f3f0 commit 73cd253
Show file tree
Hide file tree
Showing 5 changed files with 35 additions and 8 deletions.
2 changes: 1 addition & 1 deletion airflow/jobs/scheduler_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -1223,7 +1223,7 @@ def adopt_or_reset_orphaned_tasks(self, session: Session = None):
self.log.info("Marked %d SchedulerJob instances as failed", num_failed)
Stats.incr(self.__class__.__name__.lower() + '_end', num_failed)

resettable_states = [State.SCHEDULED, State.QUEUED, State.RUNNING]
resettable_states = [State.QUEUED, State.RUNNING]
query = (
session.query(TI)
.filter(TI.state.in_(resettable_states))
Expand Down
1 change: 1 addition & 0 deletions airflow/models/taskinstance.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@ def clear_task_instances(
# original max_tries or the last attempted try number.
ti.max_tries = max(ti.max_tries, ti.prev_attempted_tries)
ti.state = State.NONE
ti.external_executor_id = None
session.merge(ti)

task_id_by_key[ti.dag_id][ti.execution_date][ti.try_number].add(ti.task_id)
Expand Down
2 changes: 2 additions & 0 deletions tests/executors/test_celery_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -329,9 +329,11 @@ def test_try_adopt_task_instances(self):
ti1 = TaskInstance(task=task_1, execution_date=exec_date)
ti1.external_executor_id = '231'
ti1.queued_dttm = queued_dttm
ti1.state = State.QUEUED
ti2 = TaskInstance(task=task_2, execution_date=exec_date)
ti2.external_executor_id = '232'
ti2.queued_dttm = queued_dttm
ti2.state = State.QUEUED

tis = [ti1, ti2]
executor = celery_executor.CeleryExecutor()
Expand Down
14 changes: 7 additions & 7 deletions tests/jobs/test_scheduler_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -1396,9 +1396,9 @@ def test_adopt_or_reset_orphaned_tasks(self):
session=session,
)
ti = dr.get_task_instance(task_id=op1.task_id, session=session)
ti.state = State.SCHEDULED
ti.state = State.QUEUED
ti2 = dr2.get_task_instance(task_id=op1.task_id, session=session)
ti2.state = State.SCHEDULED
ti2.state = State.QUEUED
session.commit()

processor = mock.MagicMock()
Expand All @@ -1412,7 +1412,7 @@ def test_adopt_or_reset_orphaned_tasks(self):
assert ti.state == State.NONE

ti2 = dr2.get_task_instance(task_id=op1.task_id, session=session)
assert ti2.state == State.SCHEDULED, "Tasks run by Backfill Jobs should not be reset"
assert ti2.state == State.QUEUED, "Tasks run by Backfill Jobs should not be reset"

@parameterized.expand(
[
Expand Down Expand Up @@ -2926,7 +2926,7 @@ def test_adopt_or_reset_orphaned_tasks_external_triggered_dag(self):
session=session,
)
ti = dr1.get_task_instances(session=session)[0]
ti.state = State.SCHEDULED
ti.state = State.QUEUED
session.merge(ti)
session.merge(dr1)
session.commit()
Expand Down Expand Up @@ -3071,12 +3071,12 @@ def test_adopt_or_reset_orphaned_tasks_stale_scheduler_jobs(self):

ti1, ti2 = dr1.get_task_instances(session=session)
dr1.state = State.RUNNING
ti1.state = State.SCHEDULED
ti1.state = State.QUEUED
ti1.queued_by_job_id = old_job.id
session.merge(dr1)
session.merge(ti1)

ti2.state = State.SCHEDULED
ti2.state = State.QUEUED
ti2.queued_by_job_id = self.scheduler_job.id
session.merge(ti2)
session.flush()
Expand All @@ -3088,7 +3088,7 @@ def test_adopt_or_reset_orphaned_tasks_stale_scheduler_jobs(self):
session.refresh(ti1)
assert ti1.state is None
session.refresh(ti2)
assert State.SCHEDULED == ti2.state
assert ti2.state == State.QUEUED
session.rollback()
if old_job.processor_agent:
old_job.processor_agent.end()
Expand Down
24 changes: 24 additions & 0 deletions tests/models/test_cleartasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ def test_clear_task_instances(self):

ti0.run()
ti1.run()

with create_session() as session:
qry = session.query(TI).filter(TI.dag_id == dag.dag_id).all()
clear_task_instances(qry, session, dag=dag)
Expand All @@ -68,6 +69,29 @@ def test_clear_task_instances(self):
assert ti1.try_number == 2
assert ti1.max_tries == 3

def test_clear_task_instances_external_executor_id(self):
dag = DAG(
'test_clear_task_instances_external_executor_id',
start_date=DEFAULT_DATE,
end_date=DEFAULT_DATE + datetime.timedelta(days=10),
)
task0 = DummyOperator(task_id='task0', owner='test', dag=dag)
ti0 = TI(task=task0, execution_date=DEFAULT_DATE)
ti0.state = State.SUCCESS
ti0.external_executor_id = "some_external_executor_id"

with create_session() as session:
session.add(ti0)
session.commit()

qry = session.query(TI).filter(TI.dag_id == dag.dag_id).all()
clear_task_instances(qry, session, dag=dag)

ti0.refresh_from_db()

assert ti0.state is None
assert ti0.external_executor_id is None

def test_clear_task_instances_without_task(self):
dag = DAG(
'test_clear_task_instances_without_task',
Expand Down

0 comments on commit 73cd253

Please sign in to comment.