From e39ca4fbb7d194a1503fa9ae831266e2384789a7 Mon Sep 17 00:00:00 2001 From: Tzu-ping Chung Date: Wed, 22 Dec 2021 11:01:54 +0800 Subject: [PATCH 1/2] Avoid calling following_schedule for ti context This can use a more modern mechanism since get_template_context() has enough context (namely, the current data interval). --- airflow/models/taskinstance.py | 25 +++++++++++++------------ 1 file changed, 13 insertions(+), 12 deletions(-) diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py index d051c5d5a7187..c894e47453529 100644 --- a/airflow/models/taskinstance.py +++ b/airflow/models/taskinstance.py @@ -1845,14 +1845,14 @@ def get_prev_start_date_success() -> Optional[pendulum.DateTime]: @cache def get_yesterday_ds() -> str: - return (self.execution_date - timedelta(1)).strftime('%Y-%m-%d') + return (logical_date - timedelta(1)).strftime('%Y-%m-%d') def get_yesterday_ds_nodash() -> str: return get_yesterday_ds().replace('-', '') @cache def get_tomorrow_ds() -> str: - return (self.execution_date + timedelta(1)).strftime('%Y-%m-%d') + return (logical_date + timedelta(1)).strftime('%Y-%m-%d') def get_tomorrow_ds_nodash() -> str: return get_tomorrow_ds().replace('-', '') @@ -1860,18 +1860,15 @@ def get_tomorrow_ds_nodash() -> str: @cache def get_next_execution_date() -> Optional[pendulum.DateTime]: # For manually triggered dagruns that aren't run on a schedule, - # next/previous execution dates don't make sense, and should be set + # the "next" execution date doesn't make sense, and should be set # to execution date for consistency with how execution_date is set # for manually triggered tasks, i.e. triggered_date == execution_date. if dag_run.external_trigger: - next_execution_date = dag_run.execution_date - else: - with warnings.catch_warnings(): - warnings.simplefilter("ignore", DeprecationWarning) - next_execution_date = dag.following_schedule(self.execution_date) - if next_execution_date is None: + return logical_date + next_info = dag.next_dagrun_info(data_interval, restricted=False) + if next_info is None: return None - return timezone.coerce_datetime(next_execution_date) + return timezone.coerce_datetime(next_info.logical_date) def get_next_ds() -> Optional[str]: execution_date = get_next_execution_date() @@ -1887,11 +1884,15 @@ def get_next_ds_nodash() -> Optional[str]: @cache def get_prev_execution_date(): + # For manually triggered dagruns that aren't run on a schedule, + # the "previous" execution date doesn't make sense, and should be set + # to execution date for consistency with how execution_date is set + # for manually triggered tasks, i.e. triggered_date == execution_date. if dag_run.external_trigger: - return timezone.coerce_datetime(self.execution_date) + return logical_date with warnings.catch_warnings(): warnings.simplefilter("ignore", DeprecationWarning) - return dag.previous_schedule(self.execution_date) + return dag.previous_schedule(logical_date) @cache def get_prev_ds() -> Optional[str]: From fde0c3266c68dc24cb8cc6a487fffb1ce10e9e71 Mon Sep 17 00:00:00 2001 From: Tzu-ping Chung Date: Wed, 22 Dec 2021 18:31:29 +0800 Subject: [PATCH 2/2] Test to ensure context works with custom timetable --- tests/conftest.py | 12 +++++++++++- tests/models/test_taskinstance.py | 25 +++++++++++++++++++++++++ 2 files changed, 36 insertions(+), 1 deletion(-) diff --git a/tests/conftest.py b/tests/conftest.py index e9a6b61efab4a..bfea15d0ab56d 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -699,7 +699,15 @@ def create_task_instance(dag_maker, create_dummy_dag): Uses ``create_dummy_dag`` to create the dag structure. """ - def maker(execution_date=None, dagrun_state=None, state=None, run_id=None, run_type=None, **kwargs): + def maker( + execution_date=None, + dagrun_state=None, + state=None, + run_id=None, + run_type=None, + data_interval=None, + **kwargs, + ): if execution_date is None: from airflow.utils import timezone @@ -711,6 +719,8 @@ def maker(execution_date=None, dagrun_state=None, state=None, run_id=None, run_t dagrun_kwargs["run_id"] = run_id if run_type is not None: dagrun_kwargs["run_type"] = run_type + if data_interval is not None: + dagrun_kwargs["data_interval"] = data_interval dagrun = dag_maker.create_dagrun(**dagrun_kwargs) (ti,) = dagrun.task_instances ti.state = state diff --git a/tests/models/test_taskinstance.py b/tests/models/test_taskinstance.py index db6e9e384dfc6..14a8c7800df14 100644 --- a/tests/models/test_taskinstance.py +++ b/tests/models/test_taskinstance.py @@ -30,6 +30,7 @@ from freezegun import freeze_time from airflow import models, settings +from airflow.example_dags.plugins.workday import AfterWorkdayTimetable from airflow.exceptions import ( AirflowException, AirflowFailException, @@ -1634,6 +1635,30 @@ def test_template_with_json_variable_missing(self, create_task_instance): with pytest.raises(KeyError): ti.task.render_template('{{ var.json.get("missing_variable") }}', context) + def test_tempalte_with_custom_timetable_deprecated_context(self, create_task_instance): + ti = create_task_instance( + start_date=DEFAULT_DATE, + timetable=AfterWorkdayTimetable(), + run_type=DagRunType.SCHEDULED, + execution_date=timezone.datetime(2021, 9, 6), + data_interval=(timezone.datetime(2021, 9, 6), timezone.datetime(2021, 9, 7)), + ) + context = ti.get_template_context() + with pytest.deprecated_call(): + assert context["execution_date"] == pendulum.DateTime(2021, 9, 6, tzinfo=timezone.TIMEZONE) + with pytest.deprecated_call(): + assert context["next_ds"] == "2021-09-07" + with pytest.deprecated_call(): + assert context["next_ds_nodash"] == "20210907" + with pytest.deprecated_call(): + assert context["next_execution_date"] == pendulum.DateTime(2021, 9, 7, tzinfo=timezone.TIMEZONE) + with pytest.deprecated_call(): + assert context["prev_ds"] is None, "Does not make sense for custom timetable" + with pytest.deprecated_call(): + assert context["prev_ds_nodash"] is None, "Does not make sense for custom timetable" + with pytest.deprecated_call(): + assert context["prev_execution_date"] is None, "Does not make sense for custom timetable" + def test_execute_callback(self, create_task_instance): called = False