From b43882ca1a8f26a582f13a5e4443e9d95c8842f5 Mon Sep 17 00:00:00 2001 From: Tzu-ping Chung Date: Wed, 29 Dec 2021 03:12:21 +0800 Subject: [PATCH] Avoid calling DAG.following_schedule() for TaskInstance.get_template_context() (#20486) This can use a more modern mechanism since get_template_context() has enough context (namely, the current data interval). (cherry picked from commit 9e315ff7caec7fd3d4c0dfe8b89ee2a1c7b5fe3a) --- airflow/models/taskinstance.py | 25 +++++++++++++------------ tests/conftest.py | 12 +++++++++++- tests/models/test_taskinstance.py | 25 +++++++++++++++++++++++++ 3 files changed, 49 insertions(+), 13 deletions(-) diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py index 716167c20f9b8..281d067861f44 100644 --- a/airflow/models/taskinstance.py +++ b/airflow/models/taskinstance.py @@ -1839,14 +1839,14 @@ def get_prev_start_date_success() -> Optional[pendulum.DateTime]: @cache def get_yesterday_ds() -> str: - return (self.execution_date - timedelta(1)).strftime('%Y-%m-%d') + return (logical_date - timedelta(1)).strftime('%Y-%m-%d') def get_yesterday_ds_nodash() -> str: return get_yesterday_ds().replace('-', '') @cache def get_tomorrow_ds() -> str: - return (self.execution_date + timedelta(1)).strftime('%Y-%m-%d') + return (logical_date + timedelta(1)).strftime('%Y-%m-%d') def get_tomorrow_ds_nodash() -> str: return get_tomorrow_ds().replace('-', '') @@ -1854,18 +1854,15 @@ def get_tomorrow_ds_nodash() -> str: @cache def get_next_execution_date() -> Optional[pendulum.DateTime]: # For manually triggered dagruns that aren't run on a schedule, - # next/previous execution dates don't make sense, and should be set + # the "next" execution date doesn't make sense, and should be set # to execution date for consistency with how execution_date is set # for manually triggered tasks, i.e. triggered_date == execution_date. if dag_run.external_trigger: - next_execution_date = dag_run.execution_date - else: - with warnings.catch_warnings(): - warnings.simplefilter("ignore", DeprecationWarning) - next_execution_date = dag.following_schedule(self.execution_date) - if next_execution_date is None: + return logical_date + next_info = dag.next_dagrun_info(data_interval, restricted=False) + if next_info is None: return None - return timezone.coerce_datetime(next_execution_date) + return timezone.coerce_datetime(next_info.logical_date) def get_next_ds() -> Optional[str]: execution_date = get_next_execution_date() @@ -1881,11 +1878,15 @@ def get_next_ds_nodash() -> Optional[str]: @cache def get_prev_execution_date(): + # For manually triggered dagruns that aren't run on a schedule, + # the "previous" execution date doesn't make sense, and should be set + # to execution date for consistency with how execution_date is set + # for manually triggered tasks, i.e. triggered_date == execution_date. if dag_run.external_trigger: - return timezone.coerce_datetime(self.execution_date) + return logical_date with warnings.catch_warnings(): warnings.simplefilter("ignore", DeprecationWarning) - return dag.previous_schedule(self.execution_date) + return dag.previous_schedule(logical_date) @cache def get_prev_ds() -> Optional[str]: diff --git a/tests/conftest.py b/tests/conftest.py index f7248d1d73dff..9e72d37a5ea83 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -709,7 +709,15 @@ def create_task_instance(dag_maker, create_dummy_dag): Uses ``create_dummy_dag`` to create the dag structure. """ - def maker(execution_date=None, dagrun_state=None, state=None, run_id=None, run_type=None, **kwargs): + def maker( + execution_date=None, + dagrun_state=None, + state=None, + run_id=None, + run_type=None, + data_interval=None, + **kwargs, + ): if execution_date is None: from airflow.utils import timezone @@ -721,6 +729,8 @@ def maker(execution_date=None, dagrun_state=None, state=None, run_id=None, run_t dagrun_kwargs["run_id"] = run_id if run_type is not None: dagrun_kwargs["run_type"] = run_type + if data_interval is not None: + dagrun_kwargs["data_interval"] = data_interval dagrun = dag_maker.create_dagrun(**dagrun_kwargs) (ti,) = dagrun.task_instances ti.state = state diff --git a/tests/models/test_taskinstance.py b/tests/models/test_taskinstance.py index 8458ea91540a6..d1113714071fd 100644 --- a/tests/models/test_taskinstance.py +++ b/tests/models/test_taskinstance.py @@ -30,6 +30,7 @@ from freezegun import freeze_time from airflow import models, settings +from airflow.example_dags.plugins.workday import AfterWorkdayTimetable from airflow.exceptions import ( AirflowException, AirflowFailException, @@ -1630,6 +1631,30 @@ def test_template_with_json_variable_missing(self, create_task_instance): with pytest.raises(KeyError): ti.task.render_template('{{ var.json.get("missing_variable") }}', context) + def test_tempalte_with_custom_timetable_deprecated_context(self, create_task_instance): + ti = create_task_instance( + start_date=DEFAULT_DATE, + timetable=AfterWorkdayTimetable(), + run_type=DagRunType.SCHEDULED, + execution_date=timezone.datetime(2021, 9, 6), + data_interval=(timezone.datetime(2021, 9, 6), timezone.datetime(2021, 9, 7)), + ) + context = ti.get_template_context() + with pytest.deprecated_call(): + assert context["execution_date"] == pendulum.DateTime(2021, 9, 6, tzinfo=timezone.TIMEZONE) + with pytest.deprecated_call(): + assert context["next_ds"] == "2021-09-07" + with pytest.deprecated_call(): + assert context["next_ds_nodash"] == "20210907" + with pytest.deprecated_call(): + assert context["next_execution_date"] == pendulum.DateTime(2021, 9, 7, tzinfo=timezone.TIMEZONE) + with pytest.deprecated_call(): + assert context["prev_ds"] is None, "Does not make sense for custom timetable" + with pytest.deprecated_call(): + assert context["prev_ds_nodash"] is None, "Does not make sense for custom timetable" + with pytest.deprecated_call(): + assert context["prev_execution_date"] is None, "Does not make sense for custom timetable" + def test_execute_callback(self, create_task_instance): called = False