Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Avoid calling DAG.following_schedule() for TaskInstance.get_template_context() #20486

Merged
merged 2 commits into from Dec 28, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
25 changes: 13 additions & 12 deletions airflow/models/taskinstance.py
Expand Up @@ -1845,33 +1845,30 @@ def get_prev_start_date_success() -> Optional[pendulum.DateTime]:

@cache
def get_yesterday_ds() -> str:
return (self.execution_date - timedelta(1)).strftime('%Y-%m-%d')
return (logical_date - timedelta(1)).strftime('%Y-%m-%d')

def get_yesterday_ds_nodash() -> str:
return get_yesterday_ds().replace('-', '')

@cache
def get_tomorrow_ds() -> str:
return (self.execution_date + timedelta(1)).strftime('%Y-%m-%d')
return (logical_date + timedelta(1)).strftime('%Y-%m-%d')

def get_tomorrow_ds_nodash() -> str:
return get_tomorrow_ds().replace('-', '')

@cache
def get_next_execution_date() -> Optional[pendulum.DateTime]:
# For manually triggered dagruns that aren't run on a schedule,
# next/previous execution dates don't make sense, and should be set
# the "next" execution date doesn't make sense, and should be set
# to execution date for consistency with how execution_date is set
# for manually triggered tasks, i.e. triggered_date == execution_date.
if dag_run.external_trigger:
next_execution_date = dag_run.execution_date
else:
with warnings.catch_warnings():
warnings.simplefilter("ignore", DeprecationWarning)
next_execution_date = dag.following_schedule(self.execution_date)
if next_execution_date is None:
return logical_date
next_info = dag.next_dagrun_info(data_interval, restricted=False)
if next_info is None:
return None
return timezone.coerce_datetime(next_execution_date)
return timezone.coerce_datetime(next_info.logical_date)

def get_next_ds() -> Optional[str]:
execution_date = get_next_execution_date()
Expand All @@ -1887,11 +1884,15 @@ def get_next_ds_nodash() -> Optional[str]:

@cache
def get_prev_execution_date():
# For manually triggered dagruns that aren't run on a schedule,
# the "previous" execution date doesn't make sense, and should be set
# to execution date for consistency with how execution_date is set
# for manually triggered tasks, i.e. triggered_date == execution_date.
if dag_run.external_trigger:
return timezone.coerce_datetime(self.execution_date)
return logical_date
with warnings.catch_warnings():
warnings.simplefilter("ignore", DeprecationWarning)
return dag.previous_schedule(self.execution_date)
return dag.previous_schedule(logical_date)

@cache
def get_prev_ds() -> Optional[str]:
Expand Down
12 changes: 11 additions & 1 deletion tests/conftest.py
Expand Up @@ -699,7 +699,15 @@ def create_task_instance(dag_maker, create_dummy_dag):
Uses ``create_dummy_dag`` to create the dag structure.
"""

def maker(execution_date=None, dagrun_state=None, state=None, run_id=None, run_type=None, **kwargs):
def maker(
execution_date=None,
dagrun_state=None,
state=None,
run_id=None,
run_type=None,
data_interval=None,
**kwargs,
):
if execution_date is None:
from airflow.utils import timezone

Expand All @@ -711,6 +719,8 @@ def maker(execution_date=None, dagrun_state=None, state=None, run_id=None, run_t
dagrun_kwargs["run_id"] = run_id
if run_type is not None:
dagrun_kwargs["run_type"] = run_type
if data_interval is not None:
dagrun_kwargs["data_interval"] = data_interval
dagrun = dag_maker.create_dagrun(**dagrun_kwargs)
(ti,) = dagrun.task_instances
ti.state = state
Expand Down
25 changes: 25 additions & 0 deletions tests/models/test_taskinstance.py
Expand Up @@ -30,6 +30,7 @@
from freezegun import freeze_time

from airflow import models, settings
from airflow.example_dags.plugins.workday import AfterWorkdayTimetable
from airflow.exceptions import (
AirflowException,
AirflowFailException,
Expand Down Expand Up @@ -1634,6 +1635,30 @@ def test_template_with_json_variable_missing(self, create_task_instance):
with pytest.raises(KeyError):
ti.task.render_template('{{ var.json.get("missing_variable") }}', context)

def test_tempalte_with_custom_timetable_deprecated_context(self, create_task_instance):
ti = create_task_instance(
start_date=DEFAULT_DATE,
timetable=AfterWorkdayTimetable(),
run_type=DagRunType.SCHEDULED,
execution_date=timezone.datetime(2021, 9, 6),
data_interval=(timezone.datetime(2021, 9, 6), timezone.datetime(2021, 9, 7)),
)
context = ti.get_template_context()
with pytest.deprecated_call():
assert context["execution_date"] == pendulum.DateTime(2021, 9, 6, tzinfo=timezone.TIMEZONE)
with pytest.deprecated_call():
assert context["next_ds"] == "2021-09-07"
with pytest.deprecated_call():
assert context["next_ds_nodash"] == "20210907"
with pytest.deprecated_call():
assert context["next_execution_date"] == pendulum.DateTime(2021, 9, 7, tzinfo=timezone.TIMEZONE)
with pytest.deprecated_call():
assert context["prev_ds"] is None, "Does not make sense for custom timetable"
with pytest.deprecated_call():
assert context["prev_ds_nodash"] is None, "Does not make sense for custom timetable"
with pytest.deprecated_call():
assert context["prev_execution_date"] is None, "Does not make sense for custom timetable"

def test_execute_callback(self, create_task_instance):
called = False

Expand Down