Skip to content

Commit

Permalink
Implement timetable class for AIP-39
Browse files Browse the repository at this point in the history
This creates a new subpackage airflow.timetables, and implements
timetable constructs that provide DAG scheduling logic. The timetable
classes are used to refactor schedule inference logic out of the DAG
class, and existing functions related to scheduling are refactored to
use timetables (and deprecated).

Usages of the deprecated DAG functions in Airflow's code base are
modified to either use the timetable, or infer the information by other
means. For example, usages of previous_schedule() (what was a DAG last
scheduled to run before this run?) are refactored to query the database
for when the previous scheduled run actually happened, instead of using the
schedule interval (cron or timedelta) to infer the information. This is
because an AIP-39 timetable does not necessarily run on a periodic-ish
schedule, and we cannot reliably infer when the previous run happened.
  • Loading branch information
uranusjr committed Jun 28, 2021
1 parent 57dcac2 commit f5b55ee
Show file tree
Hide file tree
Showing 31 changed files with 1,056 additions and 395 deletions.
2 changes: 1 addition & 1 deletion airflow/api/common/experimental/mark_tasks.py
Expand Up @@ -257,7 +257,7 @@ def get_execution_dates(dag, execution_date, future, past):
dag_runs = dag.get_dagruns_between(start_date=start_date, end_date=end_date)
dates = sorted({d.execution_date for d in dag_runs})
else:
dates = dag.date_range(start_date=start_date, end_date=end_date)
dates = dag.get_run_dates(start_date, end_date, align=False)
return dates


Expand Down
4 changes: 2 additions & 2 deletions airflow/cli/commands/dag_command.py
Expand Up @@ -265,11 +265,11 @@ def dag_next_execution(args):
)
print(None)
else:
print(next_execution_dttm)
print(next_execution_dttm.isoformat())

for _ in range(1, args.num_executions):
next_execution_dttm = dag.following_schedule(next_execution_dttm)
print(next_execution_dttm)
print(next_execution_dttm.isoformat())
else:
print("[WARN] Only applicable when there is execution record found for the DAG.", file=sys.stderr)
print(None)
Expand Down
27 changes: 27 additions & 0 deletions airflow/compat/functools.pyi
@@ -0,0 +1,27 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

# This stub exists to work around false linter errors due to python/mypy#10408.
# TODO: Remove this file after the upstream fix is available in our toolchain.

from typing import Callable, TypeVar

T = TypeVar("T")

# Declared as taking a callable that returns ``T`` and yielding ``T`` itself,
# so a type checker sees the decorated attribute as the wrapped function's
# return type rather than as a descriptor object.
def cached_property(f: Callable[..., T]) -> T: ...
# Declared as returning exactly the type it receives, so a type checker
# treats the decorated callable as keeping its original signature.
def cache(f: T) -> T: ...
4 changes: 4 additions & 0 deletions airflow/exceptions.py
Expand Up @@ -115,6 +115,10 @@ class AirflowClusterPolicyViolation(AirflowException):
"""Raise when there is a violation of a Cluster Policy in Dag definition"""


class AirflowTimetableInvalid(AirflowException):
"""Raise when a DAG has an invalid timetable."""


class DagNotFound(AirflowNotFoundException):
"""Raise when a DAG is not available in the system"""

Expand Down
2 changes: 1 addition & 1 deletion airflow/jobs/backfill_job.py
Expand Up @@ -756,7 +756,7 @@ def _execute(self, session=None):
start_date = self.bf_start_date

# Get intervals between the start/end dates, which will turn into dag runs
run_dates = self.dag.get_run_dates(start_date=start_date, end_date=self.bf_end_date)
run_dates = self.dag.get_run_dates(start_date=start_date, end_date=self.bf_end_date, align=True)
if self.run_backwards:
tasks_that_depend_on_past = [t.task_id for t in self.dag.task_dict.values() if t.depends_on_past]
if tasks_that_depend_on_past:
Expand Down
2 changes: 1 addition & 1 deletion airflow/models/baseoperator.py
Expand Up @@ -1244,7 +1244,7 @@ def run(
start_date = start_date or self.start_date
end_date = end_date or self.end_date or timezone.utcnow()

for execution_date in self.dag.date_range(start_date, end_date=end_date):
for execution_date in self.dag.get_run_dates(start_date, end_date, align=False):
TaskInstance(self, execution_date).run(
mark_success=mark_success,
ignore_depends_on_past=(execution_date == start_date and ignore_first_depends_on_past),
Expand Down

0 comments on commit f5b55ee

Please sign in to comment.