Fix Scheduler crash when executing task instances of missing DAG #20349
Conversation
airflow/jobs/scheduler_job.py
Outdated
@@ -316,6 +316,15 @@ def _executable_task_instances_to_queued(self, max_tis: int, session: Session =

pool_to_task_instances: DefaultDict[str, List[models.Pool]] = defaultdict(list)
for task_instance in task_instances_to_examine:
    # If the dag is no longer in the dagbag, don't bother
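For context, the grouping this hunk sits in can be sketched in plain Python. The data below is illustrative stand-ins, not Airflow's real TaskInstance objects:

```python
from collections import defaultdict

# Illustrative (pool, task-instance) pairs standing in for real TI objects.
task_instances_to_examine = [
    ("default_pool", "dag_a.task_1"),
    ("default_pool", "dag_a.task_2"),
    ("gpu_pool", "dag_b.task_1"),
]

# Group the examined task instances by the pool they want a slot in,
# so pool limits can be checked per pool in the next step.
pool_to_task_instances = defaultdict(list)
for pool, task_instance in task_instances_to_examine:
    pool_to_task_instances[pool].append(task_instance)

print(dict(pool_to_task_instances))
```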
How is that case possible?
The path that leads to here is:
_do_scheduling
-> _critical_section_execute_task_instances
-> _executable_task_instances_to_queued
It's odd that dagruns were created but the task eventually is "dagless". How was dagrun created if the dag is not in the dagbag?
Is this designed to handle the edge case of removing the DAG after a dagrun was created?
This can hardly happen, but it can happen.
The way to reproduce it is to remove the dag file of a running dag, mark the task instances as failed, then delete the dag from the UI. You'll observe the crash-looping. The dag must have max_active_tasks_per_dag set.
As for the change, it doesn't stop a running task from completing; it only logs an error that the dag is missing.
I could actually raise the error only when checking concurrency, because that's where we access a dagbag and use it to look up the task, which can cause the scheduler to crash with a NoneType attribute error.
And sorry for the misleading comment at the concurrency point. The task would get there and still print the error log.
I'm considering only having the log there. WDYT?
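The crash mode being described can be sketched without Airflow. `FakeDagBag` here is a minimal stand-in for the real dagbag, whose `get_dag` returns `None` once the dag file is gone:

```python
class FakeDagBag:
    """Minimal stand-in for a dagbag: get_dag returns None for unknown DAGs."""

    def __init__(self, dags):
        self._dags = dags

    def get_dag(self, dag_id):
        return self._dags.get(dag_id)  # None once the dag file is removed

bag = FakeDagBag({})  # the dag file was deleted, so nothing is registered
dag = bag.get_dag("airflow_bug")

# The concurrency check then dereferences the missing dag and blows up.
try:
    dag.get_task("mytasrk97")
    error = "no error"
except AttributeError as exc:
    error = f"{type(exc).__name__}: {exc}"

print(error)
```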
Force-pushed a2a1161 to 5ce6d00 (Compare)
airflow/jobs/scheduler_job.py
Outdated
# If the dag is missing, continue to the next task.
if not serialized_dag:
    self.log.error(
        "DAG '%s' for taskinstance %s not found in serialized_dag table",
        dag_id,
        task_instance,
    )
    continue
If we can't find them, shouldn't these be set to failed or something? Otherwise these TIs will always come up each time around the loop.
From my testing, the scheduler continues to run the task instances even though the dag no longer exists.
I think we should not fail them but allow them to finish, since they're not stuck or raising exceptions.
allow it to finish
If the code is not there anymore, how can the task instance execute? What am I missing here 🤔
Sorry for the delay, wanted to properly reproduce it before responding.
Here's a reproduction step:
Run this dag:
from airflow import DAG
from datetime import datetime

dag = DAG(
    "airflow_bug",
    schedule_interval="0 1 * * *",
    start_date=datetime(2021, 1, 1),
    max_active_runs=1,
    concurrency=1,
)

for i in range(100):
    @dag.task(task_id=f'mytasrk{i}')
    def sleeping():
        import time
        time.sleep(60)

    sleeping()
Once you unpause and the tasks start running, remove the file from the DAG folder.
Watch the scheduler logs; after some time it'll start crashing and won't recover (until all the tasks have failed, I think).
[2021-12-24 05:55:22,877] {scheduler_job.py:623} ERROR - Executor reports task instance <TaskInstance: airflow_bug.mytasrk97 scheduled__2021-01-01T01:00:00+00:00 [queued]> finished (failed) although the task says its queued. (Info: None) Was the task killed externally?
[2021-12-24 05:55:22,881] {scheduler_job.py:630} ERROR - Marking task instance <TaskInstance: airflow_bug.mytasrk97 scheduled__2021-01-01T01:00:00+00:00 [queued]> as failed
Traceback (most recent call last):
File "/opt/airflow/airflow/jobs/scheduler_job.py", line 628, in _process_executor_events
task = dag.get_task(ti.task_id)
AttributeError: 'NoneType' object has no attribute 'get_task'
cc: @kaxil
This error/reproduction step is not quite right, but the same idea can trigger this behaviour -- if the dag is deleted at the "right" time, this bit of the scheduler will fail.
I think in that case though we should fail the task instances as the DAG doesn't exist anymore, and as TP said, it can't run successfully.
Yeah, if the DAG is missing entirely, we should stop the entire run from continuing because nothing afterwards would run. If I understand correctly, skipping the task instance (as this PR currently implements) means the ti would stay queued and unnecessarily be tried again (and again…), which seems suboptimal.
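The "tried again (and again)" concern can be illustrated with a toy scheduler pass. All names here are hypothetical, not Airflow APIs:

```python
def schedule_once(queued, serialized_dags, on_missing="skip"):
    """Toy scheduler pass: returns the TIs still queued after one loop."""
    remaining = []
    for ti in queued:
        dag_id, _task_id = ti
        if dag_id not in serialized_dags:
            if on_missing == "skip":
                # Plain `continue` leaves the TI queued, so it comes up
                # again on every subsequent pass.
                remaining.append(ti)
            # on_missing == "fail": the TI leaves the queue for good.
            continue
        # ...a real scheduler would hand the TI to the executor here...
    return remaining

queued = [("airflow_bug", "t1"), ("other_dag", "t2")]
serialized = {"other_dag"}  # "airflow_bug" was deleted

still_queued_skip = schedule_once(queued, serialized, on_missing="skip")
still_queued_fail = schedule_once(queued, serialized, on_missing="fail")
print(still_queued_skip, still_queued_fail)
```

With `skip`, the orphaned TI survives every pass; with `fail`, the queue drains.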
Force-pushed 8ce697e to dd24862 (Compare)
Force-pushed dd24862 to 4b0391a (Compare)
airflow/jobs/scheduler_job.py
Outdated
    dag_id,
    task_instance,
)
task_instance.set_state(State.FAILED, session=session)
Is it possible to set the entire DAG to FAILED here? Depending on the trigger rules, there could be downstream tasks which still attempt to execute, which will then be marked failed by the same check.
there could be downstream tasks which still attempt to execute, which will then be marked failed by the same check
I think this is a good thing in this case. This code is reached because the DAG declaring those tasks is gone, so it doesn’t make sense to execute those tasks IMO.
I will opt for setting all scheduled tasks to None. I doubt that failing the DAG here would really fail it while there are tasks being executed in the executor. There's a bug where a DAG you mark as failed comes up again as running: #16078.
So I propose to set all scheduled tasks to None. The scheduler will no longer move the task instances to scheduled when the dag can no longer be found, and it makes sense to set them to None instead of failing them, since the task instances won't have logs.
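That bulk reset can be sketched on a toy table using stdlib sqlite3. The schema is heavily simplified and is not Airflow's real TaskInstance model:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE task_instance (task_id TEXT, dag_id TEXT, state TEXT)")
conn.executemany(
    "INSERT INTO task_instance VALUES (?, ?, ?)",
    [
        ("t1", "gone_dag", "scheduled"),  # dag file deleted, TI not started
        ("t2", "gone_dag", "running"),    # already handed to the executor
        ("t3", "live_dag", "scheduled"),  # healthy dag, must stay untouched
    ],
)

# Reset only the SCHEDULED task instances of the missing dag back to no
# state, mirroring the bulk UPDATE proposed for the scheduler: running
# TIs and other dags are left alone.
conn.execute(
    "UPDATE task_instance SET state = NULL "
    "WHERE dag_id = ? AND state = 'scheduled'",
    ("gone_dag",),
)

states = dict(conn.execute("SELECT task_id, state FROM task_instance"))
print(states)
```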
I think this is a good thing in this case. This code is reached because the DAG declaring those tasks is gone, so it doesn’t make sense to execute those tasks IMO.
Yeah, I agree that they won't be executable, I'm just wondering if we can mark them all as failed in a single pass.
Done
The PR most likely needs to run full matrix of tests because it modifies parts of the core of Airflow. However, committers might decide to merge it quickly and take the risk. If they don't merge it quickly - please rebase it to the latest main at your convenience, or amend the last commit of the PR, and push it with --force-with-lease.
Force-pushed 349305c to 86f1e93 (Compare)
airflow/jobs/scheduler_job.py
Outdated
    task_instance,
)
session.query(TI).filter(TI.dag_id == dag_id, TI.state == State.SCHEDULED).update(
    {TI.state: State.NONE}, synchronize_session='fetch'
I don't think we should set it to None! We should mark it failed for sure. There is also TaskInstanceState.REMOVED, which partially applies and partially doesn't, since it is not the task alone that is removed but the entire dag.
When executing task instances, we do not check if the dag is missing in the dagbag. This PR fixes it by ignoring task instances if we can't find the dag in serialized dag table
Co-authored-by: Kaxil Naik <kaxilnaik@gmail.com>
Force-pushed 3b3876c to 3c5079f (Compare)
I was testing something else with dynamic dags that have a lot of tasks. I noticed that the DAGs do have import errors at times and the task instances get failed. After some time, the dag appears again with failed task instances. I'm wondering if this PR will cause issues for users, because there's no log to tell what happened to the task instances that failed.
@ephraimbuddy, you're concerned that the message is in the scheduler logs, not something user facing (i.e. in the UI)?
I'm particularly concerned about dynamic dags. Sometimes they give import errors while the DAGs are still running, and within a few minutes the DAG parses fine and reappears.
I honestly think crashing on data entry is arguably "never" a good thing. Why would the import errors appear, actually? I'm not sure I "feel" where they come from. I thought such crashes on import might only come from not-yet-synced files (when you are using something other than git-sync; there you have atomic swaps for all files in a commit), but I cannot see how that could lead to a missing DAG in the serialized table and running tasks at the same time. I thought we can only have a running task when we also have a serialized dag. But maybe I am missing something :) Do we actually have a case where a DAG is deleted while being re-generated? Maybe we should fix that instead? BTW, AIP-43 will mean that such errors will not crash the scheduler anyway.
When executing task instances, we do not check if the dag is missing in
the dagbag. This PR fixes it by ignoring task instances if we can't find
the dag in serialized dag table
Closes: #20099
Read the Pull Request Guidelines for more information.
In case of fundamental code change, Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in UPDATING.md.