AIP-39: Move some catchup logic back into DAG #15473

Closed
133 changes: 48 additions & 85 deletions airflow/models/dag.py
@@ -44,6 +44,7 @@
cast,
)

import cached_property
import jinja2
import pendulum
from croniter import croniter
@@ -65,6 +66,9 @@
from airflow.models.taskinstance import Context, TaskInstance, clear_task_instances
from airflow.security import permissions
from airflow.stats import Stats
from airflow.timetables.base import TimeRestriction, TimeTable
from airflow.timetables.interval import CronDataIntervalTimeTable, DeltaDataIntervalTimeTable
from airflow.timetables.simple import NullTimeTable, OnceTimeTable
from airflow.utils import timezone
from airflow.utils.dates import cron_presets, date_range as utils_date_range
from airflow.utils.file import correct_maybe_zipped
@@ -533,94 +537,53 @@ def next_dagrun_info(
"automated" DagRuns for this dag (scheduled or backfill, but not
manual)
"""
if (
self.schedule_interval == "@once" and date_last_automated_dagrun
) or self.schedule_interval is None:
# Manual trigger, or already created the run for @once, can short circuit
# XXX: The timezone.coerce_datetime calls in this function should not
# be necessary since the function annotation suggests it only accepts
# pendulum.DateTime, and someone is passing datetime.datetime into this
# function. We should fix whatever is doing that.
if self.is_subdag:
return (None, None)
next_execution_date = self.next_dagrun_after_date(date_last_automated_dagrun)

if next_execution_date is None:
time_table: TimeTable = self.time_table
restriction = self._format_time_restriction()
if not self.catchup:
restriction = time_table.cancel_catchup(restriction)
next_info = time_table.next_dagrun_info(
timezone.coerce_datetime(date_last_automated_dagrun),
restriction,
)
if next_info is None:
return (None, None)

if self.schedule_interval == "@once":
# For "@once" it can be created "now"
return (next_execution_date, next_execution_date)

return (next_execution_date, self.following_schedule(next_execution_date))

def next_dagrun_after_date(self, date_last_automated_dagrun: Optional[pendulum.DateTime]):
"""
Get the next execution date after the given ``date_last_automated_dagrun``, according to
schedule_interval, start_date, end_date etc. This doesn't check max active run or any other
"concurrency" type limits, it only performs calculations based on the various date and interval fields
of this dag and it's tasks.

:param date_last_automated_dagrun: The execution_date of the last scheduler or
backfill triggered run for this dag
:type date_last_automated_dagrun: pendulum.Pendulum
"""
if not self.schedule_interval or self.is_subdag:
return None

# don't schedule @once again
if self.schedule_interval == '@once' and date_last_automated_dagrun:
return None

# don't do scheduler catchup for dag's that don't have dag.catchup = True
if not (self.catchup or self.schedule_interval == '@once'):
# The logic is that we move start_date up until
# one period before, so that timezone.utcnow() is AFTER
# the period end, and the job can be created...
now = timezone.utcnow()
next_start = self.following_schedule(now)
last_start = self.previous_schedule(now)
if next_start <= now or isinstance(self.schedule_interval, timedelta):
new_start = last_start
else:
new_start = self.previous_schedule(last_start)

if self.start_date:
if new_start >= self.start_date:
self.start_date = new_start
else:
self.start_date = new_start

next_run_date = None
if not date_last_automated_dagrun:
# First run
task_start_dates = [t.start_date for t in self.tasks if t.start_date]
if task_start_dates:
next_run_date = self.normalize_schedule(min(task_start_dates))
self.log.debug("Next run date based on tasks %s", next_run_date)
return (next_info.data_interval.start, next_info.run_after)

def _format_time_restriction(self) -> TimeRestriction:
start_dates = [t.start_date for t in self.tasks if t.start_date]
if self.start_date is not None:
start_dates.append(self.start_date)
if start_dates:
restriction_earliest = timezone.coerce_datetime(min(start_dates))
else:
next_run_date = self.following_schedule(date_last_automated_dagrun)

if date_last_automated_dagrun and next_run_date:
while next_run_date <= date_last_automated_dagrun:
next_run_date = self.following_schedule(next_run_date)

# don't ever schedule prior to the dag's start_date
if self.start_date:
next_run_date = self.start_date if not next_run_date else max(next_run_date, self.start_date)
if next_run_date == self.start_date:
next_run_date = self.normalize_schedule(self.start_date)

self.log.debug("Dag start date: %s. Next run date: %s", self.start_date, next_run_date)

# Don't schedule a dag beyond its end_date (as specified by the dag param)
if next_run_date and self.end_date and next_run_date > self.end_date:
return None

# Don't schedule a dag beyond its end_date (as specified by the task params)
# Get the min task end date, which may come from the dag.default_args
task_end_dates = [t.end_date for t in self.tasks if t.end_date]
if task_end_dates and next_run_date:
min_task_end_date = min(task_end_dates)
if next_run_date > min_task_end_date:
return None

return next_run_date
restriction_earliest = None
end_dates = [t.end_date for t in self.tasks if t.end_date]
if self.end_date is not None:
end_dates.append(self.end_date)
if end_dates:
restriction_latest = timezone.coerce_datetime(max(end_dates))
else:
restriction_latest = None
return TimeRestriction(restriction_earliest, restriction_latest)

@cached_property.cached_property
def time_table(self) -> TimeTable:
interval = self.schedule_interval
if interval is None:
return NullTimeTable()
if interval == "@once":
return OnceTimeTable()
if not isinstance(interval, str):
assert isinstance(interval, (timedelta, relativedelta))
return DeltaDataIntervalTimeTable(interval)
tz = pendulum.timezone(self.timezone.name)
return CronDataIntervalTimeTable(interval, tz)

def get_run_dates(self, start_date, end_date=None):
"""
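For context (not part of the diff): a minimal sketch of how the refactored ``next_dagrun_info`` is expected to be used for a daily cron DAG, assuming this branch is installed. The dag_id and dates are illustrative only.

from datetime import datetime

from airflow import DAG

dag = DAG(
    dag_id="example_daily",           # illustrative name
    start_date=datetime(2021, 1, 1),
    schedule_interval="0 0 * * *",    # resolved to a CronDataIntervalTimeTable by DAG.time_table
    catchup=False,                    # next_dagrun_info() then applies cancel_catchup()
)

# No automated run exists yet, so pass None for date_last_automated_dagrun.
data_interval_start, run_after = dag.next_dagrun_info(None)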
16 changes: 16 additions & 0 deletions airflow/timetables/__init__.py
@@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
105 changes: 105 additions & 0 deletions airflow/timetables/base.py
@@ -0,0 +1,105 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from typing import NamedTuple, Optional

from pendulum import DateTime

from airflow.typing_compat import Protocol


class DataInterval(NamedTuple):
"""A data interval for a DagRun to operate over.

The represented interval is ``[start, end)``.
"""

start: DateTime
end: DateTime


class TimeRestriction(NamedTuple):
"""A period to restrict a datetime between two values.

This is used to bound the next DagRun schedule to a time period. If the
scheduled time is earlier than ``earliest``, it is set to ``earliest``. If
the time is later than ``latest``, the DagRun is not scheduled.

Both values are inclusive; a DagRun can happen exactly at either
``earliest`` or ``latest``.
"""

earliest: Optional[DateTime]
latest: Optional[DateTime]


class DagRunInfo(NamedTuple):
"""Information to schedule a DagRun.

Instances of this will be returned by TimeTables when they are asked to
schedule a DagRun creation.
"""

run_after: DateTime
"""The earliest time this DagRun is created and its tasks scheduled."""

data_interval: DataInterval
"""The data interval this DagRun to operate over, if applicable."""

@classmethod
def exact(cls, at: DateTime) -> "DagRunInfo":
"""Represent a run on an exact time."""
return cls(run_after=at, data_interval=DataInterval(at, at))

@classmethod
def interval(cls, start: DateTime, end: DateTime) -> "DagRunInfo":
"""Represent a run on a continuous schedule.

In such a schedule, each data interval starts right after the previous
one ends, and each run is scheduled right after the interval ends. This
applies to all schedules prior to AIP-39 except ``@once`` and ``None``.
"""
return cls(run_after=end, data_interval=DataInterval(start, end))


class TimeTable(Protocol):
"""Protocol that all TimeTable classes are expected to implement."""

def cancel_catchup(self, between: TimeRestriction) -> TimeRestriction:
"""Fix time restriction to not perform catchup."""
raise NotImplementedError()

def next_dagrun_info(
self,
last_automated_dagrun: Optional[DateTime],
between: TimeRestriction,
) -> Optional[DagRunInfo]:
"""Provide information to schedule the next DagRun.

:param last_automated_dagrun: The execution_date of the associated DAG's
last scheduled or backfilled run (manual runs not considered).
:param between: Restriction on when the next DagRun can be scheduled. Its
``earliest`` is generally the earliest of ``DAG.start_date`` and each
``BaseOperator.start_date`` in the DAG; None means the next DagRun can
happen anytime.

:return: Information on when the next DagRun can be scheduled. None
means a DagRun will not happen. This does not mean no more runs will
ever be scheduled for this DAG; the time table can return a DagRunInfo
the next time it is asked.
"""
raise NotImplementedError()
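To make the protocol concrete, here is a hypothetical implementation (not part of this PR) that schedules one run per wall-clock hour, assuming the module path introduced above; the class name HourlyTimeTable is invented for illustration.

from typing import Optional

import pendulum
from pendulum import DateTime

from airflow.timetables.base import DagRunInfo, TimeRestriction, TimeTable


class HourlyTimeTable(TimeTable):
    """Hypothetical example: one data interval per wall-clock hour."""

    def cancel_catchup(self, between: TimeRestriction) -> TimeRestriction:
        # Without catchup, never start earlier than the current hour.
        now = pendulum.now("UTC").start_of("hour")
        earliest = now if between.earliest is None else max(between.earliest, now)
        return TimeRestriction(earliest, between.latest)

    def next_dagrun_info(
        self,
        last_automated_dagrun: Optional[DateTime],
        between: TimeRestriction,
    ) -> Optional[DagRunInfo]:
        if last_automated_dagrun is not None:
            # The previous execution_date is the previous interval's start;
            # the next interval starts one hour later.
            start = last_automated_dagrun.add(hours=1)
        elif between.earliest is not None:
            # First run: the first full hour at or after the earliest allowed time.
            start = between.earliest.start_of("hour")
            if start < between.earliest:
                start = start.add(hours=1)
        else:
            return None  # Nothing to anchor the first run on.
        if between.latest is not None and start > between.latest:
            return None
        return DagRunInfo.interval(start=start, end=start.add(hours=1))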
83 changes: 83 additions & 0 deletions airflow/timetables/interval.py
@@ -0,0 +1,83 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from typing import Optional

from pendulum import DateTime
from pendulum.tz.timezone import Timezone

from airflow.timetables.base import DagRunInfo, TimeRestriction, TimeTable
from airflow.timetables.schedules import CronSchedule, Delta, DeltaSchedule, Schedule


class DataIntervalTimeTable(TimeTable):
"""Basis for time table implementations that schedule data intervals.

This kind of time table creates periodic data intervals from an underlying
schedule representation (e.g. a cron expression, or a timedelta instance),
and schedules a DagRun at the end of each interval.
"""

_schedule: Schedule

def cancel_catchup(self, between: TimeRestriction) -> TimeRestriction:
return self._schedule.cancel_catchup(between)

def next_dagrun_info(
self,
last_automated_dagrun: Optional[DateTime],
between: TimeRestriction,
) -> Optional[DagRunInfo]:
if last_automated_dagrun is None:
# First run; schedule the run at the first available time matching
# the schedule, and retrospectively create a data interval for it.
if between.earliest is None:
return None
start = self._schedule.get_next_schedule(between.earliest)
else:
# There's a previous run. Create a data interval starting from the end
# of the previous interval.
start = self._schedule.get_next(last_automated_dagrun)
if between.latest is not None and start > between.latest:
return None
end = self._schedule.get_next(start)
return DagRunInfo.interval(start=start, end=end)


class CronDataIntervalTimeTable(DataIntervalTimeTable):
"""Time table that schedules data intervals with a cron expression.

This corresponds to ``schedule_interval=<cron>``, where ``<cron>`` is either
a five/six-segment representation, or one of ``cron_presets``.

Don't pass ``@once`` in here; use ``OnceTimeTable`` instead.
"""

def __init__(self, cron: str, timezone: Timezone) -> None:
self._schedule = CronSchedule(cron, timezone)


class DeltaDataIntervalTimeTable(DataIntervalTimeTable):
"""Time table that schedules data intervals with a time delta.

This corresponds to ``schedule_interval=<delta>``, where ``<delta>`` is
either a ``datetime.timedelta`` or ``dateutil.relativedelta.relativedelta``
instance.
"""

def __init__(self, delta: Delta) -> None:
self._schedule = DeltaSchedule(delta)
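A rough usage sketch (not part of the diff) of how a scheduler-side caller is expected to drive one of these time tables, assuming the modules land as laid out above and that DeltaSchedule (imported from airflow.timetables.schedules, not shown in this diff) steps by the given delta. The dates are made up.

from datetime import timedelta

import pendulum

from airflow.timetables.base import TimeRestriction
from airflow.timetables.interval import DeltaDataIntervalTimeTable

time_table = DeltaDataIntervalTimeTable(timedelta(days=1))
restriction = TimeRestriction(
    earliest=pendulum.datetime(2021, 4, 1, tz="UTC"),  # e.g. the minimum start_date
    latest=pendulum.datetime(2021, 4, 4, tz="UTC"),    # e.g. the maximum end_date
)

last_automated = None  # execution_date of the previous scheduled run, if any
while True:
    info = time_table.next_dagrun_info(last_automated, restriction)
    if info is None:
        break  # Past ``latest``; no further runs.
    print(info.data_interval.start, "->", info.data_interval.end, "run after", info.run_after)
    # Pre-AIP-39, execution_date corresponds to the data interval start.
    last_automated = info.data_interval.start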