From 9876e19273cd56dc53d3a4e287db43acbfa65c4b Mon Sep 17 00:00:00 2001 From: Tzu-ping Chung Date: Tue, 21 Dec 2021 18:00:46 +0800 Subject: [PATCH] Un-ignore DeprecationWarning (#20322) --- airflow/models/taskinstance.py | 40 ++++------ airflow/operators/datetime.py | 2 +- airflow/operators/python.py | 26 +++--- airflow/operators/weekday.py | 2 +- airflow/providers/http/operators/http.py | 10 +-- airflow/providers/http/sensors/http.py | 7 +- airflow/sensors/external_task.py | 24 +++--- airflow/sensors/weekday.py | 2 +- airflow/utils/context.py | 35 ++++++++ airflow/utils/context.pyi | 5 +- airflow/utils/helpers.py | 2 +- .../log/task_handler_with_custom_formatter.py | 4 +- airflow/utils/operator_helpers.py | 80 ++++++++++++++----- scripts/ci/images/ci_run_docker_tests.py | 2 - .../ci/kubernetes/ci_run_kubernetes_tests.sh | 7 +- scripts/in_container/entrypoint_ci.sh | 2 - tests/cli/commands/test_task_command.py | 2 + tests/core/test_core.py | 21 ++--- tests/operators/test_email.py | 2 +- tests/operators/test_python.py | 9 ++- tests/operators/test_trigger_dagrun.py | 2 +- tests/providers/http/sensors/test_http.py | 4 +- tests/sensors/test_external_task_sensor.py | 8 +- tests/utils/test_log_handlers.py | 6 +- 24 files changed, 192 insertions(+), 112 deletions(-) diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py index 660da432d9806..d051c5d5a7187 100644 --- a/airflow/models/taskinstance.py +++ b/airflow/models/taskinstance.py @@ -99,6 +99,7 @@ from airflow.utils import timezone from airflow.utils.context import ConnectionAccessor, Context, VariableAccessor from airflow.utils.email import send_email +from airflow.utils.helpers import render_template_to_string from airflow.utils.log.logging_mixin import LoggingMixin from airflow.utils.net import get_hostname from airflow.utils.operator_helpers import context_to_airflow_vars @@ -2023,7 +2024,7 @@ def render_k8s_pod_yaml(self) -> Optional[dict]: sanitized_pod = ApiClient().sanitize_for_serialization(pod) return sanitized_pod - def get_email_subject_content(self, exception): + def get_email_subject_content(self, exception: BaseException) -> Tuple[str, str, str]: """Get the email subject content for exceptions.""" # For a ti from DB (without ti.task), return the default value # Reuse it for smart sensor to send default email alert @@ -2050,18 +2051,18 @@ def get_email_subject_content(self, exception): 'Mark success: Link
' ) + # This function is called after changing the state from State.RUNNING, + # so we need to subtract 1 from self.try_number here. + current_try_number = self.try_number - 1 + additional_context = { + "exception": exception, + "exception_html": exception_html, + "try_number": current_try_number, + "max_tries": self.max_tries, + } + if use_default: - jinja_context = {'ti': self} - # This function is called after changing the state - # from State.RUNNING so need to subtract 1 from self.try_number. - jinja_context.update( - dict( - exception=exception, - exception_html=exception_html, - try_number=self.try_number - 1, - max_tries=self.max_tries, - ) - ) + jinja_context = {"ti": self, **additional_context} jinja_env = jinja2.Environment( loader=jinja2.FileSystemLoader(os.path.dirname(__file__)), autoescape=True ) @@ -2071,24 +2072,15 @@ def get_email_subject_content(self, exception): else: jinja_context = self.get_template_context() - - jinja_context.update( - dict( - exception=exception, - exception_html=exception_html, - try_number=self.try_number - 1, - max_tries=self.max_tries, - ) - ) - + jinja_context.update(additional_context) jinja_env = self.task.get_template_env() - def render(key, content): + def render(key: str, content: str) -> str: if conf.has_option('email', key): path = conf.get('email', key) with open(path) as f: content = f.read() - return jinja_env.from_string(content).render(**jinja_context) + return render_template_to_string(jinja_env.from_string(content), jinja_context) subject = render('subject_template', default_subject) html_content = render('html_content_template', default_html_content) diff --git a/airflow/operators/datetime.py b/airflow/operators/datetime.py index 6750f12a7eb61..9e3620d5f067b 100644 --- a/airflow/operators/datetime.py +++ b/airflow/operators/datetime.py @@ -72,7 +72,7 @@ def __init__( def choose_branch(self, context: Dict) -> Union[str, Iterable[str]]: if self.use_task_execution_date is True: - now = timezone.make_naive(context["execution_date"], self.dag.timezone) + now = timezone.make_naive(context["logical_date"], self.dag.timezone) else: now = timezone.make_naive(timezone.utcnow(), self.dag.timezone) diff --git a/airflow/operators/python.py b/airflow/operators/python.py index c908bcea92299..0f497460c3557 100644 --- a/airflow/operators/python.py +++ b/airflow/operators/python.py @@ -24,7 +24,7 @@ import warnings from tempfile import TemporaryDirectory from textwrap import dedent -from typing import Any, Callable, Dict, Iterable, List, Optional, Union +from typing import Any, Callable, Collection, Dict, Iterable, List, Mapping, Optional, Union import dill @@ -33,7 +33,7 @@ from airflow.models.skipmixin import SkipMixin from airflow.models.taskinstance import _CURRENT_CONTEXT from airflow.utils.context import Context, context_copy_partial -from airflow.utils.operator_helpers import determine_kwargs +from airflow.utils.operator_helpers import KeywordParameters from airflow.utils.process_utils import execute_in_subprocess from airflow.utils.python_virtualenv import prepare_virtualenv, write_python_script @@ -147,8 +147,8 @@ def __init__( self, *, python_callable: Callable, - op_args: Optional[List] = None, - op_kwargs: Optional[Dict] = None, + op_args: Optional[Collection[Any]] = None, + op_kwargs: Optional[Mapping[str, Any]] = None, templates_dict: Optional[Dict] = None, templates_exts: Optional[List[str]] = None, show_return_value_in_logs: bool = True, @@ -165,7 +165,7 @@ def __init__( if not callable(python_callable): raise AirflowException('`python_callable` param must be callable') self.python_callable = python_callable - self.op_args = op_args or [] + self.op_args = op_args or () self.op_kwargs = op_kwargs or {} self.templates_dict = templates_dict if templates_exts: @@ -176,7 +176,7 @@ def execute(self, context: Context) -> Any: context.update(self.op_kwargs) context['templates_dict'] = self.templates_dict - self.op_kwargs = determine_kwargs(self.python_callable, self.op_args, context) + self.op_kwargs = self.determine_kwargs(context) return_value = self.execute_callable() if self.show_return_value_in_logs: @@ -186,6 +186,9 @@ def execute(self, context: Context) -> Any: return return_value + def determine_kwargs(self, context: Mapping[str, Any]) -> Mapping[str, Any]: + return KeywordParameters.determine(self.python_callable, self.op_args, context).unpacking() + def execute_callable(self): """ Calls the python callable with the given arguments. @@ -252,11 +255,11 @@ def execute(self, context: Context) -> Any: self.log.info('Skipping downstream tasks...') - downstream_tasks = context['task'].get_flat_relatives(upstream=False) + downstream_tasks = context["task"].get_flat_relatives(upstream=False) self.log.debug("Downstream task_ids %s", downstream_tasks) if downstream_tasks: - self.skip(context['dag_run'], context['ti'].execution_date, downstream_tasks) + self.skip(context["dag_run"], context["logical_date"], downstream_tasks) self.log.info("Done.") @@ -356,8 +359,8 @@ def __init__( python_version: Optional[Union[str, int, float]] = None, use_dill: bool = False, system_site_packages: bool = True, - op_args: Optional[List] = None, - op_kwargs: Optional[Dict] = None, + op_args: Optional[Collection[Any]] = None, + op_kwargs: Optional[Mapping[str, Any]] = None, string_args: Optional[Iterable[str]] = None, templates_dict: Optional[Dict] = None, templates_exts: Optional[List[str]] = None, @@ -403,6 +406,9 @@ def execute(self, context: Context) -> Any: serializable_context = context_copy_partial(context, serializable_keys) return super().execute(context=serializable_context) + def determine_kwargs(self, context: Mapping[str, Any]) -> Mapping[str, Any]: + return KeywordParameters.determine(self.python_callable, self.op_args, context).serializing() + def execute_callable(self): with TemporaryDirectory(prefix='venv') as tmp_dir: if self.templates_dict: diff --git a/airflow/operators/weekday.py b/airflow/operators/weekday.py index e1167a5137d98..2e4e656fae1f2 100644 --- a/airflow/operators/weekday.py +++ b/airflow/operators/weekday.py @@ -67,7 +67,7 @@ def __init__( def choose_branch(self, context: Dict) -> Union[str, Iterable[str]]: if self.use_task_execution_day: - now = context["execution_date"] + now = context["logical_date"] else: now = timezone.make_naive(timezone.utcnow(), self.dag.timezone) diff --git a/airflow/providers/http/operators/http.py b/airflow/providers/http/operators/http.py index b6295185d8ba8..d36ceb21b73aa 100644 --- a/airflow/providers/http/operators/http.py +++ b/airflow/providers/http/operators/http.py @@ -104,7 +104,7 @@ def __init__( raise AirflowException("'xcom_push' was deprecated, use 'BaseOperator.do_xcom_push' instead") def execute(self, context: Dict[str, Any]) -> Any: - from airflow.utils.operator_helpers import make_kwargs_callable + from airflow.utils.operator_helpers import determine_kwargs http = HttpHook(self.method, http_conn_id=self.http_conn_id, auth_type=self.auth_type) @@ -114,10 +114,10 @@ def execute(self, context: Dict[str, Any]) -> Any: if self.log_response: self.log.info(response.text) if self.response_check: - kwargs_callable = make_kwargs_callable(self.response_check) - if not kwargs_callable(response, **context): + kwargs = determine_kwargs(self.response_check, [response], context) + if not self.response_check(response, **kwargs): raise AirflowException("Response check returned False.") if self.response_filter: - kwargs_callable = make_kwargs_callable(self.response_filter) - return kwargs_callable(response, **context) + kwargs = determine_kwargs(self.response_filter, [response], context) + return self.response_filter(response, **kwargs) return response.text diff --git a/airflow/providers/http/sensors/http.py b/airflow/providers/http/sensors/http.py index 6ef55ea5a5641..e052c014cc851 100644 --- a/airflow/providers/http/sensors/http.py +++ b/airflow/providers/http/sensors/http.py @@ -96,7 +96,7 @@ def __init__( self.hook = HttpHook(method=method, http_conn_id=http_conn_id) def poke(self, context: Dict[Any, Any]) -> bool: - from airflow.utils.operator_helpers import make_kwargs_callable + from airflow.utils.operator_helpers import determine_kwargs self.log.info('Poking: %s', self.endpoint) try: @@ -107,9 +107,8 @@ def poke(self, context: Dict[Any, Any]) -> bool: extra_options=self.extra_options, ) if self.response_check: - kwargs_callable = make_kwargs_callable(self.response_check) - return kwargs_callable(response, **context) - + kwargs = determine_kwargs(self.response_check, [response], context) + return self.response_check(response, **kwargs) except AirflowException as exc: if str(exc).startswith("404"): return False diff --git a/airflow/sensors/external_task.py b/airflow/sensors/external_task.py index 9295682fd822a..c9c025d326046 100644 --- a/airflow/sensors/external_task.py +++ b/airflow/sensors/external_task.py @@ -47,7 +47,7 @@ def get_link(self, operator, dttm): class ExternalTaskSensor(BaseSensorOperator): """ Waits for a different DAG or a task in a different DAG to complete for a - specific execution_date + specific logical date. :param external_dag_id: The dag_id that contains the task you want to wait for @@ -65,14 +65,14 @@ class ExternalTaskSensor(BaseSensorOperator): :param failed_states: Iterable of failed or dis-allowed states, default is ``None`` :type failed_states: Iterable :param execution_delta: time difference with the previous execution to - look at, the default is the same execution_date as the current task or DAG. + look at, the default is the same logical date as the current task or DAG. For yesterday, use [positive!] datetime.timedelta(days=1). Either execution_delta or execution_date_fn can be passed to ExternalTaskSensor, but not both. :type execution_delta: Optional[datetime.timedelta] - :param execution_date_fn: function that receives the current execution date as the first + :param execution_date_fn: function that receives the current execution's logical date as the first positional argument and optionally any number of keyword arguments available in the - context dictionary, and returns the desired execution dates to query. + context dictionary, and returns the desired logical dates to query. Either execution_delta or execution_date_fn can be passed to ExternalTaskSensor, but not both. :type execution_date_fn: Optional[Callable] @@ -156,11 +156,11 @@ def __init__( @provide_session def poke(self, context, session=None): if self.execution_delta: - dttm = context['execution_date'] - self.execution_delta + dttm = context['logical_date'] - self.execution_delta elif self.execution_date_fn: dttm = self._handle_execution_date_fn(context=context) else: - dttm = context['execution_date'] + dttm = context['logical_date'] dttm_filter = dttm if isinstance(dttm, list) else [dttm] serialized_dttm_filter = ','.join(dt.isoformat() for dt in dttm_filter) @@ -259,16 +259,16 @@ def _handle_execution_date_fn(self, context) -> Any: """ from airflow.utils.operator_helpers import make_kwargs_callable - # Remove "execution_date" because it is already a mandatory positional argument - execution_date = context["execution_date"] - kwargs = {k: v for k, v in context.items() if k != "execution_date"} + # Remove "logical_date" because it is already a mandatory positional argument + logical_date = context["logical_date"] + kwargs = {k: v for k, v in context.items() if k not in {"execution_date", "logical_date"}} # Add "context" in the kwargs for backward compatibility (because context used to be # an acceptable argument of execution_date_fn) kwargs["context"] = context if TYPE_CHECKING: assert self.execution_date_fn is not None kwargs_callable = make_kwargs_callable(self.execution_date_fn) - return kwargs_callable(execution_date, **kwargs) + return kwargs_callable(logical_date, **kwargs) class ExternalTaskMarker(DummyOperator): @@ -282,7 +282,7 @@ class ExternalTaskMarker(DummyOperator): :type external_dag_id: str :param external_task_id: The task_id of the dependent task that needs to be cleared. :type external_task_id: str - :param execution_date: The execution_date of the dependent task that needs to be cleared. + :param execution_date: The logical date of the dependent task execution that needs to be cleared. :type execution_date: str or datetime.datetime :param recursion_depth: The maximum level of transitive dependencies allowed. Default is 10. This is mostly used for preventing cyclic dependencies. It is fine to increase @@ -301,7 +301,7 @@ def __init__( *, external_dag_id: str, external_task_id: str, - execution_date: Optional[Union[str, datetime.datetime]] = "{{ execution_date.isoformat() }}", + execution_date: Optional[Union[str, datetime.datetime]] = "{{ logical_date.isoformat() }}", recursion_depth: int = 10, **kwargs, ): diff --git a/airflow/sensors/weekday.py b/airflow/sensors/weekday.py index 03e3221493b9c..741e1660251db 100644 --- a/airflow/sensors/weekday.py +++ b/airflow/sensors/weekday.py @@ -84,6 +84,6 @@ def poke(self, context): WeekDay(timezone.utcnow().isoweekday()).name, ) if self.use_task_execution_day: - return context['execution_date'].isoweekday() in self._week_day_num + return context['logical_date'].isoweekday() in self._week_day_num else: return timezone.utcnow().isoweekday() in self._week_day_num diff --git a/airflow/utils/context.py b/airflow/utils/context.py index 5412b0965fd75..2568f138b74d8 100644 --- a/airflow/utils/context.py +++ b/airflow/utils/context.py @@ -20,6 +20,7 @@ import contextlib import copy +import functools import warnings from typing import ( AbstractSet, @@ -29,12 +30,15 @@ ItemsView, Iterator, List, + Mapping, MutableMapping, Optional, Tuple, ValuesView, ) +import lazy_object_proxy + from airflow.utils.types import NOTSET @@ -198,7 +202,38 @@ def context_copy_partial(source: Context, keys: Container[str]) -> "Context": This is implemented as a free function because the ``Context`` type is "faked" as a ``TypedDict`` in ``context.pyi``, which cannot have custom functions. + + :meta private: """ new = Context({k: v for k, v in source._context.items() if k in keys}) new._deprecation_replacements = source._deprecation_replacements.copy() return new + + +def lazy_mapping_from_context(source: Context) -> Mapping[str, Any]: + """Create a mapping that wraps deprecated entries in a lazy object proxy. + + This further delays deprecation warning to until when the entry is actually + used, instead of when it's accessed in the context. The result is useful for + passing into a callable with ``**kwargs``, which would unpack the mapping + too eagerly otherwise. + + This is implemented as a free function because the ``Context`` type is + "faked" as a ``TypedDict`` in ``context.pyi``, which cannot have custom + functions. + + :meta private: + """ + + def _deprecated_proxy_factory(k: str, v: Any) -> Any: + replacements = source._deprecation_replacements[k] + warnings.warn(_create_deprecation_warning(k, replacements)) + return v + + def _create_value(k: str, v: Any) -> Any: + if k not in source._deprecation_replacements: + return v + factory = functools.partial(_deprecated_proxy_factory, k, v) + return lazy_object_proxy.Proxy(factory) + + return {k: _create_value(k, v) for k, v in source._context.items()} diff --git a/airflow/utils/context.pyi b/airflow/utils/context.pyi index c479991cb975d..0e58005b96058 100644 --- a/airflow/utils/context.pyi +++ b/airflow/utils/context.pyi @@ -25,7 +25,7 @@ # undefined attribute errors from Mypy. Hopefully there will be a mechanism to # declare "these are defined, but don't error if others are accessed" someday. -from typing import Any, Container, Optional, Union +from typing import Any, Container, Mapping, Optional, Union from pendulum import DateTime @@ -89,4 +89,7 @@ class Context(TypedDict, total=False): yesterday_ds: str yesterday_ds_nodash: str +class AirflowContextDeprecationWarning(DeprecationWarning): ... + def context_copy_partial(source: Context, keys: Container[str]) -> Context: ... +def lazy_mapping_from_context(source: Context) -> Mapping[str, Any]: ... diff --git a/airflow/utils/helpers.py b/airflow/utils/helpers.py index 8950dcbbbc17c..611c2e4a3eb09 100644 --- a/airflow/utils/helpers.py +++ b/airflow/utils/helpers.py @@ -193,7 +193,7 @@ def render_log_filename(ti: "TaskInstance", try_number, filename_template) -> st if filename_jinja_template: jinja_context = ti.get_template_context() jinja_context['try_number'] = try_number - return filename_jinja_template.render(**jinja_context) + return render_template_to_string(filename_jinja_template, jinja_context) return filename_template.format( dag_id=ti.dag_id, diff --git a/airflow/utils/log/task_handler_with_custom_formatter.py b/airflow/utils/log/task_handler_with_custom_formatter.py index 5034d00fe16e9..b7b431b63222a 100644 --- a/airflow/utils/log/task_handler_with_custom_formatter.py +++ b/airflow/utils/log/task_handler_with_custom_formatter.py @@ -20,7 +20,7 @@ from logging import StreamHandler from airflow.configuration import conf -from airflow.utils.helpers import parse_template_string +from airflow.utils.helpers import parse_template_string, render_template_to_string class TaskHandlerWithCustomFormatter(StreamHandler): @@ -52,6 +52,6 @@ def set_context(self, ti): def _render_prefix(self, ti): if self.prefix_jinja_template: jinja_context = ti.get_template_context() - return self.prefix_jinja_template.render(**jinja_context) + return render_template_to_string(self.prefix_jinja_template, jinja_context) logging.warning("'task_log_prefix_template' is in invalid format, ignoring the variable value") return "" diff --git a/airflow/utils/operator_helpers.py b/airflow/utils/operator_helpers.py index e320f3c68c000..a0899ba196eb5 100644 --- a/airflow/utils/operator_helpers.py +++ b/airflow/utils/operator_helpers.py @@ -17,7 +17,9 @@ # under the License. # from datetime import datetime -from typing import Any, Callable, Dict, Mapping, Sequence, TypeVar +from typing import Any, Callable, Collection, Dict, Mapping, TypeVar + +from airflow.utils.context import Context, lazy_mapping_from_context R = TypeVar("R") @@ -90,9 +92,65 @@ def context_to_airflow_vars(context: Mapping[str, Any], in_env_var_format: bool return params +class KeywordParameters: + """Wrapper representing ``**kwargs`` to a callable. + + The actual ``kwargs`` can be obtained by calling either ``unpacking()`` or + ``serializing()``. They behave almost the same and are only different if + the containing ``kwargs`` is an Airflow Context object, and the calling + function uses ``**kwargs`` in the argument list. + + In this particular case, ``unpacking()`` uses ``lazy-object-proxy`` to + prevent the Context from emitting deprecation warnings too eagerly when it's + unpacked by ``**``. ``serializing()`` does not do this, and will allow the + warnings to be emitted eagerly, which is useful when you want to dump the + content and use it somewhere else without needing ``lazy-object-proxy``. + """ + + def __init__(self, kwargs: Mapping[str, Any], *, wildcard: bool) -> None: + self._kwargs = kwargs + self._wildcard = wildcard + + @classmethod + def determine( + cls, + func: Callable[..., Any], + args: Collection[Any], + kwargs: Mapping[str, Any], + ) -> "KeywordParameters": + import inspect + import itertools + + signature = inspect.signature(func) + has_wildcard_kwargs = any(p.kind == p.VAR_KEYWORD for p in signature.parameters.values()) + + for name in itertools.islice(signature.parameters.keys(), len(args)): + # Check if args conflict with names in kwargs. + if name in kwargs: + raise ValueError(f"The key {name!r} in args is a part of kwargs and therefore reserved.") + + if has_wildcard_kwargs: + # If the callable has a **kwargs argument, it's ready to accept all the kwargs. + return cls(kwargs, wildcard=True) + + # If the callable has no **kwargs argument, it only wants the arguments it requested. + kwargs = {key: kwargs[key] for key in signature.parameters if key in kwargs} + return cls(kwargs, wildcard=False) + + def unpacking(self) -> Mapping[str, Any]: + """Dump the kwargs mapping to unpack with ``**`` in a function call.""" + if self._wildcard and isinstance(self._kwargs, Context): + return lazy_mapping_from_context(self._kwargs) + return self._kwargs + + def serializing(self) -> Mapping[str, Any]: + """Dump the kwargs mapping for serialization purposes.""" + return self._kwargs + + def determine_kwargs( func: Callable[..., Any], - args: Sequence[Any], + args: Collection[Any], kwargs: Mapping[str, Any], ) -> Mapping[str, Any]: """ @@ -105,23 +163,7 @@ def determine_kwargs( :param kwargs: The keyword arguments that need to be filtered before passing to the callable. :return: A dictionary which contains the keyword arguments that are compatible with the callable. """ - import inspect - import itertools - - signature = inspect.signature(func) - has_kwargs = any(p.kind == p.VAR_KEYWORD for p in signature.parameters.values()) - - for name in itertools.islice(signature.parameters.keys(), len(args)): - # Check if args conflict with names in kwargs - if name in kwargs: - raise ValueError(f"The key {name} in args is part of kwargs and therefore reserved.") - - if has_kwargs: - # If the callable has a **kwargs argument, it's ready to accept all the kwargs. - return kwargs - - # If the callable has no **kwargs argument, it only wants the arguments it requested. - return {key: kwargs[key] for key in signature.parameters if key in kwargs} + return KeywordParameters.determine(func, args, kwargs).unpacking() def make_kwargs_callable(func: Callable[..., R]) -> Callable[..., R]: diff --git a/scripts/ci/images/ci_run_docker_tests.py b/scripts/ci/images/ci_run_docker_tests.py index c9c8a0571396c..19477233ee3d8 100755 --- a/scripts/ci/images/ci_run_docker_tests.py +++ b/scripts/ci/images/ci_run_docker_tests.py @@ -88,8 +88,6 @@ def main(): raise SystemExit("You must select the tests to run.") pytest_args = ( - "--pythonwarnings=ignore::DeprecationWarning", - "--pythonwarnings=ignore::PendingDeprecationWarning", "-n", "auto", ) diff --git a/scripts/ci/kubernetes/ci_run_kubernetes_tests.sh b/scripts/ci/kubernetes/ci_run_kubernetes_tests.sh index f73969152873a..ef920b4b5d547 100755 --- a/scripts/ci/kubernetes/ci_run_kubernetes_tests.sh +++ b/scripts/ci/kubernetes/ci_run_kubernetes_tests.sh @@ -52,10 +52,7 @@ function parse_tests_to_run() { else tests_to_run=("${@}") fi - pytest_args=( - "--pythonwarnings=ignore::DeprecationWarning" - "--pythonwarnings=ignore::PendingDeprecationWarning" - ) + pytest_args=() else tests_to_run=("kubernetes_tests") pytest_args=( @@ -64,8 +61,6 @@ function parse_tests_to_run() { "--durations=100" "--color=yes" "--maxfail=50" - "--pythonwarnings=ignore::DeprecationWarning" - "--pythonwarnings=ignore::PendingDeprecationWarning" ) fi diff --git a/scripts/in_container/entrypoint_ci.sh b/scripts/in_container/entrypoint_ci.sh index 1416149d2db50..6683b7a1eb35c 100755 --- a/scripts/in_container/entrypoint_ci.sh +++ b/scripts/in_container/entrypoint_ci.sh @@ -207,8 +207,6 @@ EXTRA_PYTEST_ARGS=( "--durations=100" "--maxfail=50" "--color=yes" - "--pythonwarnings=ignore::DeprecationWarning" - "--pythonwarnings=ignore::PendingDeprecationWarning" "--junitxml=${RESULT_LOG_FILE}" # timeouts in seconds for individual tests "--timeouts-order" diff --git a/tests/cli/commands/test_task_command.py b/tests/cli/commands/test_task_command.py index 0cc548e2874f3..324c8edace305 100644 --- a/tests/cli/commands/test_task_command.py +++ b/tests/cli/commands/test_task_command.py @@ -90,6 +90,7 @@ def test_cli_list_tasks(self): args = self.parser.parse_args(['tasks', 'list', 'example_bash_operator', '--tree']) task_command.task_list(args) + @pytest.mark.filterwarnings("ignore::airflow.utils.context.AirflowContextDeprecationWarning") def test_test(self): """Test the `airflow test` command""" args = self.parser.parse_args( @@ -102,6 +103,7 @@ def test_test(self): # Check that prints, and log messages, are shown assert "'example_python_operator__print_the_context__20180101'" in stdout.getvalue() + @pytest.mark.filterwarnings("ignore::airflow.utils.context.AirflowContextDeprecationWarning") def test_test_with_existing_dag_run(self): """Test the `airflow test` command""" task_id = 'print_the_context' diff --git a/tests/core/test_core.py b/tests/core/test_core.py index 3f58c9e9b1448..c84419cd145ac 100644 --- a/tests/core/test_core.py +++ b/tests/core/test_core.py @@ -217,7 +217,7 @@ def test_timeout(self, dag_maker): op.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True) def test_python_op(self, dag_maker): - def test_py_op(templates_dict, ds, **kwargs): + def test_py_op(templates_dict, ds): if not templates_dict['ds'] == ds: raise Exception("failure") @@ -245,10 +245,6 @@ def test_task_get_template(self, session): assert context['ds'] == '2015-01-01' assert context['ds_nodash'] == '20150101' - # next_ds is 2015-01-02 as the dag schedule is daily. - assert context['next_ds'] == '2015-01-02' - assert context['next_ds_nodash'] == '20150102' - assert context['ts'] == '2015-01-01T00:00:00+00:00' assert context['ts_nodash'] == '20150101T000000' assert context['ts_nodash_with_tz'] == '20150101T000000+0000' @@ -258,6 +254,8 @@ def test_task_get_template(self, session): # Test deprecated fields. expected_deprecated_fields = [ + ("next_ds", "2015-01-02"), + ("next_ds_nodash", "20150102"), ("prev_ds", "2014-12-31"), ("prev_ds_nodash", "20141231"), ("yesterday_ds", "2014-12-31"), @@ -266,14 +264,17 @@ def test_task_get_template(self, session): ("tomorrow_ds_nodash", "20150102"), ] for key, expected_value in expected_deprecated_fields: - message = ( + message_beginning = ( f"Accessing {key!r} from the template is deprecated and " f"will be removed in a future version." ) with pytest.deprecated_call() as recorder: value = str(context[key]) # Simulate template evaluation to trigger warning. assert value == expected_value - assert [str(m.message) for m in recorder] == [message] + + recorded_message = [str(m.message) for m in recorder] + assert len(recorded_message) == 1 + assert recorded_message[0].startswith(message_beginning) def test_bad_trigger_rule(self, dag_maker): with pytest.raises(AirflowException): @@ -337,8 +338,10 @@ def test_externally_triggered_dagrun(self, dag_maker): context = ti.get_template_context() # next_ds should be the execution date for manually triggered runs - assert context['next_ds'] == execution_ds - assert context['next_ds_nodash'] == execution_ds_nodash + with pytest.deprecated_call(): + assert context['next_ds'] == execution_ds + with pytest.deprecated_call(): + assert context['next_ds_nodash'] == execution_ds_nodash def test_dag_params_and_task_params(self, dag_maker): # This test case guards how params of DAG and Operator work together. diff --git a/tests/operators/test_email.py b/tests/operators/test_email.py index 43d9b62c8ae9d..7d6195214251f 100644 --- a/tests/operators/test_email.py +++ b/tests/operators/test_email.py @@ -50,7 +50,7 @@ def _run_as_operator(self, **kwargs): html_content='The quick brown fox jumps over the lazy dog', task_id='task', dag=self.dag, - files=["/tmp/Report-A-{{ execution_date.strftime('%Y-%m-%d') }}.csv"], + files=["/tmp/Report-A-{{ ds }}.csv"], custom_headers={'Reply-To': 'reply_to@example.com'}, **kwargs, ) diff --git a/tests/operators/test_python.py b/tests/operators/test_python.py index 5dc365df443b2..ef13185c6e089 100644 --- a/tests/operators/test_python.py +++ b/tests/operators/test_python.py @@ -19,6 +19,7 @@ import logging import sys import unittest.mock +import warnings from collections import namedtuple from datetime import date, datetime, timedelta from subprocess import CalledProcessError @@ -39,6 +40,7 @@ get_current_context, ) from airflow.utils import timezone +from airflow.utils.context import AirflowContextDeprecationWarning from airflow.utils.dates import days_ago from airflow.utils.session import create_session from airflow.utils.state import State @@ -917,6 +919,7 @@ def f(templates_dict): # This tests might take longer than default 60 seconds as it is serializing a lot of # context using dill (which is slow apparently). @pytest.mark.execution_timeout(120) + @pytest.mark.filterwarnings("ignore::airflow.utils.context.AirflowContextDeprecationWarning") def test_airflow_context(self): def f( # basic @@ -957,6 +960,7 @@ def f( self._run_as_operator(f, use_dill=True, system_site_packages=True, requirements=None) + @pytest.mark.filterwarnings("ignore::airflow.utils.context.AirflowContextDeprecationWarning") def test_pendulum_context(self): def f( # basic @@ -990,6 +994,7 @@ def f( self._run_as_operator(f, use_dill=True, system_site_packages=False, requirements=['pendulum']) + @pytest.mark.filterwarnings("ignore::airflow.utils.context.AirflowContextDeprecationWarning") def test_base_context(self): def f( # basic @@ -1093,7 +1098,9 @@ def execute(self, context): def get_all_the_context(**context): current_context = get_current_context() - assert context == current_context._context + with warnings.catch_warnings(): + warnings.simplefilter("ignore", AirflowContextDeprecationWarning) + assert context == current_context._context @pytest.fixture() diff --git a/tests/operators/test_trigger_dagrun.py b/tests/operators/test_trigger_dagrun.py index fc5cb5c6ae0f4..1934c4d4174b0 100644 --- a/tests/operators/test_trigger_dagrun.py +++ b/tests/operators/test_trigger_dagrun.py @@ -177,7 +177,7 @@ def test_trigger_dagrun_with_templated_execution_date(self): task = TriggerDagRunOperator( task_id="test_trigger_dagrun_with_str_execution_date", trigger_dag_id=TRIGGERED_DAG_ID, - execution_date="{{ execution_date }}", + execution_date="{{ logical_date }}", dag=self.dag, ) task.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True) diff --git a/tests/providers/http/sensors/test_http.py b/tests/providers/http/sensors/test_http.py index 27a5eaf85ab18..2c9a2bbb7f5c8 100644 --- a/tests/providers/http/sensors/test_http.py +++ b/tests/providers/http/sensors/test_http.py @@ -125,8 +125,8 @@ def test_poke_context(self, mock_session_send, create_task_instance_of_operator) response.status_code = 200 mock_session_send.return_value = response - def resp_check(_, execution_date): - if execution_date == DEFAULT_DATE: + def resp_check(_, logical_date): + if logical_date == DEFAULT_DATE: return True raise AirflowException('AirflowException raised here!') diff --git a/tests/sensors/test_external_task_sensor.py b/tests/sensors/test_external_task_sensor.py index 9580dc320d6ab..0249edfa1160f 100644 --- a/tests/sensors/test_external_task_sensor.py +++ b/tests/sensors/test_external_task_sensor.py @@ -174,7 +174,7 @@ def test_external_dag_sensor(self): def test_external_task_sensor_fn_multiple_execution_dates(self): bash_command_code = """ -{% set s=execution_date.time().second %} +{% set s=logical_date.time().second %} echo "second is {{ s }}" if [[ $(( {{ s }} % 60 )) == 1 ]] then @@ -292,7 +292,7 @@ def test_external_task_sensor_fn_multiple_args(self): self.test_time_sensor() def my_func(dt, context): - assert context['execution_date'] == dt + assert context['logical_date'] == dt return dt + timedelta(0) op1 = ExternalTaskSensor( @@ -541,7 +541,7 @@ def dag_bag_parent_child(): task_id="task_1", external_dag_id=dag_0.dag_id, external_task_id=task_0.task_id, - execution_date_fn=lambda execution_date: day_1 if execution_date == day_1 else [], + execution_date_fn=lambda logical_date: day_1 if logical_date == day_1 else [], mode='reschedule', ) @@ -889,7 +889,7 @@ def dag_bag_head_tail(): task_id="tail", external_dag_id=dag.dag_id, external_task_id=head.task_id, - execution_date="{{ tomorrow_ds_nodash }}", + execution_date="{{ macros.ds_add(ds, 1) }}", ) head >> body >> tail diff --git a/tests/utils/test_log_handlers.py b/tests/utils/test_log_handlers.py index d7cec842c39cf..f4b4f7b2e31d7 100644 --- a/tests/utils/test_log_handlers.py +++ b/tests/utils/test_log_handlers.py @@ -62,7 +62,7 @@ def test_default_task_logging_setup(self): assert handler.name == FILE_TASK_HANDLER def test_file_task_handler_when_ti_value_is_invalid(self): - def task_callable(ti, **kwargs): + def task_callable(ti): ti.log.info("test") dag = DAG('dag_for_testing_file_task_handler', start_date=DEFAULT_DATE) @@ -114,7 +114,7 @@ def task_callable(ti, **kwargs): os.remove(log_filename) def test_file_task_handler(self): - def task_callable(ti, **kwargs): + def task_callable(ti): ti.log.info("test") dag = DAG('dag_for_testing_file_task_handler', start_date=DEFAULT_DATE) @@ -168,7 +168,7 @@ def task_callable(ti, **kwargs): os.remove(log_filename) def test_file_task_handler_running(self): - def task_callable(ti, **kwargs): + def task_callable(ti): ti.log.info("test") dag = DAG('dag_for_testing_file_task_handler', start_date=DEFAULT_DATE)