diff --git a/airflow/api/common/experimental/mark_tasks.py b/airflow/api/common/experimental/mark_tasks.py index 6adfd11106f28..9a99d4d340435 100644 --- a/airflow/api/common/experimental/mark_tasks.py +++ b/airflow/api/common/experimental/mark_tasks.py @@ -257,7 +257,7 @@ def get_execution_dates(dag, execution_date, future, past): dag_runs = dag.get_dagruns_between(start_date=start_date, end_date=end_date) dates = sorted({d.execution_date for d in dag_runs}) else: - dates = dag.date_range(start_date=start_date, end_date=end_date) + dates = dag.get_run_dates(start_date, end_date, align=False) return dates diff --git a/airflow/cli/commands/dag_command.py b/airflow/cli/commands/dag_command.py index e739162540fb4..dfd8c3c685b27 100644 --- a/airflow/cli/commands/dag_command.py +++ b/airflow/cli/commands/dag_command.py @@ -265,11 +265,11 @@ def dag_next_execution(args): ) print(None) else: - print(next_execution_dttm) + print(next_execution_dttm.isoformat()) for _ in range(1, args.num_executions): next_execution_dttm = dag.following_schedule(next_execution_dttm) - print(next_execution_dttm) + print(next_execution_dttm.isoformat()) else: print("[WARN] Only applicable when there is execution record found for the DAG.", file=sys.stderr) print(None) diff --git a/airflow/compat/functools.pyi b/airflow/compat/functools.pyi new file mode 100644 index 0000000000000..8dabbd60047c0 --- /dev/null +++ b/airflow/compat/functools.pyi @@ -0,0 +1,27 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# This stub exists to work around false linter errors due to python/mypy#10408. +# TODO: Remove this file after the upstream fix is available in our toolchain. + +from typing import Callable, TypeVar + +T = TypeVar("T") + +def cached_property(f: Callable[..., T]) -> T: ... +def cache(f: T) -> T: ... diff --git a/airflow/exceptions.py b/airflow/exceptions.py index 0f1a28d867344..e15ee62f51965 100644 --- a/airflow/exceptions.py +++ b/airflow/exceptions.py @@ -115,6 +115,10 @@ class AirflowClusterPolicyViolation(AirflowException): """Raise when there is a violation of a Cluster Policy in Dag definition""" +class AirflowTimetableInvalid(AirflowException): + """Raise when a DAG has an invalid timetable.""" + + class DagNotFound(AirflowNotFoundException): """Raise when a DAG is not available in the system""" diff --git a/airflow/jobs/backfill_job.py b/airflow/jobs/backfill_job.py index 167aa7a337f26..880bdaa51ad3e 100644 --- a/airflow/jobs/backfill_job.py +++ b/airflow/jobs/backfill_job.py @@ -756,7 +756,7 @@ def _execute(self, session=None): start_date = self.bf_start_date # Get intervals between the start/end dates, which will turn into dag runs - run_dates = self.dag.get_run_dates(start_date=start_date, end_date=self.bf_end_date) + run_dates = self.dag.get_run_dates(start_date=start_date, end_date=self.bf_end_date, align=True) if self.run_backwards: tasks_that_depend_on_past = [t.task_id for t in self.dag.task_dict.values() if t.depends_on_past] if tasks_that_depend_on_past: diff --git a/airflow/models/baseoperator.py b/airflow/models/baseoperator.py index 
7b8afe44bd5b5..17ad7a7057d7b 100644 --- a/airflow/models/baseoperator.py +++ b/airflow/models/baseoperator.py @@ -1244,7 +1244,7 @@ def run( start_date = start_date or self.start_date end_date = end_date or self.end_date or timezone.utcnow() - for execution_date in self.dag.date_range(start_date, end_date=end_date): + for execution_date in self.dag.get_run_dates(start_date, end_date, align=False): TaskInstance(self, execution_date).run( mark_success=mark_success, ignore_depends_on_past=(execution_date == start_date and ignore_first_depends_on_past), diff --git a/airflow/models/dag.py b/airflow/models/dag.py index e283b1475d394..c0dd158a5427f 100644 --- a/airflow/models/dag.py +++ b/airflow/models/dag.py @@ -47,7 +47,6 @@ import jinja2 import pendulum -from croniter import croniter from dateutil.relativedelta import relativedelta from jinja2.nativetypes import NativeEnvironment from sqlalchemy import Boolean, Column, ForeignKey, Index, Integer, String, Text, func, or_ @@ -57,6 +56,7 @@ import airflow.templates from airflow import settings, utils +from airflow.compat.functools import cached_property from airflow.configuration import conf from airflow.exceptions import AirflowException, DuplicateTaskIdFound, TaskNotFound from airflow.models.base import ID_LEN, Base @@ -69,6 +69,10 @@ from airflow.models.taskinstance import Context, TaskInstance, TaskInstanceKey, clear_task_instances from airflow.security import permissions from airflow.stats import Stats +from airflow.timetables.base import TimeRestriction, Timetable +from airflow.timetables.interval import CronDataIntervalTimetable, DeltaDataIntervalTimetable +from airflow.timetables.schedules import Schedule +from airflow.timetables.simple import NullTimetable, OnceTimetable from airflow.typing_compat import Literal, RePatternType from airflow.utils import timezone from airflow.utils.dates import cron_presets, date_range as utils_date_range @@ -455,29 +459,26 @@ def date_range( num: Optional[int] = None, end_date: 
Optional[datetime] = timezone.utcnow(), ) -> List[datetime]: + message = "`DAG.date_range()` is deprecated." if num is not None: - end_date = None - return utils_date_range( - start_date=start_date, end_date=end_date, num=num, delta=self.normalized_schedule_interval - ) + result = utils_date_range(start_date=start_date, num=num) + else: + message += " Please use `DAG.get_run_dates(..., align=False)` instead." + result = self.get_run_dates(start_date, end_date, align=False) + warnings.warn(message, category=DeprecationWarning, stacklevel=2) + return result def is_fixed_time_schedule(self): - """ - Figures out if the DAG schedule has a fixed time (e.g. 3 AM). - - :return: True if the schedule has a fixed time, False if not. - """ - now = datetime.now() - cron = croniter(self.normalized_schedule_interval, now) - - start = cron.get_next(datetime) - cron_next = cron.get_next(datetime) - - if cron_next.minute == start.minute and cron_next.hour == start.hour: + warnings.warn( + "`DAG.is_fixed_time_schedule()` is deprecated.", + category=DeprecationWarning, + stacklevel=2, + ) + try: + return not self.timetable._schedule._should_fix_dst + except AttributeError: return True - return False - def following_schedule(self, dttm): """ Calculates the following schedule for this dag in UTC. @@ -485,189 +486,139 @@ def following_schedule(self, dttm): :param dttm: utc datetime :return: utc datetime """ - if isinstance(self.normalized_schedule_interval, str): - # we don't want to rely on the transitions created by - # croniter as they are not always correct - dttm = pendulum.instance(dttm) - naive = timezone.make_naive(dttm, self.timezone) - cron = croniter(self.normalized_schedule_interval, naive) - - # We assume that DST transitions happen on the minute/hour - if not self.is_fixed_time_schedule(): - # relative offset (eg. every 5 minutes) - delta = cron.get_next(datetime) - naive - following = dttm.in_timezone(self.timezone) + delta - else: - # absolute (e.g. 
3 AM) - naive = cron.get_next(datetime) - tz = self.timezone - following = timezone.make_aware(naive, tz) - return timezone.convert_to_utc(following) - elif self.normalized_schedule_interval is not None: - return timezone.convert_to_utc(dttm + self.normalized_schedule_interval) + current = pendulum.instance(dttm) + between = TimeRestriction(earliest=None, latest=None, catchup=True) + next_info = self.timetable.next_dagrun_info(current, between) + if next_info is None: + return None + return next_info.data_interval.start def previous_schedule(self, dttm): - """ - Calculates the previous schedule for this dag in UTC - - :param dttm: utc datetime - :return: utc datetime - """ - if isinstance(self.normalized_schedule_interval, str): - # we don't want to rely on the transitions created by - # croniter as they are not always correct - dttm = pendulum.instance(dttm) - naive = timezone.make_naive(dttm, self.timezone) - cron = croniter(self.normalized_schedule_interval, naive) - - # We assume that DST transitions happen on the minute/hour - if not self.is_fixed_time_schedule(): - # relative offset (eg. every 5 minutes) - delta = naive - cron.get_prev(datetime) - previous = dttm.in_timezone(self.timezone) - delta - else: - # absolute (e.g. 
3 AM) - naive = cron.get_prev(datetime) - tz = self.timezone - previous = timezone.make_aware(naive, tz) - return timezone.convert_to_utc(previous) - elif self.normalized_schedule_interval is not None: - return timezone.convert_to_utc(dttm - self.normalized_schedule_interval) + warnings.warn( + "`DAG.previous_schedule()` is deprecated.", + category=DeprecationWarning, + stacklevel=2, + ) + try: + schedule: Schedule = self.timetable._schedule + except AttributeError: + return None + return schedule.get_prev(pendulum.instance(dttm)) def next_dagrun_info( self, date_last_automated_dagrun: Optional[pendulum.DateTime], ) -> Tuple[Optional[pendulum.DateTime], Optional[pendulum.DateTime]]: - """ - Get information about the next DagRun of this dag after ``date_last_automated_dagrun`` -- the - execution date, and the earliest it could be scheduled - - :param date_last_automated_dagrun: The max(execution_date) of existing - "automated" DagRuns for this dag (scheduled or backfill, but not - manual) - """ - if ( - self.schedule_interval == "@once" and date_last_automated_dagrun - ) or self.schedule_interval is None: - # Manual trigger, or already created the run for @once, can short circuit + """Get information about the next DagRun of this dag after ``date_last_automated_dagrun``. + + This calculates what time interval the next DagRun should operate on + (its execution date), and when it can be scheduled, according to the + dag's timetable, start_date, end_date, etc. This doesn't check max + active run or any other "max_active_tasks" type limits, but only + performs calculations based on the various date and interval fields of + this dag and its tasks. + + :param date_last_automated_dagrun: The ``max(execution_date)`` of + existing "automated" DagRuns for this dag (scheduled or backfill, + but not manual). + :return: A 2-tuple containing the DagRun's execution date, and the + earliest it could be scheduled. 
+ """ + # XXX: The timezone.coerce_datetime calls in this function should not + # be necessary since the function annotation suggests it only accepts + # pendulum.DateTime, and someone is passing datetime.datetime into this + # function. We should fix whatever is doing that. + if self.is_subdag: return (None, None) - next_execution_date = self.next_dagrun_after_date(date_last_automated_dagrun) - - if next_execution_date is None: + next_info = self.timetable.next_dagrun_info( + timezone.coerce_datetime(date_last_automated_dagrun), + self._time_restriction, + ) + if next_info is None: return (None, None) - - if self.schedule_interval == "@once": - # For "@once" it can be created "now" - return (next_execution_date, next_execution_date) - - return (next_execution_date, self.following_schedule(next_execution_date)) + return (next_info.data_interval.start, next_info.run_after) def next_dagrun_after_date(self, date_last_automated_dagrun: Optional[pendulum.DateTime]): - """ - Get the next execution date after the given ``date_last_automated_dagrun``, according to - schedule_interval, start_date, end_date etc. This doesn't check max active run or any other - "max_active_tasks" type limits, it only performs calculations based on the various date - and interval fields of this dag and it's tasks. - - :param date_last_automated_dagrun: The execution_date of the last scheduler or - backfill triggered run for this dag - :type date_last_automated_dagrun: pendulum.Pendulum - """ - if not self.schedule_interval or self.is_subdag: - return None - - # don't schedule @once again - if self.schedule_interval == '@once' and date_last_automated_dagrun: - return None - - # don't do scheduler catchup for dag's that don't have dag.catchup = True - if not (self.catchup or self.schedule_interval == '@once'): - # The logic is that we move start_date up until - # one period before, so that timezone.utcnow() is AFTER - # the period end, and the job can be created... 
- now = timezone.utcnow() - next_start = self.following_schedule(now) - last_start = self.previous_schedule(now) - if next_start <= now or isinstance(self.schedule_interval, timedelta): - new_start = last_start - else: - new_start = self.previous_schedule(last_start) - - if self.start_date: - if new_start >= self.start_date: - self.start_date = new_start - else: - self.start_date = new_start - - next_run_date = None - if not date_last_automated_dagrun: - # First run - task_start_dates = [t.start_date for t in self.tasks if t.start_date] - if task_start_dates: - next_run_date = self.normalize_schedule(min(task_start_dates)) - self.log.debug("Next run date based on tasks %s", next_run_date) + warnings.warn( + "`DAG.next_dagrun_after_date()` is deprecated. Please use `DAG.next_dagrun_info()` instead.", + category=DeprecationWarning, + stacklevel=2, + ) + return self.next_dagrun_info(date_last_automated_dagrun)[0] + + @cached_property + def _time_restriction(self) -> TimeRestriction: + start_dates = [t.start_date for t in self.tasks if t.start_date] + if self.start_date is not None: + start_dates.append(self.start_date) + if start_dates: + earliest = timezone.coerce_datetime(min(start_dates)) else: - next_run_date = self.following_schedule(date_last_automated_dagrun) - - if date_last_automated_dagrun and next_run_date: - while next_run_date <= date_last_automated_dagrun: - next_run_date = self.following_schedule(next_run_date) - - # don't ever schedule prior to the dag's start_date - if self.start_date: - next_run_date = self.start_date if not next_run_date else max(next_run_date, self.start_date) - if next_run_date == self.start_date: - next_run_date = self.normalize_schedule(self.start_date) - - self.log.debug("Dag start date: %s. 
Next run date: %s", self.start_date, next_run_date) - - # Don't schedule a dag beyond its end_date (as specified by the dag param) - if next_run_date and self.end_date and next_run_date > self.end_date: - return None - - # Don't schedule a dag beyond its end_date (as specified by the task params) - # Get the min task end date, which may come from the dag.default_args - task_end_dates = [t.end_date for t in self.tasks if t.end_date] - if task_end_dates and next_run_date: - min_task_end_date = min(task_end_dates) - if next_run_date > min_task_end_date: - return None - - return next_run_date - - def get_run_dates(self, start_date, end_date=None): + earliest = None + end_dates = [t.end_date for t in self.tasks if t.end_date] + if self.end_date is not None: + end_dates.append(self.end_date) + if end_dates: + latest = timezone.coerce_datetime(max(end_dates)) + else: + latest = None + return TimeRestriction(earliest, latest, self.catchup) + + @cached_property + def timetable(self) -> Timetable: + interval = self.schedule_interval + if interval is None: + return NullTimetable() + if interval == "@once": + return OnceTimetable() + if isinstance(interval, (timedelta, relativedelta)): + return DeltaDataIntervalTimetable(interval) + if isinstance(interval, str): + return CronDataIntervalTimetable(interval, self.timezone) + type_name = type(interval).__name__ + raise TypeError(f"{type_name} is not a valid DAG.schedule_interval.") + + def get_run_dates(self, start_date, end_date=None, *, align: bool = True): """ Returns a list of dates between the interval received as parameter using this dag's schedule interval. Returned dates can be used for execution dates. - :param start_date: the start date of the interval + :param start_date: The start date of the interval. :type start_date: datetime - :param end_date: the end date of the interval, defaults to timezone.utcnow() + :param end_date: The end date of the interval. Defaults to ``timezone.utcnow()``. 
:type end_date: datetime - :return: a list of dates within the interval following the dag's schedule + :param align: Whether the first run should be delayed to "align" with + the schedule, or can happen immediately at start_date. The default is + ``True``, but subdags will ignore this value and always behave as if + this is set to ``False`` for backward compatibility. + :type align: bool + :return: A list of dates within the interval following the dag's schedule. :rtype: list """ - run_dates = [] - - using_start_date = start_date - using_end_date = end_date - - # dates for dag runs - using_start_date = using_start_date or min(t.start_date for t in self.tasks) - using_end_date = using_end_date or timezone.utcnow() - - # next run date for a subdag isn't relevant (schedule_interval for subdags - # is ignored) so we use the dag run's start date in the case of a subdag - next_run_date = self.normalize_schedule(using_start_date) if not self.is_subdag else using_start_date - - while next_run_date and next_run_date <= using_end_date: - run_dates.append(next_run_date) - next_run_date = self.following_schedule(next_run_date) - - return run_dates + if start_date is None: + start = self._time_restriction.earliest + else: + start = pendulum.instance(start_date) + if end_date is None: + end = pendulum.now(timezone.utc) + else: + end = pendulum.instance(end_date) + # HACK: Sub-DAGs are currently scheduled differently. For example, say + # the schedule is @daily and start is 2021-06-03 22:16:00, a top-level + # DAG should be first scheduled to run on midnight 2021-06-04, but a + # sub-DAG should be first scheduled to run RIGHT NOW. We can change + # this, but since the sub-DAG is going away in 3.0 anyway, let's keep + # compatibility for now and remove this entirely later. 
+ if self.is_subdag: + align = False + return sorted(self.timetable.iter_between(start, end, align=align)) def normalize_schedule(self, dttm): - """Returns dttm + interval unless dttm is first interval then it returns dttm""" + warnings.warn( + "`DAG.normalize_schedule()` is deprecated.", + category=DeprecationWarning, + stacklevel=2, + ) following = self.following_schedule(dttm) # in case of @once @@ -854,14 +805,11 @@ def is_paused(self): @property def normalized_schedule_interval(self) -> Optional[ScheduleInterval]: - """ - Returns Normalized Schedule Interval. This is used internally by the Scheduler to - schedule DAGs. - - 1. Converts Cron Preset to a Cron Expression (e.g ``@monthly`` to ``0 0 1 * *``) - 2. If Schedule Interval is "@once" return "None" - 3. If not (1) or (2) returns schedule_interval - """ + warnings.warn( + "DAG.normalized_schedule_interval() is deprecated.", + category=DeprecationWarning, + stacklevel=2, + ) if isinstance(self.schedule_interval, str) and self.schedule_interval in cron_presets: _schedule_interval = cron_presets.get(self.schedule_interval) # type: Optional[ScheduleInterval] elif self.schedule_interval == '@once': @@ -1060,10 +1008,40 @@ def set_dependency(self, upstream_task_id, downstream_task_id): """ self.get_task(upstream_task_id).set_downstream(self.get_task(downstream_task_id)) + @provide_session + def get_task_instances_before( + self, + base_date: datetime, + num: int, + *, + session: Session, + ) -> List[TaskInstance]: + """Get ``num`` task instances before (including) ``base_date``. + + The returned list may contain exactly ``num`` task instances. It can + have less if there are less than ``num`` scheduled DAG runs before + ``base_date``, or more if there are manual task runs between the + requested period, which does not count toward ``num``. 
+ """ + min_date = ( + session.query(DagRun) + .filter( + DagRun.dag_id == self.dag_id, + DagRun.execution_date <= base_date, + DagRun.run_type != DagRunType.MANUAL, + ) + .order_by(DagRun.execution_date.desc()) + .offset(num) + .first() + ) + if min_date is None: + min_date = timezone.utc_epoch() + return self.get_task_instances(start_date=min_date, end_date=base_date, session=session) + @provide_session def get_task_instances( self, start_date=None, end_date=None, state=None, session=None - ) -> Iterable[TaskInstance]: + ) -> List[TaskInstance]: if not start_date: start_date = (timezone.utcnow() - timedelta(30)).date() start_date = timezone.make_aware(datetime.combine(start_date, datetime.min.time())) diff --git a/airflow/models/dagbag.py b/airflow/models/dagbag.py index bd339eec348ad..3e40678a912ba 100644 --- a/airflow/models/dagbag.py +++ b/airflow/models/dagbag.py @@ -29,7 +29,6 @@ from datetime import datetime, timedelta from typing import TYPE_CHECKING, Dict, List, NamedTuple, Optional, Union -from croniter import CroniterBadCronError, CroniterBadDateError, CroniterNotAlphaError, croniter from sqlalchemy.exc import OperationalError from sqlalchemy.orm import Session from tabulate import tabulate @@ -40,6 +39,7 @@ AirflowClusterPolicyViolation, AirflowDagCycleException, AirflowDagDuplicatedIdException, + AirflowTimetableInvalid, SerializedDagNotFound, ) from airflow.stats import Stats @@ -393,14 +393,13 @@ def _process_modules(self, filepath, mods, file_last_changed_on_disk): dag.fileloc = filepath try: dag.is_subdag = False - if isinstance(dag.normalized_schedule_interval, str): - croniter(dag.normalized_schedule_interval) + dag.timetable.validate() self.bag_dag(dag=dag, root_dag=dag) found_dags.append(dag) found_dags += dag.subdags - except (CroniterBadCronError, CroniterBadDateError, CroniterNotAlphaError) as cron_e: + except AirflowTimetableInvalid as exception: self.log.exception("Failed to bag_dag: %s", dag.full_filepath) - 
self.import_errors[dag.full_filepath] = f"Invalid Cron expression: {cron_e}" + self.import_errors[dag.full_filepath] = f"Invalid timetable expression: {exception}" self.file_last_changed[dag.full_filepath] = file_last_changed_on_disk except ( AirflowDagCycleException, diff --git a/airflow/models/dagrun.py b/airflow/models/dagrun.py index 754b5f02fff24..e2a4783dc5a1a 100644 --- a/airflow/models/dagrun.py +++ b/airflow/models/dagrun.py @@ -369,14 +369,14 @@ def get_previous_dagrun(self, state: Optional[str] = None, session: Session = No @provide_session def get_previous_scheduled_dagrun(self, session: Session = None) -> Optional['DagRun']: """The previous, SCHEDULED DagRun, if there is one""" - dag = self.get_dag() - return ( session.query(DagRun) .filter( DagRun.dag_id == self.dag_id, - DagRun.execution_date == dag.previous_schedule(self.execution_date), + DagRun.execution_date < self.execution_date, + DagRun.run_type != DagRunType.MANUAL, ) + .order_by(DagRun.execution_date.desc()) .first() ) diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py index 26c134ef15f43..07b4b283f1f10 100644 --- a/airflow/models/taskinstance.py +++ b/airflow/models/taskinstance.py @@ -762,11 +762,18 @@ def get_previous_ti( # LEGACY: most likely running from unit tests if not dr: # Means that this TaskInstance is NOT being run from a DR, but from a catchup - previous_scheduled_date = dag.previous_schedule(self.execution_date) - if not previous_scheduled_date: + try: + # XXX: This uses DAG internals, but as the outer comment + # said, the block is only reached for legacy reasons for + # development code, so that's OK-ish. 
+ schedule = dag.timetable._schedule + except AttributeError: return None - - return TaskInstance(task=self.task, execution_date=previous_scheduled_date) + dt = pendulum.instance(self.execution_date) + return TaskInstance( + task=self.task, + execution_date=schedule.get_prev(dt), + ) dr.dag = dag diff --git a/airflow/ti_deps/deps/prev_dagrun_dep.py b/airflow/ti_deps/deps/prev_dagrun_dep.py index 7a2b8f7d0c5bc..70573bfbd6e88 100644 --- a/airflow/ti_deps/deps/prev_dagrun_dep.py +++ b/airflow/ti_deps/deps/prev_dagrun_dep.py @@ -34,37 +34,37 @@ class PrevDagrunDep(BaseTIDep): @provide_session def _get_dep_statuses(self, ti, session, dep_context): if dep_context.ignore_depends_on_past: - yield self._passing_status( - reason="The context specified that the state of past DAGs could be ignored." - ) + reason = "The context specified that the state of past DAGs could be ignored." + yield self._passing_status(reason=reason) return if not ti.task.depends_on_past: yield self._passing_status(reason="The task did not have depends_on_past set.") return - # Don't depend on the previous task instance if we are the first task - dag = ti.task.dag - if dag.catchup: - if dag.previous_schedule(ti.execution_date) is None: - yield self._passing_status(reason="This task does not have a schedule or is @once") - return - if dag.previous_schedule(ti.execution_date) < ti.task.start_date: - yield self._passing_status( - reason="This task instance was the first task instance for its task." - ) - return + dr = ti.get_dagrun(session=session) + if not dr: + yield self._passing_status(reason="This task instance does not belong to a DAG.") + return + + # Don't depend on the previous task instance if we are the first task. 
+ catchup = ti.task.dag.catchup + if catchup: + last_dagrun = dr.get_previous_scheduled_dagrun(session) else: - dr = ti.get_dagrun(session=session) - last_dagrun = dr.get_previous_dagrun(session=session) if dr else None + last_dagrun = dr.get_previous_dagrun(session=session) - if not last_dagrun: - yield self._passing_status( - reason="This task instance was the first task instance for its task." - ) - return + # First ever run for this DAG. + if not last_dagrun: + yield self._passing_status(reason="This task instance was the first task instance for its task.") + return + + # There was a DAG run, but the task wasn't active back then. + if catchup and last_dagrun.execution_date < ti.task.start_date: + yield self._passing_status(reason="This task instance was the first task instance for its task.") + return - previous_ti = ti.get_previous_ti(session=session) + previous_ti = last_dagrun.get_task_instance(ti.task_id, session=session) if not previous_ti: yield self._failing_status( reason="depends_on_past is true for this task's DAG, but the previous " diff --git a/airflow/timetables/__init__.py b/airflow/timetables/__init__.py new file mode 100644 index 0000000000000..13a83393a9124 --- /dev/null +++ b/airflow/timetables/__init__.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. 
See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/airflow/timetables/base.py b/airflow/timetables/base.py new file mode 100644 index 0000000000000..5faf9dba33413 --- /dev/null +++ b/airflow/timetables/base.py @@ -0,0 +1,133 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from typing import Iterator, NamedTuple, Optional + +from pendulum import DateTime + +from airflow.typing_compat import Protocol + + +class DataInterval(NamedTuple): + """A data interval for a DagRun to operate over. + + The represented interval is ``[start, end)``. + """ + + start: DateTime + end: DateTime + + +class TimeRestriction(NamedTuple): + """Restriction on when a DAG can be scheduled for a run. + + Specifically, the run must not be earlier than ``earliest``, nor later than + ``latest``. If ``catchup`` is *False*, the run must also not be earlier than + the current time, i.e. "missed" schedules are not backfilled. + + These values are generally set on the DAG or task's ``start_date``, + ``end_date``, and ``catchup`` arguments. + + Both ``earliest`` and ``latest`` are inclusive; a DAG run can happen exactly + at either point of time. 
+ """ + + earliest: Optional[DateTime] + latest: Optional[DateTime] + catchup: bool + + +class DagRunInfo(NamedTuple): + """Information to schedule a DagRun. + + Instances of this will be returned by timetables when they are asked to + schedule a DagRun creation. + """ + + run_after: DateTime + """The earliest time this DagRun is created and its tasks scheduled.""" + + data_interval: DataInterval + """The data interval this DagRun to operate over, if applicable.""" + + @classmethod + def exact(cls, at: DateTime) -> "DagRunInfo": + """Represent a run on an exact time.""" + return cls(run_after=at, data_interval=DataInterval(at, at)) + + @classmethod + def interval(cls, start: DateTime, end: DateTime) -> "DagRunInfo": + """Represent a run on a continuous schedule. + + In such a schedule, each data interval starts right after the previous + one ends, and each run is scheduled right after the interval ends. This + applies to all schedules prior to AIP-39 except ``@once`` and ``None``. + """ + return cls(run_after=end, data_interval=DataInterval(start, end)) + + +class Timetable(Protocol): + """Protocol that all Timetable classes are expected to implement.""" + + def validate(self) -> None: + """Validate the timetable is correctly specified. + + This should raise AirflowTimetableInvalid on validation failure. + """ + raise NotImplementedError() + + def next_dagrun_info( + self, + last_automated_dagrun: Optional[DateTime], + restriction: TimeRestriction, + ) -> Optional[DagRunInfo]: + """Provide information to schedule the next DagRun. + + :param last_automated_dagrun: The ``execution_date`` of the associated + DAG's last scheduled or backfilled run (manual runs not considered). + :param restriction: Restriction to apply when scheduling the DAG run. + See documentation of :class:`TimeRestriction` for details. + + :return: Information on when the next DagRun can be scheduled. None + means a DagRun will not happen. 
This does not mean no more runs + will be scheduled ever again for this DAG; the timetable can return + a DagRunInfo object when asked at another time. + """ + raise NotImplementedError() + + def iter_between( + self, + start: DateTime, + end: DateTime, + *, + align: bool, + ) -> Iterator[DateTime]: + """Get schedules between the *start* and *end*.""" + if start > end: + raise ValueError(f"start ({start}) > end ({end})") + between = TimeRestriction(start, end, catchup=True) + + if align: + next_info = self.next_dagrun_info(None, between) + else: + yield start + next_info = self.next_dagrun_info(start, between) + + while next_info is not None: + dagrun_start = next_info.data_interval.start + yield dagrun_start + next_info = self.next_dagrun_info(dagrun_start, between) diff --git a/airflow/timetables/interval.py b/airflow/timetables/interval.py new file mode 100644 index 0000000000000..6b3a46e465a8b --- /dev/null +++ b/airflow/timetables/interval.py @@ -0,0 +1,92 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +import datetime +from typing import Any, Optional + +from pendulum import DateTime + +from airflow.timetables.base import DagRunInfo, TimeRestriction, Timetable +from airflow.timetables.schedules import CronSchedule, Delta, DeltaSchedule, Schedule + + +class _DataIntervalTimetable(Timetable): + """Basis for timetable implementations that schedule data intervals. + + This kind of timetable class creates periodic data intervals from an + underlying schedule representation (e.g. a cron expression, or a timedelta + instance), and schedules a DagRun at the end of each interval. + """ + + _schedule: Schedule + + def __eq__(self, other: Any) -> bool: + """Delegate to the schedule.""" + if not isinstance(other, _DataIntervalTimetable): + return NotImplemented + return self._schedule == other._schedule + + def validate(self) -> None: + self._schedule.validate() + + def next_dagrun_info( + self, + last_automated_dagrun: Optional[DateTime], + restriction: TimeRestriction, + ) -> Optional[DagRunInfo]: + earliest = restriction.earliest + if not restriction.catchup: + earliest = self._schedule.skip_to_latest(earliest) + if last_automated_dagrun is None: + # First run; schedule the run at the first available time matching + # the schedule, and retrospectively create a data interval for it. + if earliest is None: + return None + start = self._schedule.align(earliest) + else: + # There's a previous run. Create a data interval starting from + # the end of the previous interval. + start = self._schedule.get_next(last_automated_dagrun) + if restriction.latest is not None and start > restriction.latest: + return None + end = self._schedule.get_next(start) + return DagRunInfo.interval(start=start, end=end) + + +class CronDataIntervalTimetable(_DataIntervalTimetable): + """Timetable that schedules data intervals with a cron expression. + + This corresponds to ``schedule_interval=<cron>``, where ``<cron>`` is either + a five/six-segment representation, or one of ``cron_presets``.
+ + Don't pass ``@once`` in here; use ``OnceTimetable`` instead. + """ + + def __init__(self, cron: str, timezone: datetime.tzinfo) -> None: + self._schedule = CronSchedule(cron, timezone) + + +class DeltaDataIntervalTimetable(_DataIntervalTimetable): + """Timetable that schedules data intervals with a time delta. + + This corresponds to ``schedule_interval=<delta>``, where ``<delta>`` is + either a ``datetime.timedelta`` or ``dateutil.relativedelta.relativedelta`` + instance. + """ + + def __init__(self, delta: Delta) -> None: + self._schedule = DeltaSchedule(delta) diff --git a/airflow/timetables/schedules.py b/airflow/timetables/schedules.py new file mode 100644 index 0000000000000..180d8bc57b501 --- /dev/null +++ b/airflow/timetables/schedules.py @@ -0,0 +1,207 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License.
+ +import datetime +import typing + +from cached_property import cached_property +from croniter import CroniterBadCronError, CroniterBadDateError, croniter +from dateutil.relativedelta import relativedelta +from pendulum import DateTime + +from airflow.exceptions import AirflowTimetableInvalid +from airflow.typing_compat import Protocol +from airflow.utils.dates import cron_presets +from airflow.utils.timezone import convert_to_utc, make_aware, make_naive + +Delta = typing.Union[datetime.timedelta, relativedelta] + + +class Schedule(Protocol): + """Base protocol for schedules.""" + + def skip_to_latest(self, earliest: typing.Optional[DateTime]) -> DateTime: + """Bound the earliest time a run can be scheduled. + + This is called when ``catchup=False``. See docstring of subclasses for + exact skipping behaviour of a schedule. + """ + raise NotImplementedError() + + def validate(self) -> None: + """Validate the timetable is correctly specified. + + This should raise AirflowTimetableInvalid on validation failure. + """ + raise NotImplementedError() + + def get_next(self, current: DateTime) -> DateTime: + """Get the first schedule after the current time.""" + raise NotImplementedError() + + def get_prev(self, current: DateTime) -> DateTime: + """Get the last schedule before the current time.""" + raise NotImplementedError() + + def align(self, current: DateTime) -> DateTime: + """Align given time to the schedule. + + For fixed schedules (e.g. every midnight), this finds the next time that + aligns to the declared time, if the given time does not align. If the + schedule is not fixed (e.g. every hour), the given time is returned. + """ + raise NotImplementedError() + + +def _is_schedule_fixed(expression: str) -> bool: + """Figures out if the schedule has a fixed time (e.g. 3 AM every day). + + :return: True if the schedule has a fixed time, False if not.
+ + Detection is done by "peeking" the next two cron trigger times; if the + two times have the same minute and hour value, the schedule is fixed, + and we *don't* need to perform the DST fix. + + This assumes DST happens on whole minute changes (e.g. 12:59 -> 12:00). + """ + cron = croniter(expression) + next_a = cron.get_next(datetime.datetime) + next_b = cron.get_next(datetime.datetime) + return next_b.minute == next_a.minute and next_b.hour == next_a.hour + + +class CronSchedule(Schedule): + """Schedule things from a cron expression. + + The implementation extends croniter to add timezone awareness. This is + because croniter works only with naive timestamps, and cannot consider DST + when determining the next/previous time. + """ + + def __init__(self, expression: str, timezone: datetime.tzinfo) -> None: + self._expression = expression = cron_presets.get(expression, expression) + self._timezone = timezone + + def __eq__(self, other: typing.Any) -> bool: + """Both expression and timezone should match.""" + if not isinstance(other, CronSchedule): + return NotImplemented + return self._expression == other._expression and self._timezone == other._timezone + + def validate(self) -> None: + try: + croniter(self._expression) + except (CroniterBadCronError, CroniterBadDateError) as e: + raise AirflowTimetableInvalid(str(e)) + + @cached_property + def _should_fix_dst(self) -> bool: + # This is lazy so instantiating a schedule does not immediately raise + # an exception. Validity is checked with validate() during DAG-bagging.
+ return not _is_schedule_fixed(self._expression) + + def get_next(self, current: DateTime) -> DateTime: + """Get the first schedule after specified time, with DST fixed.""" + naive = make_naive(current, self._timezone) + cron = croniter(self._expression, start_time=naive) + scheduled = cron.get_next(datetime.datetime) + if not self._should_fix_dst: + return convert_to_utc(make_aware(scheduled, self._timezone)) + delta = scheduled - naive + return convert_to_utc(current.in_timezone(self._timezone) + delta) + + def get_prev(self, current: DateTime) -> DateTime: + """Get the first schedule before specified time, with DST fixed.""" + naive = make_naive(current, self._timezone) + cron = croniter(self._expression, start_time=naive) + scheduled = cron.get_prev(datetime.datetime) + if not self._should_fix_dst: + return convert_to_utc(make_aware(scheduled, self._timezone)) + delta = naive - scheduled + return convert_to_utc(current.in_timezone(self._timezone) - delta) + + def align(self, current: DateTime) -> DateTime: + """Get the next scheduled time. + + This is ``current + interval``, unless ``current`` is already on the schedule, + then ``current`` is returned. + """ + next_time = self.get_next(current) + if self.get_prev(next_time) != current: + return next_time + return current + + def skip_to_latest(self, earliest: typing.Optional[DateTime]) -> DateTime: + """Bound the earliest time a run can be scheduled. + + The logic is that we move start_date up until one period before, so the + current time is AFTER the period end, and the job can be created... + + This is slightly different from the delta version at terminal values. + If the next schedule should start *right now*, we want the data interval + that starts right now, not the one that ends now.
+ """ + current_time = DateTime.utcnow() + next_start = self.get_next(current_time) + last_start = self.get_prev(current_time) + if next_start == current_time: + new_start = last_start + elif next_start > current_time: + new_start = self.get_prev(last_start) + else: + raise AssertionError("next schedule shouldn't be earlier") + if earliest is None: + return new_start + return max(new_start, earliest) + + +class DeltaSchedule(Schedule): + """Schedule things on a fixed time delta.""" + + def __init__(self, delta: Delta) -> None: + self._delta = delta + + def __eq__(self, other: typing.Any) -> bool: + """The offset should match.""" + if not isinstance(other, DeltaSchedule): + return NotImplemented + return self._delta == other._delta + + def validate(self) -> None: + pass # TODO: Check the delta is positive? + + def get_next(self, current: DateTime) -> DateTime: + return convert_to_utc(current + self._delta) + + def get_prev(self, current: DateTime) -> DateTime: + return convert_to_utc(current - self._delta) + + def align(self, current: DateTime) -> DateTime: + return current + + def skip_to_latest(self, earliest: typing.Optional[DateTime]) -> DateTime: + """Bound the earliest time a run can be scheduled. + + The logic is that we move start_date up until one period before, so the + current time is AFTER the period end, and the job can be created... + + This is slightly different from the cron version at terminal values. + """ + new_start = self.get_prev(DateTime.utcnow()) + if earliest is None: + return new_start + return max(new_start, earliest) diff --git a/airflow/timetables/simple.py b/airflow/timetables/simple.py new file mode 100644 index 0000000000000..b6208a646542c --- /dev/null +++ b/airflow/timetables/simple.py @@ -0,0 +1,78 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. 
The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from typing import Any, Optional + +from pendulum import DateTime + +from airflow.timetables.base import DagRunInfo, TimeRestriction, Timetable + + +class NullTimetable(Timetable): + """Timetable that never schedules anything. + + This corresponds to ``schedule_interval=None``. + """ + + def __eq__(self, other: Any) -> bool: + """As long as *other* is of the same type.""" + if not isinstance(other, NullTimetable): + return NotImplemented + return True + + def validate(self) -> None: + pass + + def next_dagrun_info( + self, + last_automated_dagrun: Optional[DateTime], + restriction: TimeRestriction, + ) -> Optional[DagRunInfo]: + return None + + +class OnceTimetable(Timetable): + """Timetable that schedules the execution once as soon as possible. + + This corresponds to ``schedule_interval="@once"``. + """ + + def __eq__(self, other: Any) -> bool: + """As long as *other* is of the same type.""" + if not isinstance(other, OnceTimetable): + return NotImplemented + return True + + def validate(self) -> None: + pass + + def next_dagrun_info( + self, + last_automated_dagrun: Optional[DateTime], + restriction: TimeRestriction, + ) -> Optional[DagRunInfo]: + if last_automated_dagrun is not None: + return None # Already run, no more scheduling. + if restriction.earliest is None: # No start date, won't run. 
+ return None + # "@once" always schedule to the start_date determined by the DAG and + # tasks, regardless of catchup or not. This has been the case since 1.10 + # and we're inheriting it. See AIRFLOW-1928. + run_after = restriction.earliest + if restriction.latest is not None and run_after > restriction.latest: + return None + return DagRunInfo.exact(run_after) diff --git a/airflow/utils/dates.py b/airflow/utils/dates.py index 11690989edd04..30771b1b2c360 100644 --- a/airflow/utils/dates.py +++ b/airflow/utils/dates.py @@ -16,6 +16,7 @@ # specific language governing permissions and limitations # under the License. +import warnings from datetime import datetime, timedelta from typing import Dict, List, Optional, Union @@ -72,6 +73,12 @@ def date_range( :param delta: step length. It can be datetime.timedelta or cron expression as string :type delta: datetime.timedelta or str or dateutil.relativedelta """ + warnings.warn( + "`airflow.utils.dates.date_range()` is deprecated. Please use `airflow.timetables`.", + category=DeprecationWarning, + stacklevel=2, + ) + if not delta: return [] if end_date: diff --git a/airflow/utils/timezone.py b/airflow/utils/timezone.py index 798c723da8723..67c3b26fd2e1f 100644 --- a/airflow/utils/timezone.py +++ b/airflow/utils/timezone.py @@ -17,6 +17,7 @@ # under the License. 
# import datetime as dt +from typing import Optional, Union import pendulum from pendulum.datetime import DateTime @@ -172,3 +173,14 @@ def parse(string: str, timezone=None) -> DateTime: :param timezone: the timezone """ return pendulum.parse(string, tz=timezone or TIMEZONE, strict=False) # type: ignore + + +def coerce_datetime(v: Union[None, dt.datetime, DateTime]) -> Optional[DateTime]: + """Convert whatever is passed in to ``pendulum.DateTime``.""" + if v is None: + return None + if isinstance(v, DateTime): + return v + if v.tzinfo is None: + v = make_aware(v) + return pendulum.instance(v) diff --git a/airflow/www/views.py b/airflow/www/views.py index 39e2e86794dc3..760d736040f9f 100644 --- a/airflow/www/views.py +++ b/airflow/www/views.py @@ -2334,21 +2334,18 @@ def duration(self, session=None): except airflow.exceptions.SerializedDagNotFound: dag = None - base_date = request.args.get('base_date') - num_runs = request.args.get('num_runs', default=default_dag_run, type=int) - if dag is None: flash(f'DAG "{dag_id}" seems to be missing.', "error") return redirect(url_for('Airflow.index')) + base_date = request.args.get('base_date') + num_runs = request.args.get('num_runs', default=default_dag_run, type=int) + if base_date: base_date = timezone.parse(base_date) else: base_date = dag.get_latest_execution_date() or timezone.utcnow() - dates = dag.date_range(base_date, num=-abs(num_runs)) - min_date = dates[0] if dates else timezone.utc_epoch() - root = request.args.get('root') if root: dag = dag.partial_subset(task_ids_or_regex=root, include_upstream=True, include_downstream=False) @@ -2364,7 +2361,11 @@ def duration(self, session=None): x_points = defaultdict(list) cumulative_y = defaultdict(list) - task_instances = dag.get_task_instances(start_date=min_date, end_date=base_date) + task_instances = dag.get_task_instances_before(base_date, num_runs, session=session) + if task_instances: + min_date = task_instances[0].execution_date + else: + min_date = 
timezone.utc_epoch() ti_fails = ( session.query(TaskFail) .filter( @@ -2468,9 +2469,6 @@ def tries(self, session=None): else: base_date = dag.get_latest_execution_date() or timezone.utcnow() - dates = dag.date_range(base_date, num=-abs(num_runs)) - min_date = dates[0] if dates else timezone.utc_epoch() - root = request.args.get('root') if root: dag = dag.partial_subset(task_ids_or_regex=root, include_upstream=True, include_downstream=False) @@ -2484,10 +2482,11 @@ def tries(self, session=None): chart_attr=self.line_chart_attr, ) + tis = dag.get_task_instances_before(base_date, num_runs, session=session) for task in dag.tasks: y_points = [] x_points = [] - for ti in task.get_task_instances(start_date=min_date, end_date=base_date): + for ti in tis: dttm = wwwutils.epoch(ti.execution_date) x_points.append(dttm) # y value should reflect completed tries to have a 0 baseline. @@ -2495,7 +2494,6 @@ def tries(self, session=None): if x_points: chart.add_serie(name=task.task_id, x=x_points, y=y_points) - tis = dag.get_task_instances(start_date=min_date, end_date=base_date) tries = sorted({ti.try_number for ti in tis}) max_date = max(ti.execution_date for ti in tis) if tries else None chart.create_y_axis('yAxis', format='.02f', custom_format=False, label='Tries') @@ -2543,13 +2541,12 @@ def landing_times(self, session=None): else: base_date = dag.get_latest_execution_date() or timezone.utcnow() - dates = dag.date_range(base_date, num=-abs(num_runs)) - min_date = dates[0] if dates else timezone.utc_epoch() - root = request.args.get('root') if root: dag = dag.partial_subset(task_ids_or_regex=root, include_upstream=True, include_downstream=False) + tis = dag.get_task_instances_before(base_date, num_runs, session=session) + chart_height = wwwutils.get_chart_height(dag) chart = nvd3.lineChart( name="lineChart", x_is_date=True, height=chart_height, chart_attr=self.line_chart_attr @@ -2560,7 +2557,7 @@ def landing_times(self, session=None): task_id = task.task_id y_points[task_id] = 
[] x_points[task_id] = [] - for ti in task.get_task_instances(start_date=min_date, end_date=base_date): + for ti in tis: ts = ti.execution_date if dag.schedule_interval and dag.following_schedule(ts): ts = dag.following_schedule(ts) @@ -2584,7 +2581,6 @@ def landing_times(self, session=None): y=scale_time_units(y_points[task_id], y_unit), ) - tis = dag.get_task_instances(start_date=min_date, end_date=base_date) dates = sorted({ti.execution_date for ti in tis}) max_date = max(ti.execution_date for ti in tis) if dates else None diff --git a/tests/cli/commands/test_dag_command.py b/tests/cli/commands/test_dag_command.py index df2ea76f51cb4..4a510f88bb66b 100644 --- a/tests/cli/commands/test_dag_command.py +++ b/tests/cli/commands/test_dag_command.py @@ -298,10 +298,15 @@ def test_next_execution(self): # The details below is determined by the schedule_interval of example DAGs now = DEFAULT_DATE - expected_output = [str(now + timedelta(days=1)), str(now + timedelta(hours=4)), "None", "None"] + expected_output = [ + (now + timedelta(days=1)).isoformat(), + (now + timedelta(hours=4)).isoformat(), + "None", + "None", + ] expected_output_2 = [ - str(now + timedelta(days=1)) + os.linesep + str(now + timedelta(days=2)), - str(now + timedelta(hours=4)) + os.linesep + str(now + timedelta(hours=8)), + (now + timedelta(days=1)).isoformat() + os.linesep + (now + timedelta(days=2)).isoformat(), + (now + timedelta(hours=4)).isoformat() + os.linesep + (now + timedelta(hours=8)).isoformat(), "None", "None", ] diff --git a/tests/jobs/test_backfill_job.py b/tests/jobs/test_backfill_job.py index 1bcc3fc94ccaf..e02dc672ad6bf 100644 --- a/tests/jobs/test_backfill_job.py +++ b/tests/jobs/test_backfill_job.py @@ -1362,8 +1362,8 @@ def test_update_counters(self): session.close() def test_dag_get_run_dates(self): - def get_test_dag_for_backfill(schedule_interval=None): - dag = DAG(dag_id='test_get_dates', start_date=DEFAULT_DATE, schedule_interval=schedule_interval) + def 
get_test_dag_for_backfill(): + dag = DAG(dag_id='test_get_dates', start_date=DEFAULT_DATE, schedule_interval="@hourly") DummyOperator( task_id='dummy', dag=dag, @@ -1372,9 +1372,13 @@ def get_test_dag_for_backfill(schedule_interval=None): return dag test_dag = get_test_dag_for_backfill() - assert [DEFAULT_DATE] == test_dag.get_run_dates(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE) + assert [DEFAULT_DATE] == test_dag.get_run_dates( + start_date=DEFAULT_DATE, + end_date=DEFAULT_DATE, + align=True, + ) - test_dag = get_test_dag_for_backfill(schedule_interval="@hourly") + test_dag = get_test_dag_for_backfill() assert [ DEFAULT_DATE - datetime.timedelta(hours=3), DEFAULT_DATE - datetime.timedelta(hours=2), @@ -1383,6 +1387,7 @@ def get_test_dag_for_backfill(schedule_interval=None): ] == test_dag.get_run_dates( start_date=DEFAULT_DATE - datetime.timedelta(hours=3), end_date=DEFAULT_DATE, + align=True, ) def test_backfill_run_backwards(self): diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py index 87be3e8d16638..35ca8eb7c53a0 100644 --- a/tests/jobs/test_scheduler_job.py +++ b/tests/jobs/test_scheduler_job.py @@ -1780,7 +1780,7 @@ def evaluate_dagrun( dag = self.dagbag.get_dag(dag_id) dr = dag.create_dagrun( run_type=DagRunType.SCHEDULED, - execution_date=dag.next_dagrun_after_date(None), + execution_date=dag.next_dagrun_info(None)[0], state=State.RUNNING, ) diff --git a/tests/models/test_dag.py b/tests/models/test_dag.py index 902ae4819339e..33154c4a6586b 100644 --- a/tests/models/test_dag.py +++ b/tests/models/test_dag.py @@ -48,6 +48,7 @@ from airflow.operators.dummy import DummyOperator from airflow.operators.subdag import SubDagOperator from airflow.security import permissions +from airflow.timetables.simple import NullTimetable, OnceTimetable from airflow.utils import timezone from airflow.utils.file import list_py_file_paths from airflow.utils.session import create_session, provide_session @@ -58,6 +59,7 @@ from tests.models import 
DEFAULT_DATE from tests.test_utils.asserts import assert_queries_count from tests.test_utils.db import clear_db_dags, clear_db_runs +from tests.test_utils.timetables import cron_timetable, delta_timetable TEST_DATE = datetime_tz(2015, 1, 2, 0, 0) @@ -1135,7 +1137,7 @@ def test_schedule_dag_once(self): dag_id = "test_schedule_dag_once" dag = DAG(dag_id=dag_id) dag.schedule_interval = '@once' - assert dag.normalized_schedule_interval is None + assert isinstance(dag.timetable, OnceTimetable) dag.add_task(BaseOperator(task_id="faketastic", owner='Also fake', start_date=TEST_DATE)) # Sync once to create the DagModel @@ -1247,20 +1249,20 @@ def test_get_paused_dag_ids(self): @parameterized.expand( [ - (None, None), - ("@daily", "0 0 * * *"), - ("@weekly", "0 0 * * 0"), - ("@monthly", "0 0 1 * *"), - ("@quarterly", "0 0 1 */3 *"), - ("@yearly", "0 0 1 1 *"), - ("@once", None), - (datetime.timedelta(days=1), datetime.timedelta(days=1)), + (None, NullTimetable()), + ("@daily", cron_timetable("0 0 * * *")), + ("@weekly", cron_timetable("0 0 * * 0")), + ("@monthly", cron_timetable("0 0 1 * *")), + ("@quarterly", cron_timetable("0 0 1 */3 *")), + ("@yearly", cron_timetable("0 0 1 1 *")), + ("@once", OnceTimetable()), + (datetime.timedelta(days=1), delta_timetable(datetime.timedelta(days=1))), ] ) - def test_normalized_schedule_interval(self, schedule_interval, expected_n_schedule_interval): + def test_timetable(self, schedule_interval, expected_timetable): dag = DAG("test_schedule_interval", schedule_interval=schedule_interval) - assert dag.normalized_schedule_interval == expected_n_schedule_interval + assert dag.timetable == expected_timetable assert dag.schedule_interval == schedule_interval def test_create_dagrun_run_id_is_generated(self): @@ -1474,19 +1476,19 @@ def test_clear_dag(self, ti_state_begin, ti_state_end: Optional[str]): assert task_instance.state == ti_state_end self._clean_up(dag_id) - def test_next_dagrun_after_date_once(self): + def 
test_next_dagrun_info_once(self): dag = DAG( 'test_scheduler_dagrun_once', start_date=timezone.datetime(2015, 1, 1), schedule_interval="@once" ) - next_date = dag.next_dagrun_after_date(None) + next_date, _ = dag.next_dagrun_info(None) assert next_date == timezone.datetime(2015, 1, 1) - next_date = dag.next_dagrun_after_date(next_date) + next_date, _ = dag.next_dagrun_info(next_date) assert next_date is None - def test_next_dagrun_after_date_start_end_dates(self): + def test_next_dagrun_info_start_end_dates(self): """ Tests that an attempt to schedule a task after the Dag's end_date does not succeed. @@ -1503,7 +1505,7 @@ def test_next_dagrun_after_date_start_end_dates(self): dates = [] date = None for _ in range(runs): - date = dag.next_dagrun_after_date(date) + date, _ = dag.next_dagrun_info(date) dates.append(date) for date in dates: @@ -1511,9 +1513,9 @@ def test_next_dagrun_after_date_start_end_dates(self): assert dates[-1] == end_date - assert dag.next_dagrun_after_date(date) is None + assert dag.next_dagrun_info(date)[0] is None - def test_next_dagrun_after_date_catcup(self): + def test_next_dagrun_info_catchup(self): """ Test to check that a DAG with catchup = False only schedules beginning now, not back to the start date """ @@ -1551,7 +1553,7 @@ def make_dag(dag_id, schedule_interval, start_date, catchup): start_date=six_hours_ago_to_the_hour, catchup=False, ) - next_date = dag1.next_dagrun_after_date(None) + next_date, _ = dag1.next_dagrun_info(None) # The DR should be scheduled in the last half an hour, not 6 hours ago assert next_date > half_an_hour_ago assert next_date < timezone.utcnow() @@ -1563,7 +1565,7 @@ def make_dag(dag_id, schedule_interval, start_date, catchup): catchup=False, ) - next_date = dag2.next_dagrun_after_date(None) + next_date, _ = dag2.next_dagrun_info(None) # The DR should be scheduled in the last 2 hours, not 6 hours ago assert next_date > two_hours_ago # The DR should be scheduled BEFORE now @@ -1576,12 +1578,12 @@ def 
make_dag(dag_id, schedule_interval, start_date, catchup): catchup=False, ) - next_date = dag3.next_dagrun_after_date(None) + next_date, _ = dag3.next_dagrun_info(None) # The DR should be scheduled in the last 2 hours, not 6 hours ago assert next_date == six_hours_ago_to_the_hour @freeze_time(timezone.datetime(2020, 1, 5)) - def test_next_dagrun_after_date_timedelta_schedule_and_catchup_false(self): + def test_next_dagrun_info_timedelta_schedule_and_catchup_false(self): """ Test that the dag file processor does not create multiple dagruns if a dag is scheduled with 'timedelta' and catchup=False @@ -1593,15 +1595,15 @@ def test_next_dagrun_after_date_timedelta_schedule_and_catchup_false(self): catchup=False, ) - next_date = dag.next_dagrun_after_date(None) + next_date, _ = dag.next_dagrun_info(None) assert next_date == timezone.datetime(2020, 1, 4) # The date to create is in the future, this is handled by "DagModel.dags_needing_dagruns" - next_date = dag.next_dagrun_after_date(next_date) + next_date, _ = dag.next_dagrun_info(next_date) assert next_date == timezone.datetime(2020, 1, 5) @freeze_time(timezone.datetime(2020, 5, 4)) - def test_next_dagrun_after_date_timedelta_schedule_and_catchup_true(self): + def test_next_dagrun_info_timedelta_schedule_and_catchup_true(self): """ Test that the dag file processor creates multiple dagruns if a dag is scheduled with 'timedelta' and catchup=True @@ -1613,17 +1615,17 @@ def test_next_dagrun_after_date_timedelta_schedule_and_catchup_true(self): catchup=True, ) - next_date = dag.next_dagrun_after_date(None) + next_date, _ = dag.next_dagrun_info(None) assert next_date == timezone.datetime(2020, 5, 1) - next_date = dag.next_dagrun_after_date(next_date) + next_date, _ = dag.next_dagrun_info(next_date) assert next_date == timezone.datetime(2020, 5, 2) - next_date = dag.next_dagrun_after_date(next_date) + next_date, _ = dag.next_dagrun_info(next_date) assert next_date == timezone.datetime(2020, 5, 3) # The date to create is in the 
future, this is handled by "DagModel.dags_needing_dagruns" - next_date = dag.next_dagrun_after_date(next_date) + next_date, _ = dag.next_dagrun_info(next_date) assert next_date == timezone.datetime(2020, 5, 4) def test_next_dagrun_after_auto_align(self): @@ -1640,7 +1642,7 @@ def test_next_dagrun_after_auto_align(self): ) DummyOperator(task_id='dummy', dag=dag, owner='airflow') - next_date = dag.next_dagrun_after_date(None) + next_date, _ = dag.next_dagrun_info(None) assert next_date == timezone.datetime(2016, 1, 2, 5, 4) dag = DAG( @@ -1650,7 +1652,7 @@ def test_next_dagrun_after_auto_align(self): ) DummyOperator(task_id='dummy', dag=dag, owner='airflow') - next_date = dag.next_dagrun_after_date(None) + next_date, _ = dag.next_dagrun_info(None) assert next_date == timezone.datetime(2016, 1, 1, 10, 10) def test_next_dagrun_after_not_for_subdags(self): @@ -1690,10 +1692,10 @@ def subdag(parent_dag_name, child_dag_name, args): subdag.parent_dag = dag subdag.is_subdag = True - next_date = dag.next_dagrun_after_date(None) + next_date, _ = dag.next_dagrun_info(None) assert next_date == timezone.datetime(2019, 1, 1, 0, 0) - next_subdag_date = subdag.next_dagrun_after_date(None) + next_subdag_date, _ = subdag.next_dagrun_info(None) assert next_subdag_date is None, "SubDags should never have DagRuns created by the scheduler" def test_replace_outdated_access_control_actions(self): diff --git a/tests/models/test_dagrun.py b/tests/models/test_dagrun.py index 78991999ffde1..57e3e3a56e0bb 100644 --- a/tests/models/test_dagrun.py +++ b/tests/models/test_dagrun.py @@ -651,8 +651,8 @@ def test_depends_on_past(self, prev_ti_state, is_ti_success): dag = self.dagbag.get_dag(dag_id) task = dag.tasks[0] - self.create_dag_run(dag, execution_date=timezone.datetime(2016, 1, 1, 0, 0, 0)) - self.create_dag_run(dag, execution_date=timezone.datetime(2016, 1, 2, 0, 0, 0)) + self.create_dag_run(dag, execution_date=timezone.datetime(2016, 1, 1, 0, 0, 0), is_backfill=True) + 
self.create_dag_run(dag, execution_date=timezone.datetime(2016, 1, 2, 0, 0, 0), is_backfill=True) prev_ti = TI(task, timezone.datetime(2016, 1, 1, 0, 0, 0)) ti = TI(task, timezone.datetime(2016, 1, 2, 0, 0, 0)) @@ -678,8 +678,8 @@ def test_wait_for_downstream(self, prev_ti_state, is_ti_success): # For ti.set_state() to work, the DagRun has to exist, # Otherwise ti.previous_ti returns an unpersisted TI - self.create_dag_run(dag, execution_date=timezone.datetime(2016, 1, 1, 0, 0, 0)) - self.create_dag_run(dag, execution_date=timezone.datetime(2016, 1, 2, 0, 0, 0)) + self.create_dag_run(dag, execution_date=timezone.datetime(2016, 1, 1, 0, 0, 0), is_backfill=True) + self.create_dag_run(dag, execution_date=timezone.datetime(2016, 1, 2, 0, 0, 0), is_backfill=True) prev_ti_downstream = TI(task=downstream, execution_date=timezone.datetime(2016, 1, 1, 0, 0, 0)) ti = TI(task=upstream, execution_date=timezone.datetime(2016, 1, 2, 0, 0, 0)) diff --git a/tests/models/test_taskinstance.py b/tests/models/test_taskinstance.py index 2ac1381a7df11..25a42581465cd 100644 --- a/tests/models/test_taskinstance.py +++ b/tests/models/test_taskinstance.py @@ -868,6 +868,12 @@ def test_depends_on_past(self): ) dag.clear() + dag.create_dagrun( + execution_date=DEFAULT_DATE, + state=State.FAILED, + run_type=DagRunType.SCHEDULED, + ) + run_date = task.start_date + datetime.timedelta(days=5) dag.create_dagrun( diff --git a/tests/sensors/test_base.py b/tests/sensors/test_base.py index e83087101611f..f9e0ebfbb196a 100644 --- a/tests/sensors/test_base.py +++ b/tests/sensors/test_base.py @@ -421,7 +421,8 @@ def test_reschedule_with_test_mode(self): # poke returns False and AirflowRescheduleException is raised date1 = timezone.utcnow() with freeze_time(date1): - for date in self.dag.date_range(DEFAULT_DATE, end_date=DEFAULT_DATE): + dates = self.dag.get_run_dates(DEFAULT_DATE, end_date=DEFAULT_DATE, align=True) + for date in dates: TaskInstance(sensor, date).run(ignore_ti_state=True, test_mode=True) 
tis = dr.get_task_instances() assert len(tis) == 2 diff --git a/tests/serialization/test_dag_serialization.py b/tests/serialization/test_dag_serialization.py index 7f78ca61078e1..04ef07bdacdab 100644 --- a/tests/serialization/test_dag_serialization.py +++ b/tests/serialization/test_dag_serialization.py @@ -41,7 +41,9 @@ from airflow.security import permissions from airflow.serialization.json_schema import load_dag_schema_dict from airflow.serialization.serialized_objects import SerializedBaseOperator, SerializedDAG +from airflow.timetables.simple import NullTimetable, OnceTimetable from tests.test_utils.mock_operators import CustomOperator, CustomOpLink, GoogleLink +from tests.test_utils.timetables import cron_timetable, delta_timetable executor_config_pod = k8s.V1Pod( metadata=k8s.V1ObjectMeta(name="my-name"), @@ -503,14 +505,21 @@ def test_deserialization_end_date(self, dag_end_date, task_end_date, expected_ta @parameterized.expand( [ - (None, None, None), - ("@weekly", "@weekly", "0 0 * * 0"), - ("@once", "@once", None), - ({"__type": "timedelta", "__var": 86400.0}, timedelta(days=1), timedelta(days=1)), + (None, None, NullTimetable()), + ("@weekly", "@weekly", cron_timetable("0 0 * * 0")), + ("@once", "@once", OnceTimetable()), + ( + {"__type": "timedelta", "__var": 86400.0}, + timedelta(days=1), + delta_timetable(timedelta(days=1)), + ), ] ) def test_deserialization_schedule_interval( - self, serialized_schedule_interval, expected_schedule_interval, expected_n_schedule_interval + self, + serialized_schedule_interval, + expected_schedule_interval, + expected_timetable, ): serialized = { "__version": 1, @@ -529,7 +538,7 @@ def test_deserialization_schedule_interval( dag = SerializedDAG.from_dict(serialized) assert dag.schedule_interval == expected_schedule_interval - assert dag.normalized_schedule_interval == expected_n_schedule_interval + assert dag.timetable == expected_timetable @parameterized.expand( [ diff --git 
a/tests/test_utils/perf/scheduler_dag_execution_timing.py b/tests/test_utils/perf/scheduler_dag_execution_timing.py index e593690076233..0998a979497fa 100755 --- a/tests/test_utils/perf/scheduler_dag_execution_timing.py +++ b/tests/test_utils/perf/scheduler_dag_execution_timing.py @@ -163,18 +163,18 @@ def create_dag_runs(dag, num_runs, session): id_prefix = DagRun.ID_PREFIX - next_run_date = dag.normalize_schedule(dag.start_date or min(t.start_date for t in dag.tasks)) - + last_dagrun_at = None for _ in range(num_runs): + next_info = dag.next_dagrun_info(last_dagrun_at) + last_dagrun_at = next_info.data_interval.start dag.create_dagrun( - run_id=id_prefix + next_run_date.isoformat(), - execution_date=next_run_date, + run_id=f"{id_prefix}{last_dagrun_at.isoformat()}", + execution_date=last_dagrun_at, start_date=timezone.utcnow(), state=State.RUNNING, external_trigger=False, session=session, ) - next_run_date = dag.following_schedule(next_run_date) @click.command() diff --git a/tests/test_utils/timetables.py b/tests/test_utils/timetables.py new file mode 100644 index 0000000000000..c6db4c7394038 --- /dev/null +++ b/tests/test_utils/timetables.py @@ -0,0 +1,27 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +from airflow import settings +from airflow.timetables.interval import CronDataIntervalTimetable, DeltaDataIntervalTimetable + + +def cron_timetable(expr: str) -> CronDataIntervalTimetable: + return CronDataIntervalTimetable(expr, settings.TIMEZONE) + + +def delta_timetable(delta) -> DeltaDataIntervalTimetable: + return DeltaDataIntervalTimetable(delta) diff --git a/tests/ti_deps/deps/test_prev_dagrun_dep.py b/tests/ti_deps/deps/test_prev_dagrun_dep.py index d80d47533d715..5970b5c1048d9 100644 --- a/tests/ti_deps/deps/test_prev_dagrun_dep.py +++ b/tests/ti_deps/deps/test_prev_dagrun_dep.py @@ -17,105 +17,117 @@ # under the License. -import unittest -from datetime import datetime from unittest.mock import Mock +import pytest + from airflow.models import DAG from airflow.models.baseoperator import BaseOperator from airflow.ti_deps.dep_context import DepContext from airflow.ti_deps.deps.prev_dagrun_dep import PrevDagrunDep from airflow.utils.state import State - - -class TestPrevDagrunDep(unittest.TestCase): - def _get_task(self, **kwargs): - return BaseOperator(task_id='test_task', dag=DAG('test_dag'), **kwargs) - - def test_not_depends_on_past(self): - """ - If depends on past isn't set in the task then the previous dagrun should be - ignored, even though there is no previous_ti which would normally fail the dep - """ - task = self._get_task( - depends_on_past=False, start_date=datetime(2016, 1, 1), wait_for_downstream=False - ) - prev_ti = Mock( - task=task, - state=State.SUCCESS, - are_dependents_done=Mock(return_value=True), +from airflow.utils.timezone import datetime + + +@pytest.mark.parametrize( + "depends_on_past, wait_for_downstream, prev_ti, context_ignore_depends_on_past, dep_met", + [ + # If the task does not set depends_on_past, the previous dagrun should + # be ignored, even though previous_ti would otherwise fail the dep. + pytest.param( + False, + False, # wait_for_downstream=True overrides depends_on_past=False. 
+ Mock( + state=State.NONE, + **{"are_dependents_done.return_value": False}, + ), + False, + True, + id="not_depends_on_past", + ), + # If the context overrides depends_on_past, the dep should be met even + # though the task itself sets depends_on_past=True. + pytest.param( + True, + False, + Mock( + state=State.SUCCESS, + **{"are_dependents_done.return_value": True}, + ), + True, + True, + id="context_ignore_depends_on_past", + ), + # The first task run should pass since it has no previous dagrun. + pytest.param(True, False, None, False, True, id="first_task_run"), + # Previous TI did not complete execution. This dep should fail. + pytest.param( + True, + False, + Mock( + state=State.NONE, + **{"are_dependents_done.return_value": True}, + ), + False, + False, + id="prev_ti_bad_state", + ), + # The task is set to wait for the downstream tasks of the previous + # dagrun. This dep should fail if the previous TI's downstream TIs + # are not done. + pytest.param( + True, + True, + Mock( + state=State.SUCCESS, + **{"are_dependents_done.return_value": False}, + ), + False, + False, + id="failed_wait_for_downstream", + ), + # All the conditions for the dep are met. 
+ pytest.param( + True, + True, + Mock( + state=State.SUCCESS, + **{"are_dependents_done.return_value": True}, + ), + False, + True, + id="all_met", + ), + ], +) +def test_dagrun_dep( + depends_on_past, + wait_for_downstream, + prev_ti, + context_ignore_depends_on_past, + dep_met, +): + task = BaseOperator( + task_id="test_task", + dag=DAG("test_dag"), + depends_on_past=depends_on_past, + start_date=datetime(2016, 1, 1), + wait_for_downstream=wait_for_downstream, + ) + if prev_ti: + prev_dagrun = Mock( execution_date=datetime(2016, 1, 2), + **{"get_task_instance.return_value": prev_ti}, ) - ti = Mock(task=task, previous_ti=prev_ti, execution_date=datetime(2016, 1, 3)) - dep_context = DepContext(ignore_depends_on_past=False) - - assert PrevDagrunDep().is_met(ti=ti, dep_context=dep_context) - - def test_context_ignore_depends_on_past(self): - """ - If the context overrides depends_on_past then the dep should be met, - even though there is no previous_ti which would normally fail the dep - """ - task = self._get_task( - depends_on_past=True, start_date=datetime(2016, 1, 1), wait_for_downstream=False - ) - prev_ti = Mock( - task=task, - state=State.SUCCESS, - are_dependents_done=Mock(return_value=True), - execution_date=datetime(2016, 1, 2), - ) - ti = Mock(task=task, previous_ti=prev_ti, execution_date=datetime(2016, 1, 3)) - dep_context = DepContext(ignore_depends_on_past=True) - - assert PrevDagrunDep().is_met(ti=ti, dep_context=dep_context) - - def test_first_task_run(self): - """ - The first task run for a TI should pass since it has no previous dagrun. 
- """ - task = self._get_task( - depends_on_past=True, start_date=datetime(2016, 1, 1), wait_for_downstream=False - ) - prev_ti = None - ti = Mock(task=task, previous_ti=prev_ti, execution_date=datetime(2016, 1, 1)) - dep_context = DepContext(ignore_depends_on_past=False) - - assert PrevDagrunDep().is_met(ti=ti, dep_context=dep_context) - - def test_prev_ti_bad_state(self): - """ - If the previous TI did not complete execution this dep should fail. - """ - task = self._get_task( - depends_on_past=True, start_date=datetime(2016, 1, 1), wait_for_downstream=False - ) - prev_ti = Mock(state=State.NONE, are_dependents_done=Mock(return_value=True)) - ti = Mock(task=task, previous_ti=prev_ti, execution_date=datetime(2016, 1, 2)) - dep_context = DepContext(ignore_depends_on_past=False) - - assert not PrevDagrunDep().is_met(ti=ti, dep_context=dep_context) - - def test_failed_wait_for_downstream(self): - """ - If the previous TI specified to wait for the downstream tasks of the - previous dagrun then it should fail this dep if the downstream TIs of - the previous TI are not done. 
- """ - task = self._get_task(depends_on_past=True, start_date=datetime(2016, 1, 1), wait_for_downstream=True) - prev_ti = Mock(state=State.SUCCESS, are_dependents_done=Mock(return_value=False)) - ti = Mock(task=task, previous_ti=prev_ti, execution_date=datetime(2016, 1, 2)) - dep_context = DepContext(ignore_depends_on_past=False) - - assert not PrevDagrunDep().is_met(ti=ti, dep_context=dep_context) - - def test_all_met(self): - """ - Test to make sure all the conditions for the dep are met - """ - task = self._get_task(depends_on_past=True, start_date=datetime(2016, 1, 1), wait_for_downstream=True) - prev_ti = Mock(state=State.SUCCESS, are_dependents_done=Mock(return_value=True)) - ti = Mock(task=task, execution_date=datetime(2016, 1, 2), **{'get_previous_ti.return_value': prev_ti}) - dep_context = DepContext(ignore_depends_on_past=False) - - assert PrevDagrunDep().is_met(ti=ti, dep_context=dep_context) + else: + prev_dagrun = None + dagrun = Mock( + **{ + "get_previous_scheduled_dagrun.return_value": prev_dagrun, + "get_previous_dagrun.return_value": prev_dagrun, + }, + ) + ti = Mock(task=task, **{"get_dagrun.return_value": dagrun}) + dep_context = DepContext(ignore_depends_on_past=context_ignore_depends_on_past) + + assert PrevDagrunDep().is_met(ti=ti, dep_context=dep_context) == dep_met diff --git a/tests/timetables/test_time_table_iter_ranges.py b/tests/timetables/test_time_table_iter_ranges.py new file mode 100644 index 0000000000000..c9ee747888ac1 --- /dev/null +++ b/tests/timetables/test_time_table_iter_ranges.py @@ -0,0 +1,38 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +"""Tests for Timetable.iter_between().""" + +from datetime import datetime, timedelta + +import pytest + +from airflow.settings import TIMEZONE +from airflow.timetables.interval import DeltaDataIntervalTimetable + + +@pytest.fixture() +def timetable_1s(): + return DeltaDataIntervalTimetable(timedelta(seconds=1)) + + +def test_end_date_before_start_date(timetable_1s): + start = datetime(2016, 2, 1, tzinfo=TIMEZONE) + end = datetime(2016, 1, 1, tzinfo=TIMEZONE) + message = r"start \([- :+\d]{25}\) > end \([- :+\d]{25}\)" + with pytest.raises(ValueError, match=message): + list(timetable_1s.iter_between(start, end, align=True))