Each scheduler may process same zombie tasks in HA mode #25843

Closed
1 of 2 tasks
pingzh opened this issue Aug 19, 2022 · 3 comments
Labels
area:core duplicate Issue that is duplicated kind:bug This is a clearly a bug

Comments

@pingzh
Contributor

pingzh commented Aug 19, 2022

Apache Airflow version

main (development)

What happened

def _find_zombies(self, session: Session) -> None:
    """
    Find zombie task instances, which are tasks haven't heartbeated for too long
    or have a no-longer-running LocalTaskJob, and create a TaskCallbackRequest
    to be handled by the DAG processor.
    """
    self.log.debug("Finding 'running' jobs without a recent heartbeat")
    limit_dttm = timezone.utcnow() - timedelta(seconds=self._zombie_threshold_secs)
    zombies = (
        session.query(TaskInstance, DagModel.fileloc)
        .with_hint(TI, 'USE INDEX (ti_state)', dialect_name='mysql')
        .join(LocalTaskJob, TaskInstance.job_id == LocalTaskJob.id)
        .join(DagModel, TaskInstance.dag_id == DagModel.dag_id)
        .filter(TaskInstance.state == TaskInstanceState.RUNNING)
        .filter(
            or_(
                LocalTaskJob.state != State.RUNNING,
                LocalTaskJob.latest_heartbeat < limit_dttm,
            )
        )
        .filter(TaskInstance.queued_by_job_id == self.id)
        .all()
    )
    if zombies:
        self.log.warning("Failing (%s) jobs without heartbeat after %s", len(zombies), limit_dttm)
    for ti, file_loc in zombies:
        request = TaskCallbackRequest(
            full_filepath=file_loc,
            simple_task_instance=SimpleTaskInstance.from_ti(ti),
            msg=f"Detected {ti} as zombie",
        )
        self.log.error("Detected zombie job: %s", request)
        self.executor.send_callback(request)
        Stats.incr('zombies_killed')

_find_zombies should also support HA. Each scheduler tries to find all zombie tasks, so when more than one scheduler is running, the same zombie tasks may be processed by each of them.
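
To make the overlap concrete, here is a minimal sketch that uses plain Python objects instead of Airflow's models and database session. `FakeTaskInstance`, `find_zombies`, the scheduler job IDs, and the 300-second threshold are all hypothetical names and values chosen for illustration. It compares an unscoped search with one restricted by `queued_by_job_id`, the column the quoted query filters on. It is not meant to reproduce what any particular Airflow version actually does.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import List, Optional


@dataclass
class FakeTaskInstance:
    """Hypothetical stand-in for a running task instance row."""

    task_id: str
    queued_by_job_id: int       # scheduler job that queued this task
    latest_heartbeat: datetime  # last heartbeat of the task's job


def find_zombies(
    tis: List[FakeTaskInstance],
    limit_dttm: datetime,
    scheduler_job_id: Optional[int] = None,
) -> List[str]:
    """Return task_ids whose heartbeat is older than limit_dttm.

    If scheduler_job_id is given, only tasks queued by that scheduler are
    considered, similar to the queued_by_job_id filter in the quoted query.
    """
    return [
        ti.task_id
        for ti in tis
        if ti.latest_heartbeat < limit_dttm
        and (scheduler_job_id is None or ti.queued_by_job_id == scheduler_job_id)
    ]


now = datetime(2022, 8, 19, 12, 0, tzinfo=timezone.utc)
limit_dttm = now - timedelta(seconds=300)  # assumed zombie threshold
stale = now - timedelta(seconds=600)

tis = [
    FakeTaskInstance("a", queued_by_job_id=1, latest_heartbeat=stale),
    FakeTaskInstance("b", queued_by_job_id=2, latest_heartbeat=stale),
    FakeTaskInstance("c", queued_by_job_id=2, latest_heartbeat=now),
]

# Unscoped: every scheduler sees every zombie, so callbacks would be duplicated.
unscoped = {sid: find_zombies(tis, limit_dttm) for sid in (1, 2)}
# Scoped: each zombie is picked up only by the scheduler that queued it.
scoped = {sid: find_zombies(tis, limit_dttm, sid) for sid in (1, 2)}

print(unscoped)
print(scoped)
```

In the unscoped case both schedulers return the same two task IDs. With the filter, each zombie appears in exactly one scheduler's result.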

What you think should happen instead

No response

How to reproduce

No response

Operating System

Apple M1 Max, version: 12.2

Versions of Apache Airflow Providers

No response

Deployment

Other

Deployment details

NA

Anything else

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

@pingzh pingzh added kind:bug This is a clearly a bug area:core labels Aug 19, 2022
@eladkal
Contributor

eladkal commented Aug 20, 2022

Can you try 2.3.4RC1 #25846?
#24906 probably fixed it?

@potiuk
Member

potiuk commented Aug 20, 2022

Yep. This is fixed in #24906

@potiuk potiuk closed this as completed Aug 20, 2022
@potiuk potiuk added the duplicate Issue that is duplicated label Aug 20, 2022
@pingzh
Contributor Author

pingzh commented Aug 22, 2022

nice, thanks @eladkal and @potiuk
