Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Do not queue tasks when the DAG goes missing #16182

Closed
wants to merge 3 commits into from

Conversation

ephraimbuddy
Copy link
Contributor

@ephraimbuddy ephraimbuddy commented May 31, 2021

Currently, if a dag goes missing, the scheduler continues to queue the task instances
until the executor reports the tasks as failed and then the scheduler would now set the state to failed.

This change ensures that tasks are not queued when the dag goes missing. Instead of waiting on the
executor to fail the task without explicit reason, the task fails here with the reason why it failed. Thanks to
this, the Pool's queued slots will be freed for other tasks to be queued

Closes: #15488


^ Add meaningful description above

Read the Pull Request Guidelines for more information.
In case of fundamental code change, Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in UPDATING.md.

@boring-cyborg boring-cyborg bot added the area:Scheduler Scheduler or dag parsing Issues label May 31, 2021
@ephraimbuddy ephraimbuddy marked this pull request as ready for review May 31, 2021 12:07
Copy link
Member

@ashb ashb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some changes to the current implementation

Additionally I'm not sure if it makes sense to check this every time through the loop -- perhaps we could get away with something like we do with SchedulerJob._clean_tis_without_dagrun -- we call that once every 15s.

What would be the effect/drawback if we only did the tidy up then?

airflow/jobs/scheduler_job.py Outdated Show resolved Hide resolved
airflow/jobs/scheduler_job.py Outdated Show resolved Hide resolved
airflow/jobs/scheduler_job.py Outdated Show resolved Hide resolved
airflow/jobs/scheduler_job.py Outdated Show resolved Hide resolved
airflow/jobs/scheduler_job.py Outdated Show resolved Hide resolved
@ephraimbuddy ephraimbuddy force-pushed the dont-queue-missing-dag branch 6 times, most recently from ff88c3e to 885df4c Compare June 4, 2021 10:22
airflow/jobs/scheduler_job.py Outdated Show resolved Hide resolved
Copy link
Member

@kaxil kaxil left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor suggestion but LGTM

@ephraimbuddy ephraimbuddy force-pushed the dont-queue-missing-dag branch 3 times, most recently from c9a829a to 9e1c7dc Compare June 5, 2021 11:11
airflow/jobs/scheduler_job.py Outdated Show resolved Hide resolved
airflow/jobs/scheduler_job.py Outdated Show resolved Hide resolved
@jhtimmins
Copy link
Contributor

@ashb can you take another look and sign off if your requested changes have been sufficiently addressed?

@ephraimbuddy ephraimbuddy force-pushed the dont-queue-missing-dag branch 2 times, most recently from 20bc4f6 to 93ce02a Compare June 24, 2021 17:46
@ephraimbuddy ephraimbuddy added this to the Airflow 2.1.2 milestone Jun 24, 2021
@ephraimbuddy ephraimbuddy reopened this Jun 25, 2021
@ephraimbuddy ephraimbuddy force-pushed the dont-queue-missing-dag branch 4 times, most recently from df667e1 to adef9b4 Compare June 27, 2021 21:40
airflow/models/dagbag.py Outdated Show resolved Hide resolved
Currently, if a dag goes missing, the scheduler continues to queue the task instances
until the executor reports the tasks as failed and then the scheduler would now set the state properly.

This change ensures that tasks are not queued when the dag file goes missing. Instead of waiting on the
executor to fail the task without explicit reason, the task fails here with the reason why it failed. Thanks to
this, the Pool's queued slots will be freed for other tasks to be queued

fixup! Do not queue tasks when the DAG goes missing

add tests

fixup! fixup! Do not queue tasks when the DAG goes missing

Change implementation to check at regular interval instead of at every loop

fixup! Change implementation to check at regular interval instead of at every loop

fixup! fixup! Change implementation to check at regular interval instead of at every loop

fixup! fixup! fixup! Change implementation to check at regular interval instead of at every loop

