Avoid calling DAG.following_schedule() for TaskInstance.get_template_context() (#20486)

The "next execution date" lookup can use a more modern mechanism (DAG.next_dagrun_info()) instead of the
deprecated DAG.following_schedule(), since get_template_context() already has the information it needs
(namely, the current data interval).

(cherry picked from commit 9e315ff)
uranusjr authored and jedcunningham committed Jan 20, 2022
1 parent b19dfdb commit b43882c
Showing 3 changed files with 49 additions and 13 deletions.
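
To make the "more modern mechanism" concrete: instead of asking the deprecated, cron-only DAG.following_schedule() for the timestamp after the execution date, the template context can ask the timetable-backed DAG.next_dagrun_info() for the run that follows the current data interval. The following standalone sketch is for illustration only and is not part of the commit; the DAG id, schedule, and dates are invented.

import pendulum
from airflow.models.dag import DAG
from airflow.timetables.base import DataInterval

dag = DAG(
    dag_id="sketch_daily",
    start_date=pendulum.datetime(2021, 9, 1, tz="UTC"),
    schedule_interval="@daily",
)

# The data interval of the current run, as already available inside get_template_context().
data_interval = DataInterval(
    start=pendulum.datetime(2021, 9, 6, tz="UTC"),
    end=pendulum.datetime(2021, 9, 7, tz="UTC"),
)

# Old path: next_execution_date = dag.following_schedule(execution_date)  # deprecated, cron-only
# New path: derive the next run from the timetable, starting from the data interval.
next_info = dag.next_dagrun_info(data_interval, restricted=False)
next_execution_date = None if next_info is None else next_info.logical_date
print(next_execution_date)  # 2021-09-07T00:00:00+00:00 for this daily schedule
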
25 changes: 13 additions & 12 deletions airflow/models/taskinstance.py
@@ -1839,33 +1839,30 @@ def get_prev_start_date_success() -> Optional[pendulum.DateTime]:
 
         @cache
         def get_yesterday_ds() -> str:
-            return (self.execution_date - timedelta(1)).strftime('%Y-%m-%d')
+            return (logical_date - timedelta(1)).strftime('%Y-%m-%d')
 
         def get_yesterday_ds_nodash() -> str:
             return get_yesterday_ds().replace('-', '')
 
         @cache
         def get_tomorrow_ds() -> str:
-            return (self.execution_date + timedelta(1)).strftime('%Y-%m-%d')
+            return (logical_date + timedelta(1)).strftime('%Y-%m-%d')
 
         def get_tomorrow_ds_nodash() -> str:
             return get_tomorrow_ds().replace('-', '')
 
         @cache
         def get_next_execution_date() -> Optional[pendulum.DateTime]:
             # For manually triggered dagruns that aren't run on a schedule,
-            # next/previous execution dates don't make sense, and should be set
+            # the "next" execution date doesn't make sense, and should be set
             # to execution date for consistency with how execution_date is set
             # for manually triggered tasks, i.e. triggered_date == execution_date.
             if dag_run.external_trigger:
-                next_execution_date = dag_run.execution_date
-            else:
-                with warnings.catch_warnings():
-                    warnings.simplefilter("ignore", DeprecationWarning)
-                    next_execution_date = dag.following_schedule(self.execution_date)
-            if next_execution_date is None:
+                return logical_date
+            next_info = dag.next_dagrun_info(data_interval, restricted=False)
+            if next_info is None:
                 return None
-            return timezone.coerce_datetime(next_execution_date)
+            return timezone.coerce_datetime(next_info.logical_date)
 
         def get_next_ds() -> Optional[str]:
             execution_date = get_next_execution_date()

@@ -1881,11 +1878,15 @@ def get_next_ds_nodash() -> Optional[str]:
 
         @cache
         def get_prev_execution_date():
+            # For manually triggered dagruns that aren't run on a schedule,
+            # the "previous" execution date doesn't make sense, and should be set
+            # to execution date for consistency with how execution_date is set
+            # for manually triggered tasks, i.e. triggered_date == execution_date.
             if dag_run.external_trigger:
-                return timezone.coerce_datetime(self.execution_date)
+                return logical_date
             with warnings.catch_warnings():
                 warnings.simplefilter("ignore", DeprecationWarning)
-                return dag.previous_schedule(self.execution_date)
+                return dag.previous_schedule(logical_date)
 
         @cache
         def get_prev_ds() -> Optional[str]:
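
From a task's point of view, the deprecated context keys keep working but are now derived from the run's data interval. A minimal, hypothetical DAG illustrating the keys involved (not part of the commit; the DAG id, dates, and command are invented):

import pendulum
from airflow.models.dag import DAG
from airflow.operators.bash import BashOperator

with DAG(
    dag_id="context_keys_demo",
    start_date=pendulum.datetime(2021, 9, 1, tz="UTC"),
    schedule_interval="@daily",
) as dag:
    # "next_execution_date" / "prev_execution_date" are deprecated template
    # variables; after this change "next_execution_date" is computed via the
    # DAG's timetable (next_dagrun_info) rather than following_schedule().
    BashOperator(
        task_id="show_dates",
        bash_command="echo next={{ next_execution_date }} prev={{ prev_execution_date }}",
    )
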
12 changes: 11 additions & 1 deletion tests/conftest.py
@@ -709,7 +709,15 @@ def create_task_instance(dag_maker, create_dummy_dag):
     Uses ``create_dummy_dag`` to create the dag structure.
     """
 
-    def maker(execution_date=None, dagrun_state=None, state=None, run_id=None, run_type=None, **kwargs):
+    def maker(
+        execution_date=None,
+        dagrun_state=None,
+        state=None,
+        run_id=None,
+        run_type=None,
+        data_interval=None,
+        **kwargs,
+    ):
         if execution_date is None:
             from airflow.utils import timezone
 

@@ -721,6 +729,8 @@ def maker(execution_date=None, dagrun_state=None, state=None, run_id=None, run_t
             dagrun_kwargs["run_id"] = run_id
         if run_type is not None:
             dagrun_kwargs["run_type"] = run_type
+        if data_interval is not None:
+            dagrun_kwargs["data_interval"] = data_interval
         dagrun = dag_maker.create_dagrun(**dagrun_kwargs)
         (ti,) = dagrun.task_instances
         ti.state = state
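
The new data_interval parameter is exercised by the test added in the next file; for quick reference, a hypothetical call through the extended fixture might look like this (the test name, dates, and assertion are invented, not taken from the commit):

def test_uses_explicit_data_interval(create_task_instance):
    from airflow.utils import timezone
    from airflow.utils.types import DagRunType

    # data_interval is forwarded to dag_maker.create_dagrun(), so the run's
    # interval no longer has to be inferred from the execution date alone.
    ti = create_task_instance(
        execution_date=timezone.datetime(2021, 9, 6),
        data_interval=(timezone.datetime(2021, 9, 6), timezone.datetime(2021, 9, 7)),
        run_type=DagRunType.SCHEDULED,
    )
    assert ti.get_template_context()["data_interval_start"] == timezone.datetime(2021, 9, 6)
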
25 changes: 25 additions & 0 deletions tests/models/test_taskinstance.py
@@ -30,6 +30,7 @@
 from freezegun import freeze_time
 
 from airflow import models, settings
+from airflow.example_dags.plugins.workday import AfterWorkdayTimetable
 from airflow.exceptions import (
     AirflowException,
     AirflowFailException,

@@ -1630,6 +1631,30 @@ def test_template_with_json_variable_missing(self, create_task_instance):
         with pytest.raises(KeyError):
             ti.task.render_template('{{ var.json.get("missing_variable") }}', context)
 
+    def test_template_with_custom_timetable_deprecated_context(self, create_task_instance):
+        ti = create_task_instance(
+            start_date=DEFAULT_DATE,
+            timetable=AfterWorkdayTimetable(),
+            run_type=DagRunType.SCHEDULED,
+            execution_date=timezone.datetime(2021, 9, 6),
+            data_interval=(timezone.datetime(2021, 9, 6), timezone.datetime(2021, 9, 7)),
+        )
+        context = ti.get_template_context()
+        with pytest.deprecated_call():
+            assert context["execution_date"] == pendulum.DateTime(2021, 9, 6, tzinfo=timezone.TIMEZONE)
+        with pytest.deprecated_call():
+            assert context["next_ds"] == "2021-09-07"
+        with pytest.deprecated_call():
+            assert context["next_ds_nodash"] == "20210907"
+        with pytest.deprecated_call():
+            assert context["next_execution_date"] == pendulum.DateTime(2021, 9, 7, tzinfo=timezone.TIMEZONE)
+        with pytest.deprecated_call():
+            assert context["prev_ds"] is None, "Does not make sense for custom timetable"
+        with pytest.deprecated_call():
+            assert context["prev_ds_nodash"] is None, "Does not make sense for custom timetable"
+        with pytest.deprecated_call():
+            assert context["prev_execution_date"] is None, "Does not make sense for custom timetable"
+
     def test_execute_callback(self, create_task_instance):
         called = False
