Do not queue tasks when the DAG goes missing #16182

Closed · wants to merge 3 commits (diff shows changes from 2 commits)
8 changes: 8 additions & 0 deletions airflow/config_templates/config.yml
@@ -1709,6 +1709,14 @@
type: float
example: ~
default: "15.0"
- name: clean_tis_without_dag_interval
description: |
How often (in seconds) to check for and fail task instances and DagRuns whose
corresponding DAG is missing
version_added: 2.1.1
type: float
example: ~
default: "15.0"
- name: scheduler_heartbeat_sec
description: |
The scheduler constantly tries to trigger new tasks (look at the
4 changes: 4 additions & 0 deletions airflow/config_templates/default_airflow.cfg
@@ -854,6 +854,10 @@ job_heartbeat_sec = 5
# that no longer have a matching DagRun
clean_tis_without_dagrun_interval = 15.0

# How often (in seconds) to check for and fail task instances and DagRuns whose
# corresponding DAG is missing
clean_tis_without_dag_interval = 15.0

# The scheduler constantly tries to trigger new tasks (look at the
# scheduler section in the docs for more information). This defines
# how often the scheduler should run (in seconds).
36 changes: 35 additions & 1 deletion airflow/jobs/scheduler_job.py
@@ -549,7 +549,6 @@ def _critical_section_execute_task_instances(self, session: Session) -> int:
else:
max_tis = min(self.max_tis_per_query, self.executor.slots_available)
queued_tis = self._executable_task_instances_to_queued(max_tis, session=session)

self._enqueue_task_instances_with_queued_state(queued_tis)
return len(queued_tis)

@@ -733,6 +732,10 @@ def _run_scheduler_loop(self) -> None:
conf.getfloat('scheduler', 'clean_tis_without_dagrun_interval', fallback=15.0),
self._clean_tis_without_dagrun,
)
timers.call_regular_interval(
conf.getfloat('scheduler', 'clean_tis_without_dag_interval', fallback=15.0),
self._clean_tis_without_dag,
)

for loop_count in itertools.count(start=1):
with Stats.timer() as timer:
@@ -807,6 +810,37 @@ def _clean_tis_without_dagrun(self, session):
raise
guard.commit()

@provide_session
def _clean_tis_without_dag(self, session: Session = None):
"""Fails task instances and DagRuns of DAGs that no longer exist in the dagbag/DB"""
states_to_check = State.unfinished - frozenset([State.NONE, State.SHUTDOWN])
tis = session.query(TI).filter(TI.state.in_(states_to_check)).all()
missing_dags = {}
dag_runs = set()

for ti in tis:
if ti.dag_id in missing_dags:
ti.set_state(State.FAILED, session=session)
continue
# Dag no longer in dagbag?
if not self.dagbag.has_dag(ti.dag_id, session=session):
ti.set_state(State.FAILED, session=session)
Reviewer comment (Member): I wonder if this (and L823) should be set to State.REMOVED? It might be clearer for the user when debugging than a failure without any logs.
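
A sketch of that alternative (hypothetical; the diff as written uses State.FAILED):

    # Hypothetical variant: REMOVED signals "task no longer exists in the DAG",
    # which may be clearer in the UI than a FAILED state with no logs.
    if not self.dagbag.has_dag(ti.dag_id, session=session):
        ti.set_state(State.REMOVED, session=session)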

dag_runs.add(ti.dag_run)
missing_dags[ti.dag_id] = [ti]
continue
if missing_dags:
self.log.warning(
"The following Dags are missing, therefore the DAG's "
"task instances have been set to failed: \t\n%s",
[
(f"Missing DAGs: {dag_name}", f"Failed TaskInstances: [{ti}]")
for dag_name, ti in missing_dags.items()
],
)
self.log.warning("Failing the corresponding DagRuns of the missing DAGs. DagRuns: %s", dag_runs)
for dr in dag_runs:
dr.set_state(State.FAILED)

def _do_scheduling(self, session) -> int:
"""
This function is where the main scheduling decisions take places. It:
Expand Down
28 changes: 28 additions & 0 deletions airflow/models/dagbag.py
@@ -166,6 +166,34 @@ def dag_ids(self) -> List[str]:
"""
return list(self.dags.keys())

@provide_session
def has_dag(self, dag_id: str, session: Session = None):
"""
Checks the "local" cache, if it exists, and is not older than the configured cache time, return True
else check in the DB if the dag exists, return True, False otherwise
"""
from airflow.models.serialized_dag import SerializedDagModel

sd_last_updated_datetime = SerializedDagModel.get_last_updated_datetime(
dag_id=dag_id,
session=session,
)
sd_has_dag = sd_last_updated_datetime is not None
if dag_id not in self.dags:
return sd_has_dag
if dag_id not in self.dags_last_fetched:
return sd_has_dag
min_serialized_dag_fetch_secs = timedelta(seconds=settings.MIN_SERIALIZED_DAG_FETCH_INTERVAL)
if timezone.utcnow() < self.dags_last_fetched[dag_id] + min_serialized_dag_fetch_secs:
return sd_has_dag
if sd_has_dag:
return True
Reviewer comment on lines +177 to +190 (Member): We should refactor this to delay the DB check until we need it -- for instance, if we have the dag locally and it was fetched less than the configured timeout ago, then we don't need to ask the DB.

Reviewer comment (Member): So something like this (pseudo-python):

        if dag_id in self.dags and timezone.utcnow() < self.dags_last_fetched[dag_id] + min_serialized_dag_fetch_secs:
            return True

        sd_last_updated_datetime = SerializedDagModel.get_last_updated_datetime(
            dag_id=dag_id,
            session=session,
        )
        # etc ...
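
One way the completed refactor might read (a sketch only, reusing the names from the diff above; not necessarily what the PR merged):

    @provide_session
    def has_dag(self, dag_id: str, session: Session = None) -> bool:
        """Return True if the DAG exists, preferring a fresh local cache over a DB query."""
        from airflow.models.serialized_dag import SerializedDagModel

        # Fresh cache hit: answer without touching the DB at all.
        min_serialized_dag_fetch_secs = timedelta(seconds=settings.MIN_SERIALIZED_DAG_FETCH_INTERVAL)
        if (
            dag_id in self.dags
            and dag_id in self.dags_last_fetched
            and timezone.utcnow() < self.dags_last_fetched[dag_id] + min_serialized_dag_fetch_secs
        ):
            return True

        # Cache miss or stale entry: fall back to the serialized DAG table.
        sd_has_dag = (
            SerializedDagModel.get_last_updated_datetime(dag_id=dag_id, session=session) is not None
        )
        if not sd_has_dag and dag_id in self.dags:
            # The DAG vanished from the DB; evict the stale local copy too.
            self.log.warning("Serialized DAG %s no longer exists", dag_id)
            del self.dags[dag_id]
            self.dags_last_fetched.pop(dag_id, None)
            self.dags_hash.pop(dag_id, None)
        return sd_has_dag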

self.log.warning("Serialized DAG %s no longer exists", dag_id)
del self.dags[dag_id]
del self.dags_last_fetched[dag_id]
del self.dags_hash[dag_id]
return False

@provide_session
def get_dag(self, dag_id, session: Session = None):
"""
1 change: 1 addition & 0 deletions airflow/utils/state.py
@@ -107,6 +107,7 @@ def color_fg(cls, state):
FAILED,
SKIPPED,
UPSTREAM_FAILED,
REMOVED,
]
)
"""
49 changes: 49 additions & 0 deletions tests/jobs/test_scheduler_job.py
@@ -1475,6 +1475,55 @@ def test_scheduler_loop_should_change_state_for_tis_without_dagrun(
assert ti.start_date == ti.end_date
assert ti.duration is not None

def test_scheduler_loop_should_fail_tasks_with_missing_dag_in_dagbag_dags(self):
"""This tests that if dags are missing in dagbag, then it should be failed"""
session = settings.Session()
dag_id = 'test_execute_helper_should_fail_tasks_with_missing_dag'
dag = DAG(dag_id, start_date=DEFAULT_DATE, default_args={'owner': 'owner1'})

with dag:
op1 = BashOperator(task_id='op1', bash_command='sleep 5')

# Write Dag to DB
dagbag = DagBag(dag_folder="/dev/null", include_examples=False, read_dags_from_db=False)
dagbag.bag_dag(dag, root_dag=dag)
dagbag.sync_to_db()

# Mock dagbag.has_dag
dagbag.has_dag = mock.MagicMock(return_value=False)

dag = DagBag(read_dags_from_db=True, include_examples=False).get_dag(dag_id)
dr = dag.create_dagrun(
run_type=DagRunType.SCHEDULED,
state=State.RUNNING,
execution_date=DEFAULT_DATE + timedelta(days=1),
start_date=DEFAULT_DATE + timedelta(days=1),
session=session,
)
ti = dr.get_task_instance(task_id=op1.task_id, session=session)
ti.state = State.RUNNING
session.commit()

# This poll interval is large, but the scheduler doesn't sleep that
# long; instead we hit the clean_tis_without_dag interval first
self.scheduler_job = SchedulerJob(num_runs=2, processor_poll_interval=30)
self.scheduler_job.dagbag = dagbag
executor = MockExecutor(do_update=False)
self.scheduler_job.executor = executor
processor = mock.MagicMock()
processor.done = False
self.scheduler_job.processor_agent = processor

with mock.patch.object(settings, "USE_JOB_SCHEDULE", False), conf_vars(
{('scheduler', 'clean_tis_without_dag_interval'): '0.001'}
):
self.scheduler_job._run_scheduler_loop()
Reviewer comment on lines +1517 to +1520 (Member): Not sure we need to run the whole scheduler loop here -- we could just call self.scheduler_job._clean_tis_without_dag() directly.

By calling scheduler_loop the only thing extra we check is that we've added this to the timer, but we can see that pretty easily.

Dunno :)
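
A sketch of that simplification (hypothetical; the assertions below would stay unchanged):

    # Hypothetical: call the cleanup method directly instead of running the
    # whole scheduler loop with a tiny clean_tis_without_dag_interval.
    self.scheduler_job._clean_tis_without_dag()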


ti = dr.get_task_instance(task_id=op1.task_id, session=session)
assert ti.state == State.FAILED
assert ti.start_date is not None
assert ti.dag_run.state == State.FAILED

@mock.patch('airflow.jobs.scheduler_job.DagFileProcessorAgent')
def test_executor_end_called(self, mock_processor_agent):
"""
33 changes: 33 additions & 0 deletions tests/models/test_dagbag.py
@@ -859,6 +859,39 @@ def test_get_dag_with_dag_serialization(self):
assert set(updated_ser_dag_1.tags) == {"example", "example2", "new_tag"}
assert updated_ser_dag_1_update_time > ser_dag_1_update_time

@patch("airflow.models.dagbag.settings.MIN_SERIALIZED_DAG_FETCH_INTERVAL", 5)
def test_has_dag_method(self):
"""Test has_dag method"""
dag_id = "example_bash_operator"
with freeze_time(tz.datetime(2020, 1, 5, 0, 0, 0)):
example_bash_op_dag = DagBag(include_examples=True).dags.get(dag_id)
SerializedDagModel.write_dag(dag=example_bash_op_dag)

dag_bag = DagBag(read_dags_from_db=True)
dag_bag.get_dag(dag_id) # Add dag to self.dags
assert dag_bag.has_dag(dag_id)
Reviewer comment (Member): We should put a query count assertion around this line to ensure it is 0.
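
A sketch of that assertion, assuming the assert_queries_count helper from Airflow's tests.test_utils.asserts (note: with has_dag as written above this would fail, since it always queries the DB; it should pass once the DB check is deferred as suggested earlier):

    from tests.test_utils.asserts import assert_queries_count

    # Guard the fresh-cache path with a query counter so a regression that
    # re-introduces a DB round-trip fails the test.
    with assert_queries_count(0):
        assert dag_bag.has_dag(dag_id)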

assert not dag_bag.has_dag("non_added_dag_id")

# Check that min_serialized_dag_fetch_interval has passed, but the dag
# is still in SerializedDagModel, so has_dag returns True
with freeze_time(tz.datetime(2020, 1, 5, 0, 0, 6)):
assert dag_bag.has_dag(dag_id)

# We remove the dag from dag_bag.dags and ensure has_dag still returns True (it's in the DB)
dag_bag.dags.pop(dag_id)
assert not dag_bag.dags.get(dag_id)
assert dag_bag.has_dag(dag_id)

# We removed dag_id from dag_bag.dags, let's add it back
dag_bag.get_dag(dag_id)
# Remove the dag from SerializedDagModel
SerializedDagModel.remove_dag(dag_id=dag_id)
with freeze_time(tz.datetime(2020, 1, 5, 0, 0, 8)):
assert dag_bag.dags[dag_id]
# Even though the dag is in dag_bag.dags,
# it's no longer in SerializedDagModel, so has_dag should return False
assert not dag_bag.has_dag(dag_id)

def test_collect_dags_from_db(self):
"""DAGs are collected from Database"""
example_dags_folder = airflow.example_dags.__path__[0]