change configuration name

apply review suggestions

fixup! apply review suggestions

add has_dag method to DagBag and improve missing dag fail method in scheduler

Update airflow/jobs/scheduler_job.py

Co-authored-by: Kaxil Naik <kaxilnaik@gmail.com>

Fix dag_bag.has_dag behaviour

fixup! Fix dag_bag.has_dag behaviour

Apply suggestions from code review

Co-authored-by: Kaxil Naik <kaxilnaik@gmail.com>

fixup! Apply suggestions from code review

fixup! fixup! Apply suggestions from code review

fixup! Do not queue tasks when the DAG goes missing
airflow/jobs/scheduler_job.py Outdated Show resolved Hide resolved
Comment on lines +177 to +190
sd_last_updated_datetime = SerializedDagModel.get_last_updated_datetime(
dag_id=dag_id,
session=session,
)
sd_has_dag = sd_last_updated_datetime is not None
if dag_id not in self.dags:
return sd_has_dag
if dag_id not in self.dags_last_fetched:
return sd_has_dag
min_serialized_dag_fetch_secs = timedelta(seconds=settings.MIN_SERIALIZED_DAG_FETCH_INTERVAL)
if timezone.utcnow() < self.dags_last_fetched[dag_id] + min_serialized_dag_fetch_secs:
return sd_has_dag
if sd_has_dag:
return True
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should refactor this to delay the DB check until we need it -- for instance if we have the dag locally, and it was fetched less than the configured timeout already, then we don't need to ask the DB

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So something like this (pseudo-python):

        if dag_id in self.dags and timezone.utcnow() < self.dags_last_fetched[dag_id] + min_serialized_dag_fetch_secs:
            return True

        sd_last_updated_datetime = SerializedDagModel.get_last_updated_datetime(
            dag_id=dag_id,
            session=session,
        )
        # etc ...

continue
# Dag no longer in dagbag?
if not self.dagbag.has_dag(ti.dag_id, session=session):
ti.set_state(State.FAILED, session=session)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if this (and L823) should be set to State.REMOVED? It might be clearer for debugging for the user than a failure without any logs.


dag_bag = DagBag(read_dags_from_db=True)
dag_bag.get_dag(dag_id) # Add dag to self.dags
assert dag_bag.has_dag(dag_id)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should put a query count assertion around this line to ensure it is 0

tests/models/test_dagbag.py Outdated Show resolved Hide resolved
Comment on lines +1517 to +1520
with mock.patch.object(settings, "USE_JOB_SCHEDULE", False), conf_vars(
{('scheduler', 'clean_tis_without_dag_interval'): '0.001'}
):
self.scheduler_job._run_scheduler_loop()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure we need to run the whole scheduler loop here -- we could just call self.scheduler_job._clean_tis_without_dag() directly.

By calling scheduler_loop the only thing extra we check is that we've added this to the timer, but we can see that pretty easily.

Dunno :)

Co-authored-by: Ash Berlin-Taylor <ash_github@firemirror.com>
@ephraimbuddy
Copy link
Contributor Author

Hi @ashb @kaxil, I'm closing this as I can't reproduce this scenario again manually.
This PR: #16368 easily handles missing dag file and any dag that's missing is immediately failed in executor and subsequently by this PR: #15929 in scheduler.

Unlike before, I tried severally to reproduce this case in manual testing but the two PRs above are hit thus not being able to reproduce it.

I'm closing now as I believe it has been solved

@ephraimbuddy ephraimbuddy deleted the dont-queue-missing-dag branch July 6, 2021 08:33
@ashb ashb modified the milestones: Airflow 2.1.2, Airflow 2.1.3 Jul 7, 2021
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
area:Scheduler Scheduler or dag parsing Issues full tests needed We need to run full set of tests for this PR to merge
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Dynamic DAGs that disappear end up stuck in queued state.
5 participants