diff --git a/airflow/decorators/__init__.py b/airflow/decorators/__init__.py index f375fbf7167ba..6004a397e4bfc 100644 --- a/airflow/decorators/__init__.py +++ b/airflow/decorators/__init__.py @@ -19,6 +19,7 @@ from airflow.decorators.base import TaskDecorator from airflow.decorators.branch_python import branch_task +from airflow.decorators.external_python import external_python_task from airflow.decorators.python import python_task from airflow.decorators.python_virtualenv import virtualenv_task from airflow.decorators.task_group import task_group @@ -34,6 +35,7 @@ "task_group", "python_task", "virtualenv_task", + "external_python_task", "branch_task", ] @@ -43,6 +45,7 @@ class TaskDecoratorCollection: python = staticmethod(python_task) virtualenv = staticmethod(virtualenv_task) + external_python = staticmethod(external_python_task) branch = staticmethod(branch_task) __call__: Any = python # Alias '@task' to '@task.python'. diff --git a/airflow/decorators/__init__.pyi b/airflow/decorators/__init__.pyi index 9238b5f6daa7c..b5992cf51302e 100644 --- a/airflow/decorators/__init__.pyi +++ b/airflow/decorators/__init__.pyi @@ -26,6 +26,7 @@ from kubernetes.client import models as k8s from airflow.decorators.base import FParams, FReturn, Task, TaskDecorator from airflow.decorators.branch_python import branch_task +from airflow.decorators.external_python import external_python_task from airflow.decorators.python import python_task from airflow.decorators.python_virtualenv import virtualenv_task from airflow.decorators.task_group import task_group @@ -41,6 +42,7 @@ __all__ = [ "task_group", "python_task", "virtualenv_task", + "external_python_task", "branch_task", ] @@ -126,6 +128,37 @@ class TaskDecoratorCollection: """ @overload def virtualenv(self, python_callable: Callable[FParams, FReturn]) -> Task[FParams, FReturn]: ... + def external_python( + self, + *, + python: str, + multiple_outputs: Optional[bool] = None, + # 'python_callable', 'op_args' and 'op_kwargs' since they are filled by + # _PythonVirtualenvDecoratedOperator. + use_dill: bool = False, + templates_dict: Optional[Mapping[str, Any]] = None, + show_return_value_in_logs: bool = True, + **kwargs, + ) -> TaskDecorator: + """Create a decorator to convert the decorated callable to a virtual environment task. + + :param python: Full path string (file-system specific) that points to a Python binary inside + a virtualenv that should be used (in ``VENV/bin`` folder). Should be absolute path + (so usually start with "/" or "X:/" depending on the filesystem/os used). + :param multiple_outputs: If set, function return value will be unrolled to multiple XCom values. + Dict will unroll to XCom values with keys as XCom keys. Defaults to False. + :param use_dill: Whether to use dill to serialize + the args and result (pickle is default). This allow more complex types + but requires you to include dill in your requirements. + :param templates_dict: a dictionary where the values are templates that + will get templated by the Airflow engine sometime between + ``__init__`` and ``execute`` takes place and are made available + in your callable's context after the template has been applied. + :param show_return_value_in_logs: a bool value whether to show return_value + logs. Defaults to True, which allows return value log output. + It can be set to False to prevent log output of return value when you return huge data + such as transmission a large amount of XCom to TaskAPI. 
+ """ @overload def branch(self, *, multiple_outputs: Optional[bool] = None, **kwargs) -> TaskDecorator: """Create a decorator to wrap the decorated callable into a BranchPythonOperator. diff --git a/airflow/decorators/branch_python.py b/airflow/decorators/branch_python.py index aa4a3c5c3d8b1..d4adce91db055 100644 --- a/airflow/decorators/branch_python.py +++ b/airflow/decorators/branch_python.py @@ -21,7 +21,7 @@ from airflow.decorators.base import DecoratedOperator, TaskDecorator, task_decorator_factory from airflow.operators.python import BranchPythonOperator -from airflow.utils.python_virtualenv import remove_task_decorator +from airflow.utils.decorators import remove_task_decorator class _BranchPythonDecoratedOperator(DecoratedOperator, BranchPythonOperator): diff --git a/airflow/decorators/external_python.py b/airflow/decorators/external_python.py new file mode 100644 index 0000000000000..7291e19052d09 --- /dev/null +++ b/airflow/decorators/external_python.py @@ -0,0 +1,101 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +import inspect +from textwrap import dedent +from typing import Callable, Optional, Sequence + +from airflow.decorators.base import DecoratedOperator, TaskDecorator, task_decorator_factory +from airflow.operators.python import ExternalPythonOperator +from airflow.utils.decorators import remove_task_decorator + + +class _PythonExternalDecoratedOperator(DecoratedOperator, ExternalPythonOperator): + """ + Wraps a Python callable and captures args/kwargs when called for execution. + + :param python: Full path string (file-system specific) that points to a Python binary inside + a virtualenv that should be used (in ``VENV/bin`` folder). Should be absolute path + (so usually start with "/" or "X:/" depending on the filesystem/os used). + :param python_callable: A reference to an object that is callable + :param op_kwargs: a dictionary of keyword arguments that will get unpacked + in your function (templated) + :param op_args: a list of positional arguments that will get unpacked when + calling your callable (templated) + :param multiple_outputs: If set to True, the decorated function's return value will be unrolled to + multiple XCom values. Dict will unroll to XCom values with its keys as XCom keys. Defaults to False. + """ + + template_fields: Sequence[str] = ('op_args', 'op_kwargs') + template_fields_renderers = {"op_args": "py", "op_kwargs": "py"} + + # since we won't mutate the arguments, we should just do the shallow copy + # there are some cases we can't deepcopy the objects (e.g protobuf). 
+ shallow_copy_attrs: Sequence[str] = ('python_callable',) + + custom_operator_name: str = '@task.external_python' + + def __init__(self, *, python_callable, op_args, op_kwargs, **kwargs) -> None: + kwargs_to_upstream = { + "python_callable": python_callable, + "op_args": op_args, + "op_kwargs": op_kwargs, + } + super().__init__( + kwargs_to_upstream=kwargs_to_upstream, + python_callable=python_callable, + op_args=op_args, + op_kwargs=op_kwargs, + **kwargs, + ) + + def get_python_source(self): + raw_source = inspect.getsource(self.python_callable) + res = dedent(raw_source) + res = remove_task_decorator(res, "@task.external_python") + return res + + +def external_python_task( + python: Optional[str] = None, + python_callable: Optional[Callable] = None, + multiple_outputs: Optional[bool] = None, + **kwargs, +) -> TaskDecorator: + """Wraps a callable into an Airflow operator to run via a Python virtual environment. + + Accepts kwargs for operator kwarg. Can be reused in a single DAG. + + This function is only used during type checking or auto-completion. + + :meta private: + + :param python: Full path string (file-system specific) that points to a Python binary inside + a virtualenv that should be used (in ``VENV/bin`` folder). Should be absolute path + (so usually start with "/" or "X:/" depending on the filesystem/os used). + :param python_callable: Function to decorate + :param multiple_outputs: If set to True, the decorated function's return value will be unrolled to + multiple XCom values. Dict will unroll to XCom values with its keys as XCom keys. + Defaults to False. + """ + return task_decorator_factory( + python=python, + python_callable=python_callable, + multiple_outputs=multiple_outputs, + decorated_operator_class=_PythonExternalDecoratedOperator, + **kwargs, + ) diff --git a/airflow/decorators/python_virtualenv.py b/airflow/decorators/python_virtualenv.py index ec954fbc4fd4e..523879b0632a4 100644 --- a/airflow/decorators/python_virtualenv.py +++ b/airflow/decorators/python_virtualenv.py @@ -21,7 +21,7 @@ from airflow.decorators.base import DecoratedOperator, TaskDecorator, task_decorator_factory from airflow.operators.python import PythonVirtualenvOperator -from airflow.utils.python_virtualenv import remove_task_decorator +from airflow.utils.decorators import remove_task_decorator class _PythonVirtualenvDecoratedOperator(DecoratedOperator, PythonVirtualenvOperator): diff --git a/airflow/example_dags/example_python_operator.py b/airflow/example_dags/example_python_operator.py index 9b4bc52079faa..5752d394dfdbd 100644 --- a/airflow/example_dags/example_python_operator.py +++ b/airflow/example_dags/example_python_operator.py @@ -21,7 +21,10 @@ virtual environment. 
""" import logging +import os import shutil +import sys +import tempfile import time from pprint import pprint @@ -32,6 +35,10 @@ log = logging.getLogger(__name__) +PYTHON = sys.executable + +BASE_DIR = tempfile.gettempdir() + with DAG( dag_id='example_python_operator', schedule=None, @@ -86,10 +93,36 @@ def callable_virtualenv(): print(Back.GREEN + 'and with a green background') print(Style.DIM + 'and in dim text') print(Style.RESET_ALL) - for _ in range(10): + for _ in range(4): print(Style.DIM + 'Please wait...', flush=True) - sleep(10) + sleep(1) print('Finished') virtualenv_task = callable_virtualenv() # [END howto_operator_python_venv] + + sleeping_task >> virtualenv_task + + # [START howto_operator_external_python] + @task.external_python(task_id="external_python", python=os.fspath(sys.executable)) + def callable_external_python(): + """ + Example function that will be performed in a virtual environment. + + Importing at the module level ensures that it will not attempt to import the + library before it is installed. + """ + import sys + from time import sleep + + print(f"Running task via {sys.executable}") + print("Sleeping") + for _ in range(4): + print('Please wait...', flush=True) + sleep(1) + print('Finished') + + external_python_task = callable_external_python() + # [END howto_operator_external_python] + + run_this >> external_python_task diff --git a/airflow/operators/python.py b/airflow/operators/python.py index 8a96e215d1bc7..e51f52f527346 100644 --- a/airflow/operators/python.py +++ b/airflow/operators/python.py @@ -19,16 +19,19 @@ import os import pickle import shutil +import subprocess import sys import types import warnings +from abc import ABCMeta, abstractmethod +from pathlib import Path from tempfile import TemporaryDirectory from textwrap import dedent from typing import Any, Callable, Collection, Dict, Iterable, List, Mapping, Optional, Sequence, Union import dill -from airflow.exceptions import AirflowException, RemovedInAirflow3Warning +from airflow.exceptions import AirflowConfigException, AirflowException, RemovedInAirflow3Warning from airflow.models.baseoperator import BaseOperator from airflow.models.skipmixin import SkipMixin from airflow.models.taskinstance import _CURRENT_CONTEXT @@ -36,6 +39,7 @@ from airflow.utils.operator_helpers import KeywordParameters from airflow.utils.process_utils import execute_in_subprocess from airflow.utils.python_virtualenv import prepare_virtualenv, write_python_script +from airflow.version import version as airflow_version def task(python_callable: Optional[Callable] = None, multiple_outputs: Optional[bool] = None, **kwargs): @@ -279,7 +283,152 @@ def execute(self, context: Context) -> Any: self.log.info("Done.") -class PythonVirtualenvOperator(PythonOperator): +class _BasePythonVirtualenvOperator(PythonOperator, metaclass=ABCMeta): + BASE_SERIALIZABLE_CONTEXT_KEYS = { + 'ds', + 'ds_nodash', + 'inlets', + 'next_ds', + 'next_ds_nodash', + 'outlets', + 'prev_ds', + 'prev_ds_nodash', + 'run_id', + 'task_instance_key_str', + 'test_mode', + 'tomorrow_ds', + 'tomorrow_ds_nodash', + 'ts', + 'ts_nodash', + 'ts_nodash_with_tz', + 'yesterday_ds', + 'yesterday_ds_nodash', + } + PENDULUM_SERIALIZABLE_CONTEXT_KEYS = { + 'data_interval_end', + 'data_interval_start', + 'execution_date', + 'logical_date', + 'next_execution_date', + 'prev_data_interval_end_success', + 'prev_data_interval_start_success', + 'prev_execution_date', + 'prev_execution_date_success', + 'prev_start_date_success', + } + AIRFLOW_SERIALIZABLE_CONTEXT_KEYS = {'macros', 
'conf', 'dag', 'dag_run', 'task', 'params'} + + def __init__( + self, + *, + python_callable: Callable, + use_dill: bool = False, + op_args: Optional[Collection[Any]] = None, + op_kwargs: Optional[Mapping[str, Any]] = None, + string_args: Optional[Iterable[str]] = None, + templates_dict: Optional[Dict] = None, + templates_exts: Optional[List[str]] = None, + expect_airflow: bool = True, + **kwargs, + ): + if ( + not isinstance(python_callable, types.FunctionType) + or isinstance(python_callable, types.LambdaType) + and python_callable.__name__ == "" + ): + raise AirflowException('PythonVirtualenvOperator only supports functions for python_callable arg') + super().__init__( + python_callable=python_callable, + op_args=op_args, + op_kwargs=op_kwargs, + templates_dict=templates_dict, + templates_exts=templates_exts, + **kwargs, + ) + self.string_args = string_args or [] + self.use_dill = use_dill + self.pickling_library = dill if self.use_dill else pickle + self.expect_airflow = expect_airflow + + @abstractmethod + def _iter_serializable_context_keys(self): + pass + + def execute(self, context: Context) -> Any: + serializable_keys = set(self._iter_serializable_context_keys()) + serializable_context = context_copy_partial(context, serializable_keys) + return super().execute(context=serializable_context) + + def get_python_source(self): + """ + Returns the source of self.python_callable + @return: + """ + return dedent(inspect.getsource(self.python_callable)) + + def _write_args(self, file: Path): + if self.op_args or self.op_kwargs: + file.write_bytes(self.pickling_library.dumps({'args': self.op_args, 'kwargs': self.op_kwargs})) + + def _write_string_args(self, file: Path): + file.write_text('\n'.join(map(str, self.string_args))) + + def _read_result(self, path: Path): + if path.stat().st_size == 0: + return None + try: + return self.pickling_library.loads(path.read_bytes()) + except ValueError: + self.log.error( + "Error deserializing result. Note that result deserialization " + "is not supported across major Python versions." 
+ ) + raise + + def __deepcopy__(self, memo): + # module objects can't be copied _at all__ + memo[id(self.pickling_library)] = self.pickling_library + return super().__deepcopy__(memo) + + def _execute_python_callable_in_subprocess(self, python_path: Path, tmp_dir: Path): + op_kwargs: Dict[str, Any] = {k: v for k, v in self.op_kwargs.items()} + if self.templates_dict: + op_kwargs['templates_dict'] = self.templates_dict + input_path = tmp_dir / 'script.in' + output_path = tmp_dir / 'script.out' + string_args_path = tmp_dir / 'string_args.txt' + script_path = tmp_dir / 'script.py' + self._write_args(input_path) + self._write_string_args(string_args_path) + write_python_script( + jinja_context=dict( + op_args=self.op_args, + op_kwargs=op_kwargs, + expect_airflow=self.expect_airflow, + pickling_library=self.pickling_library.__name__, + python_callable=self.python_callable.__name__, + python_callable_source=self.get_python_source(), + ), + filename=os.fspath(script_path), + render_template_as_native_obj=self.dag.render_template_as_native_obj, + ) + + execute_in_subprocess( + cmd=[ + os.fspath(python_path), + os.fspath(script_path), + os.fspath(input_path), + os.fspath(output_path), + os.fspath(string_args_path), + ] + ) + return self._read_result(output_path) + + def determine_kwargs(self, context: Mapping[str, Any]) -> Mapping[str, Any]: + return KeywordParameters.determine(self.python_callable, self.op_args, context).serializing() + + +class PythonVirtualenvOperator(_BasePythonVirtualenvOperator): """ Allows one to run a function in a virtualenv that is created and destroyed automatically (with certain caveats). @@ -323,44 +472,13 @@ class PythonVirtualenvOperator(PythonOperator): in your callable's context after the template has been applied :param templates_exts: a list of file extensions to resolve while processing templated fields, for examples ``['.sql', '.hql']`` + :param expect_airflow: expect Airflow to be installed in the target environment. If true, the operator + will raise warning if Airflow is not installed, and it will attempt to load Airflow + macros when starting. 
""" template_fields: Sequence[str] = tuple({'requirements'} | set(PythonOperator.template_fields)) - template_ext: Sequence[str] = ('.txt',) - BASE_SERIALIZABLE_CONTEXT_KEYS = { - 'ds', - 'ds_nodash', - 'inlets', - 'next_ds', - 'next_ds_nodash', - 'outlets', - 'prev_ds', - 'prev_ds_nodash', - 'run_id', - 'task_instance_key_str', - 'test_mode', - 'tomorrow_ds', - 'tomorrow_ds_nodash', - 'ts', - 'ts_nodash', - 'ts_nodash_with_tz', - 'yesterday_ds', - 'yesterday_ds_nodash', - } - PENDULUM_SERIALIZABLE_CONTEXT_KEYS = { - 'data_interval_end', - 'data_interval_start', - 'execution_date', - 'logical_date', - 'next_execution_date', - 'prev_data_interval_end_success', - 'prev_data_interval_start_success', - 'prev_execution_date', - 'prev_execution_date_success', - 'prev_start_date_success', - } - AIRFLOW_SERIALIZABLE_CONTEXT_KEYS = {'macros', 'conf', 'dag', 'dag_run', 'task', 'params'} def __init__( self, @@ -376,14 +494,9 @@ def __init__( string_args: Optional[Iterable[str]] = None, templates_dict: Optional[Dict] = None, templates_exts: Optional[List[str]] = None, + expect_airflow: bool = True, **kwargs, ): - if ( - not isinstance(python_callable, types.FunctionType) - or isinstance(python_callable, types.LambdaType) - and python_callable.__name__ == "" - ): - raise AirflowException('PythonVirtualenvOperator only supports functions for python_callable arg') if ( python_version and str(python_version)[0] != str(sys.version_info.major) @@ -392,40 +505,34 @@ def __init__( raise AirflowException( "Passing op_args or op_kwargs is not supported across different Python " "major versions for PythonVirtualenvOperator. Please use string_args." + f"Sys version: {sys.version_info}. Venv version: {python_version}" ) if not shutil.which("virtualenv"): raise AirflowException('PythonVirtualenvOperator requires virtualenv, please install it.') - super().__init__( - python_callable=python_callable, - op_args=op_args, - op_kwargs=op_kwargs, - templates_dict=templates_dict, - templates_exts=templates_exts, - **kwargs, - ) if not requirements: self.requirements: Union[List[str], str] = [] elif isinstance(requirements, str): self.requirements = requirements else: self.requirements = list(requirements) - self.string_args = string_args or [] self.python_version = python_version - self.use_dill = use_dill self.system_site_packages = system_site_packages self.pip_install_options = pip_install_options - self.pickling_library = dill if self.use_dill else pickle - - def execute(self, context: Context) -> Any: - serializable_keys = set(self._iter_serializable_context_keys()) - serializable_context = context_copy_partial(context, serializable_keys) - return super().execute(context=serializable_context) - - def determine_kwargs(self, context: Mapping[str, Any]) -> Mapping[str, Any]: - return KeywordParameters.determine(self.python_callable, self.op_args, context).serializing() + super().__init__( + python_callable=python_callable, + use_dill=use_dill, + op_args=op_args, + op_kwargs=op_kwargs, + string_args=string_args, + templates_dict=templates_dict, + templates_exts=templates_exts, + expect_airflow=expect_airflow, + **kwargs, + ) def execute_callable(self): with TemporaryDirectory(prefix='venv') as tmp_dir: + tmp_path = Path(tmp_dir) requirements_file_name = f'{tmp_dir}/requirements.txt' if not isinstance(self.requirements, str): @@ -438,15 +545,6 @@ def execute_callable(self): with open(requirements_file_name, 'w') as file: file.write(requirements_file_contents) - - if self.templates_dict: - self.op_kwargs['templates_dict'] = 
self.templates_dict - - input_filename = os.path.join(tmp_dir, 'script.in') - output_filename = os.path.join(tmp_dir, 'script.out') - string_args_filename = os.path.join(tmp_dir, 'string_args.txt') - script_filename = os.path.join(tmp_dir, 'script.py') - prepare_virtualenv( venv_directory=tmp_dir, python_bin=f'python{self.python_version}' if self.python_version else None, @@ -454,74 +552,173 @@ def execute_callable(self): requirements_file_path=requirements_file_name, pip_install_options=self.pip_install_options, ) + python_path = tmp_path / "bin" / "python" - self._write_args(input_filename) - self._write_string_args(string_args_filename) - write_python_script( - jinja_context=dict( - op_args=self.op_args, - op_kwargs=self.op_kwargs, - pickling_library=self.pickling_library.__name__, - python_callable=self.python_callable.__name__, - python_callable_source=self.get_python_source(), - ), - filename=script_filename, - render_template_as_native_obj=self.dag.render_template_as_native_obj, - ) + return self._execute_python_callable_in_subprocess(python_path, tmp_path) - execute_in_subprocess( - cmd=[ - f'{tmp_dir}/bin/python', - script_filename, - input_filename, - output_filename, - string_args_filename, - ] - ) + def _iter_serializable_context_keys(self): + yield from self.BASE_SERIALIZABLE_CONTEXT_KEYS + if self.system_site_packages or 'apache-airflow' in self.requirements: + yield from self.AIRFLOW_SERIALIZABLE_CONTEXT_KEYS + yield from self.PENDULUM_SERIALIZABLE_CONTEXT_KEYS + elif 'pendulum' in self.requirements: + yield from self.PENDULUM_SERIALIZABLE_CONTEXT_KEYS - return self._read_result(output_filename) - def get_python_source(self): - """ - Returns the source of self.python_callable - @return: - """ - return dedent(inspect.getsource(self.python_callable)) +class ExternalPythonOperator(_BasePythonVirtualenvOperator): + """ + Allows one to run a function in a virtualenv that is not re-created but used as is + without the overhead of creating the virtualenv (with certain caveats). - def _write_args(self, filename): - if self.op_args or self.op_kwargs: - with open(filename, 'wb') as file: - self.pickling_library.dump({'args': self.op_args, 'kwargs': self.op_kwargs}, file) + The function must be defined using def, and not be + part of a class. All imports must happen inside the function + and no variables outside the scope may be referenced. A global scope + variable named virtualenv_string_args will be available (populated by + string_args). In addition, one can pass stuff through op_args and op_kwargs, and one + can use a return value. + Note that if your virtualenv runs in a different Python major version than Airflow, + you cannot use return values, op_args, op_kwargs, or use any macros that are being provided to + Airflow through plugins. You can use string_args though. + + If Airflow is installed in the external environment in different version that the version + used by the operator, the operator will fail., + + .. seealso:: + For more information on how to use this operator, take a look at the guide: + :ref:`howto/operator:ExternalPythonOperator` + + :param python: Full path string (file-system specific) that points to a Python binary inside + a virtualenv that should be used (in ``VENV/bin`` folder). Should be absolute path + (so usually start with "/" or "X:/" depending on the filesystem/os used). 
+ :param python_callable: A python function with no references to outside variables, + defined with def, which will be run in a virtualenv + :param use_dill: Whether to use dill to serialize + the args and result (pickle is default). This allow more complex types + but if dill is not preinstalled in your venv, the task will fail with use_dill enabled. + :param op_args: A list of positional arguments to pass to python_callable. + :param op_kwargs: A dict of keyword arguments to pass to python_callable. + :param string_args: Strings that are present in the global var virtualenv_string_args, + available to python_callable at runtime as a list[str]. Note that args are split + by newline. + :param templates_dict: a dictionary where the values are templates that + will get templated by the Airflow engine sometime between + ``__init__`` and ``execute`` takes place and are made available + in your callable's context after the template has been applied + :param templates_exts: a list of file extensions to resolve while + processing templated fields, for examples ``['.sql', '.hql']`` + :param expect_airflow: expect Airflow to be installed in the target environment. If true, the operator + will raise warning if Airflow is not installed, and it will attempt to load Airflow + macros when starting. + """ + + template_fields: Sequence[str] = tuple({'python_path'} | set(PythonOperator.template_fields)) + + def __init__( + self, + *, + python: str, + python_callable: Callable, + use_dill: bool = False, + op_args: Optional[Collection[Any]] = None, + op_kwargs: Optional[Mapping[str, Any]] = None, + string_args: Optional[Iterable[str]] = None, + templates_dict: Optional[Dict] = None, + templates_exts: Optional[List[str]] = None, + expect_airflow: bool = True, + expect_pendulum: bool = False, + **kwargs, + ): + if not python: + raise ValueError("Python Path must be defined in ExternalPythonOperator") + self.python = python + self.expect_pendulum = expect_pendulum + super().__init__( + python_callable=python_callable, + use_dill=use_dill, + op_args=op_args, + op_kwargs=op_kwargs, + string_args=string_args, + templates_dict=templates_dict, + templates_exts=templates_exts, + expect_airflow=expect_airflow, + **kwargs, + ) + + def execute_callable(self): + python_path = Path(self.python) + if not python_path.exists(): + raise ValueError(f"Python Path '{python_path}' must exists") + if not python_path.is_file(): + raise ValueError(f"Python Path '{python_path}' must be a file") + if not python_path.is_absolute(): + raise ValueError(f"Python Path '{python_path}' must be an absolute path.") + python_version_as_list_of_strings = self._get_python_version_from_environment() + if ( + python_version_as_list_of_strings + and str(python_version_as_list_of_strings[0]) != str(sys.version_info.major) + and (self.op_args or self.op_kwargs) + ): + raise AirflowException( + "Passing op_args or op_kwargs is not supported across different Python " + "major versions for ExternalPythonOperator. Please use string_args." + f"Sys version: {sys.version_info}. 
Venv version: {python_version_as_list_of_strings}" + ) + with TemporaryDirectory(prefix='tmd') as tmp_dir: + tmp_path = Path(tmp_dir) + return self._execute_python_callable_in_subprocess(python_path, tmp_path) + + def _get_python_version_from_environment(self) -> List[str]: + try: + result = subprocess.check_output([self.python, "--version"], text=True) + return result.strip().split(" ")[-1].split(".") + except Exception as e: + raise ValueError(f"Error while executing {self.python}: {e}") def _iter_serializable_context_keys(self): yield from self.BASE_SERIALIZABLE_CONTEXT_KEYS - if self.system_site_packages or 'apache-airflow' in self.requirements: + if self._get_airflow_version_from_target_env(): yield from self.AIRFLOW_SERIALIZABLE_CONTEXT_KEYS yield from self.PENDULUM_SERIALIZABLE_CONTEXT_KEYS - elif 'pendulum' in self.requirements: + elif self._is_pendulum_installed_in_target_env(): yield from self.PENDULUM_SERIALIZABLE_CONTEXT_KEYS - def _write_string_args(self, filename): - with open(filename, 'w') as file: - file.write('\n'.join(map(str, self.string_args))) - - def _read_result(self, filename): - if os.stat(filename).st_size == 0: - return None - with open(filename, 'rb') as file: - try: - return self.pickling_library.load(file) - except ValueError: - self.log.error( - "Error deserializing result. Note that result deserialization " - "is not supported across major Python versions." + def _is_pendulum_installed_in_target_env(self) -> bool: + try: + subprocess.check_call([self.python, "-c", "import pendulum"]) + return True + except Exception as e: + if self.expect_pendulum: + self.log.warning("When checking for Pendulum installed in venv got %s", e) + self.log.warning( + "Pendulum is not properly installed in the virtualenv " + "Pendulum context keys will not be available. " + "Please Install Pendulum or Airflow in your venv to access them." ) - raise + return False - def __deepcopy__(self, memo): - # module objects can't be copied _at all__ - memo[id(self.pickling_library)] = self.pickling_library - return super().__deepcopy__(memo) + def _get_airflow_version_from_target_env(self) -> Optional[str]: + try: + result = subprocess.check_output( + [self.python, "-c", "from airflow import version; print(version.version)"], text=True + ) + target_airflow_version = result.strip() + if target_airflow_version != airflow_version: + raise AirflowConfigException( + f"The version of Airflow installed for the {self.python}(" + f"{target_airflow_version}) is different than the runtime Airflow version: " + f"{airflow_version}. Make sure your environment has the same Airflow version " + f"installed as the Airflow runtime." + ) + return target_airflow_version + except Exception as e: + if self.expect_airflow: + self.log.warning("When checking for Airflow installed in venv got %s", e) + self.log.warning( + f"This means that Airflow is not properly installed by " + f"{self.python}. Airflow context keys will not be available. " + f"Please Install Airflow {airflow_version} in your environment to access them." 
+ ) + return None def get_current_context() -> Context: diff --git a/airflow/providers/docker/decorators/docker.py b/airflow/providers/docker/decorators/docker.py index bfd2b4db5d31f..02fbc4f26964e 100644 --- a/airflow/providers/docker/decorators/docker.py +++ b/airflow/providers/docker/decorators/docker.py @@ -27,7 +27,15 @@ from airflow.decorators.base import DecoratedOperator, task_decorator_factory from airflow.providers.docker.operators.docker import DockerOperator -from airflow.utils.python_virtualenv import remove_task_decorator, write_python_script + +try: + from airflow.utils.decorators import remove_task_decorator + + # This can be removed after we move to Airflow 2.4+ +except ImportError: + from airflow.utils.python_virtualenv import remove_task_decorator + +from airflow.utils.python_virtualenv import write_python_script if TYPE_CHECKING: from airflow.decorators.base import TaskDecorator @@ -53,6 +61,10 @@ class _DockerDecoratedOperator(DecoratedOperator, DockerOperator): Wraps a Python callable and captures args/kwargs when called for execution. :param python_callable: A reference to an object that is callable + :param python: Python binary name to use + :param use_dill: Whether dill should be used to serialize the callable + :param expect_airflow: whether to expect airflow to be installed in the docker environment. if this + one is specified, the script to run callable will attempt to load Airflow macros. :param op_kwargs: a dictionary of keyword arguments that will get unpacked in your function (templated) :param op_args: a list of positional arguments that will get unpacked when @@ -74,10 +86,12 @@ def __init__( self, use_dill=False, python_command='python3', + expect_airflow: bool = True, **kwargs, ) -> None: command = "dummy command" self.python_command = python_command + self.expect_airflow = expect_airflow self.pickling_library = dill if use_dill else pickle super().__init__( command=command, retrieve_output=True, retrieve_output_path="/tmp/script.out", **kwargs @@ -107,6 +121,7 @@ def execute(self, context: 'Context'): pickling_library=self.pickling_library.__name__, python_callable=self.python_callable.__name__, python_callable_source=py_source, + expect_airflow=self.expect_airflow, string_args_global=False, ), filename=script_filename, diff --git a/airflow/utils/decorators.py b/airflow/utils/decorators.py index f774e59b16cfe..b816a7db5d0bf 100644 --- a/airflow/utils/decorators.py +++ b/airflow/utils/decorators.py @@ -18,6 +18,7 @@ # import warnings +from collections import deque from functools import wraps from typing import Callable, TypeVar, cast @@ -52,3 +53,33 @@ def wrapper(*args, **kwargs): return func(*args, **kwargs) return cast(T, wrapper) + + +def remove_task_decorator(python_source: str, task_decorator_name: str) -> str: + """ + Removed @task. 
+ + :param python_source: + """ + if task_decorator_name not in python_source: + return python_source + split = python_source.split(task_decorator_name) + before_decorator, after_decorator = split[0], split[1] + if after_decorator[0] == "(": + after_decorator = _balance_parens(after_decorator) + if after_decorator[0] == "\n": + after_decorator = after_decorator[1:] + return before_decorator + after_decorator + + +def _balance_parens(after_decorator): + num_paren = 1 + after_decorator = deque(after_decorator) + after_decorator.popleft() + while num_paren: + current = after_decorator.popleft() + if current == "(": + num_paren = num_paren + 1 + elif current == ")": + num_paren = num_paren - 1 + return ''.join(after_decorator) diff --git a/airflow/utils/python_virtualenv.py b/airflow/utils/python_virtualenv.py index 05c88e813e498..c7dd367ddaedb 100644 --- a/airflow/utils/python_virtualenv.py +++ b/airflow/utils/python_virtualenv.py @@ -19,11 +19,12 @@ """Utilities for creating a virtual environment""" import os import sys -from collections import deque +import warnings from typing import List, Optional import jinja2 +from airflow.utils.decorators import remove_task_decorator as _remove_task_decorator from airflow.utils.process_utils import execute_in_subprocess @@ -50,34 +51,13 @@ def _generate_pip_install_cmd_from_list( return cmd + requirements -def _balance_parens(after_decorator): - num_paren = 1 - after_decorator = deque(after_decorator) - after_decorator.popleft() - while num_paren: - current = after_decorator.popleft() - if current == "(": - num_paren = num_paren + 1 - elif current == ")": - num_paren = num_paren - 1 - return ''.join(after_decorator) - - def remove_task_decorator(python_source: str, task_decorator_name: str) -> str: - """ - Removed @task.virtualenv - - :param python_source: - """ - if task_decorator_name not in python_source: - return python_source - split = python_source.split(task_decorator_name) - before_decorator, after_decorator = split[0], split[1] - if after_decorator[0] == "(": - after_decorator = _balance_parens(after_decorator) - if after_decorator[0] == "\n": - after_decorator = after_decorator[1:] - return before_decorator + after_decorator + warnings.warn( + "Import remove_task_decorator from airflow.utils.decorators instead", + DeprecationWarning, + stacklevel=2, + ) + return _remove_task_decorator(python_source, task_decorator_name) def prepare_virtualenv( diff --git a/airflow/utils/python_virtualenv_script.jinja2 b/airflow/utils/python_virtualenv_script.jinja2 index 94a425b4976d5..60c96d34819df 100644 --- a/airflow/utils/python_virtualenv_script.jinja2 +++ b/airflow/utils/python_virtualenv_script.jinja2 @@ -20,7 +20,8 @@ import {{ pickling_library }} import sys -{# Check whether Airflow is available in the environment. +{% if expect_airflow %} + {# Check whether Airflow is available in the environment. # If it is, we'll want to ensure that we integrate any macros that are being provided # by plugins prior to unpickling the task context. #} if sys.version_info >= (3,6): @@ -31,6 +32,7 @@ if sys.version_info >= (3,6): {# Airflow is not available in this environment, therefore we won't # be able to integrate any plugin macros. 
#}
        pass
+{% endif %}
 
 {% if op_args or op_kwargs %}
     with open(sys.argv[1], "rb") as file:
diff --git a/docs/apache-airflow/best-practices.rst b/docs/apache-airflow/best-practices.rst
index 4b1bcb232dec4..33f31654cc900 100644
--- a/docs/apache-airflow/best-practices.rst
+++ b/docs/apache-airflow/best-practices.rst
@@ -20,12 +20,13 @@
 Best Practices
 ==============
 
-Creating a new DAG is a two-step process:
+Creating a new DAG is a three-step process:
 
 - writing Python code to create a DAG object,
-- testing if the code meets our expectations
+- testing if the code meets our expectations,
+- configuring environment dependencies to run your DAG
 
-This tutorial will introduce you to the best practices for these two steps.
+This tutorial will introduce you to the best practices for these three steps.
 
 .. _best_practice:writing_a_dag:
 
@@ -350,6 +351,7 @@ want to optimize your DAGs there are the following actions you can take:
   consider splitting them if you observe it takes a long time to reflect changes in your DAG files in the
   UI of Airflow.
 
+
 Testing a DAG
 ^^^^^^^^^^^^^
 
@@ -619,3 +621,247 @@ Prune data before upgrading
 ---------------------------
 
 Some database migrations can be time-consuming. If your metadata database is very large, consider pruning some of the old data with the :ref:`db clean` command prior to performing the upgrade. *Use with caution.*
+
+.. _best_practices/handling_conflicting_complex_python_dependencies:
+
+Handling conflicting/complex Python dependencies
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+Airflow has many Python dependencies, and sometimes the Airflow dependencies conflict with the dependencies that your
+task code expects. Since - by default - the Airflow environment is just a single Python environment with a single set
+of dependencies, there may also be cases where some of your tasks require different dependencies than other tasks,
+and those dependencies conflict with each other.
+
+If you are using pre-defined Airflow Operators to talk to external services, there is not much choice, but usually those
+operators have dependencies that do not conflict with the basic Airflow dependencies. Airflow uses a constraints mechanism,
+which means that you have a "fixed" set of dependencies that the community guarantees Airflow can be installed with
+(including all community providers) without triggering conflicts. You can also upgrade the providers
+independently, and their constraints do not limit you, so the chance of a conflicting dependency is lower (you still have
+to test those dependencies). Therefore, when you are using pre-defined operators, chances are that you will have
+little to no problems with conflicting dependencies.
+
+However, when you are approaching Airflow in a more "modern way", where you use the TaskFlow API and most of
+your operators are written using custom Python code, or when you want to write your own Custom Operator,
+you might get to the point where the dependencies required by your custom code conflict with those
+of Airflow, or where the dependencies of several of your Custom Operators conflict with each other.
+
+There are a number of strategies that can be employed to mitigate the problem. And while dealing with
+dependency conflict in custom operators is difficult, it's actually quite a bit easier when it comes to
+using :class:`airflow.operators.python.PythonVirtualenvOperator` or :class:`airflow.operators.python.ExternalPythonOperator`
+- either directly using the classic "operator" approach or by using tasks decorated with
+``@task.virtualenv`` or ``@task.external_python`` decorators if you use TaskFlow.
+
+Let's start with the strategies that are easiest to implement (though they have some limits and overhead), and
+we will gradually go through those strategies that require some changes in your Airflow deployment.
+
+Using PythonVirtualenvOperator
+------------------------------
+
+This is the simplest to use and most limited strategy. The PythonVirtualenvOperator allows you to dynamically
+create a virtualenv that your Python callable function will execute in. In the modern
+TaskFlow approach described in :doc:`/tutorial/taskflow`, this can also be done by decorating
+your callable with the ``@task.virtualenv`` decorator (the recommended way of using the operator).
+Each :class:`airflow.operators.python.PythonVirtualenvOperator` task can
+have its own independent Python virtualenv (dynamically created every time the task is run) and can
+specify a fine-grained set of requirements that need to be installed for that task to execute.
+
+The operator takes care of:
+
+* creating the virtualenv based on your environment
+* serializing your Python callable and passing it to execution by the virtualenv Python interpreter
+* executing it, retrieving the result of the callable, and pushing it via XCom if specified
+
+The benefits of the operator are:
+
+* There is no need to prepare the venv upfront. It is dynamically created before the task is run and
+  removed after it is finished, so there is nothing special (except having the virtualenv package in your
+  Airflow dependencies) needed to make use of multiple virtual environments.
+* You can run tasks with different sets of dependencies on the same workers - thus memory resources are
+  reused (though see below about the CPU overhead involved in creating the venvs).
+* In bigger installations, DAG Authors do not need to ask anyone to create the venvs for them.
+  As a DAG Author, you only have to have the virtualenv dependency installed and you can specify and modify the
+  environments as you see fit.
+* No changes in deployment requirements - whether you use a local virtualenv, Docker, or Kubernetes,
+  the tasks will work without adding anything to your deployment.
+* No need to learn about containers or Kubernetes as a DAG Author. Only knowledge of Python requirements
+  is required to author DAGs this way.
+
+There are certain limitations and overhead introduced by this operator:
+
+* Your Python callable has to be serializable. There are a number of Python objects that are not serializable
+  using the standard ``pickle`` library. You can mitigate some of those limitations by using the ``dill`` library,
+  but even that library does not solve all serialization limitations.
+* All dependencies that are not available in the Airflow environment must be locally imported in the callable you
+  use, and the top-level Python code of your DAG should not import/use those libraries.
+* The virtual environments are run in the same operating system, so they cannot have conflicting system-level
+  dependencies (``apt`` or ``yum`` installable packages). Only Python dependencies can be independently
+  installed in those environments.
+* The operator adds a CPU, networking and elapsed-time overhead for running each task - Airflow has
+  to re-create the virtualenv from scratch for each task.
+* The workers need to have access to PyPI or private repositories to install dependencies.
+* The dynamic creation of the virtualenv is prone to transient failures (for example when your repo is not available
+  or when there is a networking issue with reaching the repository).
+* It's easy to fall into a "too dynamic" environment - since the dependencies you install might get upgraded
+  and their transitive dependencies might get independent upgrades, you might end up with a situation where
+  your task stops working because someone released a new version of a dependency, or you might fall
+  victim to a "supply chain" attack where a new version of a dependency becomes malicious.
+* The tasks are only isolated from each other via running in different environments. This makes it possible
+  that running tasks will still interfere with each other - for example, subsequent tasks executed on the
+  same worker might be affected by previous tasks creating/modifying files etc.
+
+You can see detailed examples of using :class:`airflow.operators.python.PythonVirtualenvOperator` in the
+:ref:`Taskflow Virtualenv example <taskflow/virtualenv_example>`.
+
+
+Using ExternalPythonOperator
+----------------------------
+
+.. versionadded:: 2.4
+
+A bit more involved, but with significantly less overhead and fewer security and stability problems, is to use the
+:class:`airflow.operators.python.ExternalPythonOperator`. In the modern
+TaskFlow approach described in :doc:`/tutorial/taskflow`, this can also be done by decorating
+your callable with the ``@task.external_python`` decorator (the recommended way of using the operator).
+It requires, however, that you have a pre-existing, immutable Python environment that is prepared upfront.
+Unlike with the :class:`airflow.operators.python.PythonVirtualenvOperator`, you cannot add new dependencies
+to such a pre-existing environment. All dependencies you need should be added upfront in your environment
+and be available in all the workers in case your Airflow runs in a distributed environment.
+
+This way you avoid the overhead and problems of re-creating the virtual environment, but the environments have to be
+prepared and deployed together with the Airflow installation. Usually the people who manage the Airflow installation
+need to be involved, and in bigger installations those are usually different people than the DAG Authors
+(DevOps/System Admins).
+
+Those virtual environments can be prepared in various ways - if you use LocalExecutor, they just need to be installed
+on the machine where the scheduler runs; if you are using a distributed Celery installation, there
+should be a pipeline that installs those virtual environments across multiple machines; finally, if you are using
+a Docker image (for example via Kubernetes), the virtualenv creation should be added to the pipeline of
+your custom image building.
+
+The benefits of the operator are:
+
+* No setup overhead when running the task. The virtualenv is ready when you start running a task.
+* You can run tasks with different sets of dependencies on the same workers - thus all resources are reused.
+* Workers do not need access to PyPI or private repositories, so there is less chance of transient
+  errors resulting from networking.
+* The dependencies can be pre-vetted by the admins and your security team; no unexpected, new code will
+  be added dynamically. This is good for both security and stability.
+* Limited impact on your deployment - you do not need to switch to Docker containers or Kubernetes to
+  make good use of the operator.
+* No need to learn about containers or Kubernetes as a DAG Author. Only knowledge of Python requirements
+  is required to author DAGs this way.
+
+The drawbacks:
+
+* Your environment needs to have the virtual environments prepared upfront. This usually means that you
+  cannot change them on the fly; adding new or changing requirements requires at least an Airflow re-deployment,
+  and iteration time when you work on new versions might be longer.
+* Your Python callable has to be serializable. There are a number of Python objects that are not serializable
+  using the standard ``pickle`` library. You can mitigate some of those limitations by using the ``dill`` library,
+  but even that library does not solve all serialization limitations.
+* All dependencies that are not available in the Airflow environment must be locally imported in the callable you
+  use, and the top-level Python code of your DAG should not import/use those libraries.
+* The virtual environments are run in the same operating system, so they cannot have conflicting system-level
+  dependencies (``apt`` or ``yum`` installable packages). Only Python dependencies can be independently
+  installed in those environments.
+* The tasks are only isolated from each other via running in different environments. This makes it possible
+  that running tasks will still interfere with each other - for example, subsequent tasks executed on the
+  same worker might be affected by previous tasks creating/modifying files etc.
+
+You can think of the ``PythonVirtualenvOperator`` and ``ExternalPythonOperator`` as counterparts
+that make it smoother to move from the development phase to the production phase. As a DAG author you'd normally
+iterate on dependencies and develop your DAG using the ``PythonVirtualenvOperator`` (thus decorating
+your tasks with ``@task.virtualenv`` decorators), while after the iteration and changes you would likely
+want to switch to the ``ExternalPythonOperator`` (and ``@task.external_python``) for production,
+after your DevOps/System Admin teams deploy your new dependencies in a pre-existing virtualenv in production.
+The nice thing about this is that you can switch the decorator back at any time and continue
+developing it "dynamically" with the ``PythonVirtualenvOperator``.
+
+You can see detailed examples of using :class:`airflow.operators.python.ExternalPythonOperator` in the
+:ref:`Taskflow External Python example <taskflow/external_python_example>`.
+
+Using DockerOperator or Kubernetes Pod Operator
+-----------------------------------------------
+
+Another strategy is to use the :class:`airflow.providers.docker.operators.docker.DockerOperator`
+or the :class:`airflow.providers.cncf.kubernetes.operators.kubernetes_pod.KubernetesPodOperator`.
+Those require that Airflow has access to a Docker engine or a Kubernetes cluster.
+
+Similarly to the Python operators, the TaskFlow decorators are handy if you would like to
+use those operators to execute your callable Python code.
+
+However, this is far more involved - you need to understand how Docker/Kubernetes Pods work if you want to use
+this approach, but the tasks are fully isolated from each other and you are not even limited to running
+Python code. You can write your tasks in any programming language you want. Also, your dependencies are
+fully independent from the Airflow ones (including the system-level dependencies), so if your task requires
+a very different environment, this is the way to go.
+
+.. versionadded:: 2.2
+
+As of version 2.2 of Airflow you can use the ``@task.docker`` decorator to run your functions with the ``DockerOperator``.
+
+.. versionadded:: 2.4
+
+As of version 2.4 of Airflow you can use the ``@task.kubernetes`` decorator to run your functions with the ``KubernetesPodOperator``.
+
+
+The benefits of using those operators are:
+
+* You can run tasks with different sets of both Python and system-level dependencies, or even tasks
+  written in a completely different language or even for a different processor architecture (x86 vs. arm).
+* The environment used to run the tasks enjoys the optimizations and immutability of containers, where a
+  similar set of dependencies can effectively reuse a number of cached layers of the image, so the
+  environment is optimized for the case where you have multiple similar, but different environments.
+* The dependencies can be pre-vetted by the admins and your security team; no unexpected, new code will
+  be added dynamically. This is good for both security and stability.
+* Complete isolation between tasks. They cannot influence one another in other ways than using standard
+  Airflow XCom mechanisms.
+
+The drawbacks:
+
+* There is an overhead to start the tasks. Usually not as big as when creating virtual environments dynamically,
+  but still significant (especially for the ``KubernetesPodOperator``).
+* In the case of the TaskFlow decorators, the whole method to call needs to be serialized and sent over to the
+  Docker Container or Kubernetes Pod, and there are system-level limitations on how big the method can be.
+  Serializing, sending, and finally deserializing the method on the remote end also adds an overhead.
+* There is a resource overhead coming from the multiple processes needed. Running tasks with those
+  two operators requires at least two processes - one process (running in the Docker Container or Kubernetes Pod)
+  executing the task, and a supervising process in the Airflow worker that submits the job to
+  Docker/Kubernetes and monitors the execution.
+* Your environment needs to have the container images ready upfront. This usually means that you
+  cannot change them on the fly. Adding system dependencies, modifying or changing Python requirements
+  requires an image rebuild and publishing (usually in your private registry).
+  Iteration time when you work on new dependencies is usually longer and requires the developer who is
+  iterating to build and use their own images during iterations if they change dependencies.
+  An appropriate deployment pipeline here is essential to be able to reliably maintain your deployment.
+* Your Python callable has to be serializable if you want to run it via the decorators; in this case, too,
+  all dependencies that are not available in the Airflow environment must be locally imported in the callable you
+  use, and the top-level Python code of your DAG should not import/use those libraries.
+* You need to understand more details about how Docker Containers or Kubernetes work. The abstractions
+  provided by those two are "leaky", so you need to understand a bit more about resources, networking,
+  containers etc. in order to author a DAG that uses those operators.
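+
+For instance, a minimal sketch of the decorator form (the image name below is only an illustrative
+placeholder - in practice you would point it at an image that already contains your task's dependencies):
+
+.. code-block:: python
+
+    @task.docker(image="python:3.9-slim", multiple_outputs=True)
+    def count_words(text: str):
+        # The function body is serialized and executed inside the container,
+        # so any imports it needs must live inside the callable itself.
+        from collections import Counter
+
+        return dict(Counter(text.split()))
+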
+You can see detailed examples of using the :class:`airflow.providers.docker.operators.docker.DockerOperator` in the
+:ref:`Taskflow Docker example <taskflow/docker_example>`
+and the :class:`airflow.providers.cncf.kubernetes.operators.kubernetes_pod.KubernetesPodOperator` in the
+:ref:`Taskflow Kubernetes example <taskflow/kubernetes_example>`.
+
+Using multiple Docker Images and Celery Queues
+----------------------------------------------
+
+There is a possibility (though it requires a deep knowledge of Airflow deployment) to run Airflow tasks
+using multiple, independent Docker images. This can be achieved by allocating different tasks to different
+queues and configuring your Celery workers to use different images for different queues. This however
+(at least currently) requires a lot of manual deployment configuration and intrinsic knowledge of how
+Airflow, Celery and Kubernetes work. It also introduces quite some overhead for running the tasks - there
+are fewer chances for resource reuse, and it's much more difficult to fine-tune such a deployment for
+the cost of resources without impacting performance and stability.
+
+One of the possible ways to make it more useful is
+`AIP-46 Runtime isolation for Airflow tasks and DAG parsing `_
+and the completion of `AIP-43 DAG Processor Separation `_.
+Until those are implemented, there are very few benefits of using this approach and it is not recommended.
+
+When those AIPs are implemented, however, this will open up the possibility of a more multi-tenant approach,
+where multiple teams will be able to have completely isolated sets of dependencies that will be used across
+the full lifecycle of a DAG - from parsing to execution.
diff --git a/docs/apache-airflow/howto/operator/python.rst b/docs/apache-airflow/howto/operator/python.rst
index 7e225a9f0462a..b61ea77df192b 100644
--- a/docs/apache-airflow/howto/operator/python.rst
+++ b/docs/apache-airflow/howto/operator/python.rst
@@ -37,8 +37,7 @@ Use the ``@task`` decorator to execute Python callables.
 Passing in arguments
 ^^^^^^^^^^^^^^^^^^^^
 
-Pass extra arguments to the ``@task`` decorated function as you would with
-a normal Python function.
+Pass extra arguments to the ``@task`` decorated function as you would with a normal Python function.
 
 .. exampleinclude:: /../../airflow/example_dags/example_python_operator.py
     :language: python
@@ -97,6 +96,36 @@ If additional parameters for package installation are needed pass them in ``requ
 All supported options are listed in the `requirements file format `_.
 
+.. _howto/operator:ExternalPythonOperator:
+
+ExternalPythonOperator
+======================
+
+The ``ExternalPythonOperator`` can help you run some of your tasks with a different set of Python
+libraries than other tasks (and than the main Airflow environment).
+
+Use the :class:`~airflow.operators.python.ExternalPythonOperator` to execute Python callables inside a
+pre-defined virtual environment. The virtualenv should be preinstalled in the environment where
+Python is run, and in case ``dill`` is used, it has to be preinstalled in the virtualenv (the same
+version that is installed in the main Airflow environment).
+
+.. exampleinclude:: /../../airflow/example_dags/example_python_operator.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_operator_external_python]
+    :end-before: [END howto_operator_external_python]
+
+Passing in arguments
+^^^^^^^^^^^^^^^^^^^^
+
+Pass extra arguments to the ``@task.external_python`` decorated function as you would with a normal Python function.
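+
+For example, a minimal sketch (the interpreter path below is only a placeholder - point ``python`` at the
+Python binary inside your own pre-built virtualenv):
+
+.. code-block:: python
+
+    @task.external_python(python="/opt/airflow-venv/bin/python")
+    def print_greeting(name: str, greeting: str = "Hello"):
+        # Arguments are serialized and passed to the external interpreter,
+        # so keep them picklable.
+        print(f"{greeting}, {name}!")
+
+
+    # Calling the decorated function inside a DAG context creates the task.
+    print_greeting("Airflow", greeting="Hi")
+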
+Unfortunately, Airflow does not support serializing ``var`` and ``ti`` / ``task_instance`` due to incompatibilities
+with the underlying library. For Airflow context variables, make sure that Airflow is also installed as part
+of the virtualenv, in the same version as the Airflow version the task is run on.
+Otherwise you won't have access to most of Airflow's context variables in ``op_kwargs``.
+If you want the context variables related to datetime objects, like ``data_interval_start``, you can add ``pendulum`` and
+``lazy_object_proxy`` to your virtualenv.
+
 .. _howto/operator:ShortCircuitOperator:
 
 ShortCircuitOperator
diff --git a/docs/apache-airflow/tutorial/taskflow.rst b/docs/apache-airflow/tutorial/taskflow.rst
index 35609325e1c20..e3da69e893ba7 100644
--- a/docs/apache-airflow/tutorial/taskflow.rst
+++ b/docs/apache-airflow/tutorial/taskflow.rst
@@ -218,28 +218,92 @@ Suppose the ``add_task`` code lives in a file called ``common.py``. You can do t
     created_dag = use_add_task()
 
 
-Using the TaskFlow API with Docker or Virtual Environments
-----------------------------------------------------------
+Using the TaskFlow API with complex/conflicting Python dependencies
+-------------------------------------------------------------------
 
 If you have tasks that require complex or conflicting requirements then you will have the ability to use the
-TaskFlow API with either a Docker container (since version 2.2.0) or Python virtual environment (since 2.0.2).
-This added functionality will allow a much more
-comprehensive range of use-cases for the TaskFlow API, as you will not be limited to the
-packages and system libraries of the Airflow worker.
+TaskFlow API with either a Python virtual environment (since 2.0.2), a Docker container (since 2.2.0),
+or the ExternalPythonOperator or KubernetesPodOperator (since 2.4.0).
 
-To use a docker image with the TaskFlow API, change the decorator to ``@task.docker``
+This functionality allows a much more comprehensive range of use-cases for the TaskFlow API,
+as you are not limited to the packages and system libraries of the Airflow worker. For all cases of
+the decorated functions described below, you have to make sure the functions are serializable and that
+they only use local imports for additional dependencies you use. Those imported additional libraries must
+be available in the target environment - they do not need to be available in the main Airflow environment.
+
+Which of the operators you should use depends on several factors:
+
+* whether you are running Airflow with access to a Docker engine or Kubernetes
+* whether you can afford the overhead of dynamically creating a virtual environment with the new dependencies
+* whether you can deploy a pre-existing, immutable Python environment for all Airflow components.
+
+These options should allow for far greater flexibility for users who wish to keep their workflows simpler
+and more Pythonic - and allow you to keep the complete logic of your DAG in the DAG itself.
+
+You can also get more context about the approach to managing conflicting dependencies, including a more detailed
+explanation of the boundaries and consequences of each of the options, in
+:ref:`Best practices for handling conflicting/complex Python dependencies <best_practices/handling_conflicting_complex_python_dependencies>`.
+
+
+Virtualenv created dynamically for each task
+............................................
+
+The simplest approach is to dynamically create (every time a task is run) a separate virtual environment on the
+same machine; for this you can use the ``@task.virtualenv`` decorator. The decorator allows
+you to dynamically create a new virtualenv with custom libraries and even a different Python version to
+run your function.
+
+.. _taskflow/virtualenv_example:
+
+Example (dynamically created virtualenv):
+
+.. exampleinclude:: /../../airflow/example_dags/example_python_operator.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_operator_python_venv]
+    :end-before: [END howto_operator_python_venv]
+
+Using Python environment with pre-installed dependencies
+.........................................................
+
+The slightly more involved ``@task.external_python`` decorator allows you to run an Airflow task in a pre-defined,
+immutable virtualenv (or a Python binary installed at system level without a virtualenv).
+This virtualenv or system Python can also have a different set of custom libraries installed, and it must be
+available at the same location on all workers that can execute the task.
+
+.. _taskflow/external_python_example:
+
+Example with ``@task.external_python`` (using immutable, pre-existing virtualenv):
+
+.. exampleinclude:: /../../airflow/example_dags/example_python_operator.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_operator_external_python]
+    :end-before: [END howto_operator_external_python]
+
+Dependency separation using Docker Operator
+...........................................
+
+If your Airflow workers have access to a Docker engine, you can instead use a ``DockerOperator`` and add any needed arguments to correctly run the task. Please note that the Docker image must have a working Python installed and accept a bash command as the ``command`` argument.
+It is worth noting that the Python source code (extracted from the decorated function) and any
+callable args are sent to the container via (encoded and pickled) environment variables, so the
+length of these is not boundless (the exact limit depends on system settings).
+
 Below is an example of using the ``@task.docker`` decorator to run a Python task.
 
+.. _taskflow/docker_example:
+
 .. exampleinclude:: /../../tests/system/providers/docker/example_taskflow_api_docker_virtualenv.py
     :language: python
     :dedent: 4
     :start-after: [START transform_docker]
     :end-before: [END transform_docker]
 
-It is worth noting that the Python source code (extracted from the decorated function) and any callable args are sent to the container via (encoded and pickled) environment variables so the length of these is not boundless (the exact limit depends on system settings).
+
+Notes on using the operator:
 
 .. note:: Using ``@task.docker`` decorator in one of the earlier Airflow versions
@@ -252,19 +316,36 @@ It is worth noting that the Python source code (extracted from the decorated fun
 
     You should upgrade to Airflow 2.2 or above in order to use it.
 
-If you don't want to run your image on a Docker environment, and instead want to create a separate virtual
-environment on the same machine, you can use the ``@task.virtualenv`` decorator instead. The ``@task.virtualenv``
-decorator will allow you to create a new virtualenv with custom libraries and even a different
-Python version to run your function.
+Dependency separation using Kubernetes Pod Operator
+...................................................
 
-.. exampleinclude:: /../../tests/system/providers/docker/example_taskflow_api_docker_virtualenv.py
+
+If your Airflow workers have access to Kubernetes, you can instead use a ``KubernetesPodOperator``
+and add any needed arguments to correctly run the task.
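+
+Conceptually, a task decorated this way might look like the following sketch (the image, task name and
+namespace used here are illustrative assumptions rather than values taken from the bundled Airflow
+examples; the full runnable example is included below):
+
+.. code-block:: python
+
+    from airflow.decorators import task
+
+
+    # Image, name and namespace are illustrative assumptions - use values valid for your cluster.
+    @task.kubernetes(
+        image="python:3.8-slim-buster",
+        name="sum-numbers",
+        namespace="default",
+    )
+    def sum_numbers(numbers: list) -> int:
+        # Runs entirely inside the pod; only libraries present in the image are importable,
+        # so keep any extra imports local to the function body.
+        return sum(numbers)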
+
+Below is an example of using the ``@task.kubernetes`` decorator to run a Python task.
+
+.. _taskflow/kubernetes_example:
+
+.. exampleinclude:: /../../tests/system/providers/cncf/kubernetes/example_kubernetes_decorator.py
     :language: python
     :dedent: 4
-    :start-after: [START extract_virtualenv]
-    :end-before: [END extract_virtualenv]
+    :start-after: [START howto_operator_kubernetes]
+    :end-before: [END howto_operator_kubernetes]
+
+Notes on using the operator:
+
+.. note:: Using ``@task.kubernetes`` decorator in one of the earlier Airflow versions
+
+    Since the ``@task.kubernetes`` decorator is available in the cncf.kubernetes provider, you might be tempted
+    to use it in Airflow versions before 2.4, but this is not going to work. You will get this error if you try:
+
+    .. code-block:: text
+
+        AttributeError: '_TaskDecorator' object has no attribute 'kubernetes'
+
+    You should upgrade to Airflow 2.4 or above in order to use it.
 
-These two options should allow for far greater flexibility for users who wish to keep their workflows more simple
-and Pythonic.
 
 Multiple outputs inference
 --------------------------
diff --git a/tests/api_connexion/endpoints/test_task_instance_endpoint.py b/tests/api_connexion/endpoints/test_task_instance_endpoint.py
index 401ea93db6fa4..d68041feac76f 100644
--- a/tests/api_connexion/endpoints/test_task_instance_endpoint.py
+++ b/tests/api_connexion/endpoints/test_task_instance_endpoint.py
@@ -194,7 +194,7 @@ def test_should_respond_200(self, username, session):
             "pid": 100,
             "pool": "default_pool",
             "pool_slots": 1,
-            "priority_weight": 6,
+            "priority_weight": 8,
             "queue": "default_queue",
             "queued_when": None,
             "sla_miss": None,
@@ -227,7 +227,7 @@ def test_should_respond_200_with_task_state_in_removed(self, session):
             "pid": 100,
             "pool": "default_pool",
             "pool_slots": 1,
-            "priority_weight": 6,
+            "priority_weight": 8,
             "queue": "default_queue",
             "queued_when": None,
             "sla_miss": None,
@@ -272,7 +272,7 @@ def test_should_respond_200_task_instance_with_sla_and_rendered(self, session):
             "pid": 100,
             "pool": "default_pool",
             "pool_slots": 1,
-            "priority_weight": 6,
+            "priority_weight": 8,
             "queue": "default_queue",
             "queued_when": None,
             "sla_miss": {
@@ -332,7 +332,7 @@ def test_should_respond_200_mapped_task_instance_with_rtif(self, session):
             "pid": 100,
             "pool": "default_pool",
             "pool_slots": 1,
-            "priority_weight": 6,
+            "priority_weight": 8,
             "queue": "default_queue",
             "queued_when": None,
             'sla_miss': None,
@@ -751,8 +751,8 @@ def test_should_respond_200_when_task_instance_properties_are_none(
         (
             "with dag filter",
             {"dag_ids": ["example_python_operator", "example_skip_dag"]},
-            15,
-            15,
+            16,
+            16,
         ),
     ],
 )
diff --git a/tests/decorators/test_external_python.py b/tests/decorators/test_external_python.py
new file mode 100644
index 0000000000000..6c2bae249c163
--- /dev/null
+++ b/tests/decorators/test_external_python.py
@@ -0,0 +1,125 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import datetime
+import subprocess
+import venv
+from datetime import timedelta
+from pathlib import Path
+from subprocess import CalledProcessError
+from tempfile import TemporaryDirectory
+
+import pytest
+
+from airflow.decorators import task
+from airflow.utils import timezone
+
+DEFAULT_DATE = timezone.datetime(2016, 1, 1)
+END_DATE = timezone.datetime(2016, 1, 2)
+INTERVAL = timedelta(hours=12)
+FROZEN_NOW = timezone.datetime(2016, 1, 2, 12, 1, 1)
+
+TI_CONTEXT_ENV_VARS = [
+    'AIRFLOW_CTX_DAG_ID',
+    'AIRFLOW_CTX_TASK_ID',
+    'AIRFLOW_CTX_EXECUTION_DATE',
+    'AIRFLOW_CTX_DAG_RUN_ID',
+]
+
+
+@pytest.fixture()
+def venv_python():
+    with TemporaryDirectory() as d:
+        venv.create(d, with_pip=False)
+        yield Path(d) / "bin" / "python"
+
+
+@pytest.fixture()
+def venv_python_with_dill():
+    with TemporaryDirectory() as d:
+        venv.create(d, with_pip=True)
+        python_path = Path(d) / "bin" / "python"
+        subprocess.call([python_path, "-m", "pip", "install", "dill"])
+        yield python_path
+
+
+class TestExternalPythonDecorator:
+    def test_with_dill_works(self, dag_maker, venv_python_with_dill):
+        @task.external_python(python=venv_python_with_dill, use_dill=True)
+        def f():
+            """Import dill to double-check it is installed."""
+            import dill  # noqa: F401
+
+        with dag_maker():
+            ret = f()
+
+        ret.operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
+
+    def test_no_dill_installed_raises_exception_when_use_dill(self, dag_maker, venv_python):
+        @task.external_python(python=venv_python, use_dill=True)
+        def f():
+            pass
+
+        with dag_maker():
+            ret = f()
+
+        with pytest.raises(CalledProcessError):
+            ret.operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
+
+    def test_exception_raises_error(self, dag_maker, venv_python):
+        @task.external_python(python=venv_python)
+        def f():
+            raise Exception
+
+        with dag_maker():
+            ret = f()
+
+        with pytest.raises(CalledProcessError):
+            ret.operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
+
+    def test_with_args(self, dag_maker, venv_python):
+        @task.external_python(python=venv_python)
+        def f(a, b, c=False, d=False):
+            if a == 0 and b == 1 and c and not d:
+                return True
+            else:
+                raise Exception
+
+        with dag_maker():
+            ret = f(0, 1, c=True)
+
+        ret.operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
+
+    def test_return_none(self, dag_maker, venv_python):
+        @task.external_python(python=venv_python)
+        def f():
+            return None
+
+        with dag_maker():
+            ret = f()
+
+        ret.operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
+
+    def test_nonimported_as_arg(self, dag_maker, venv_python):
+        @task.external_python(python=venv_python)
+        def f(_):
+            return None
+
+        with dag_maker():
+            ret = f(datetime.datetime.utcnow())
+
+        ret.operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
diff --git a/tests/system/providers/cncf/kubernetes/example_kubernetes_decorator.py b/tests/system/providers/cncf/kubernetes/example_kubernetes_decorator.py
index a2488a493fa73..62fdf4461f93e 100644
--- a/tests/system/providers/cncf/kubernetes/example_kubernetes_decorator.py
+++ b/tests/system/providers/cncf/kubernetes/example_kubernetes_decorator.py
@@ 
-29,6 +29,7 @@ catchup=False, ) as dag: + # [START howto_operator_kubernetes] @task.kubernetes( image="python:3.8-slim-buster", name="k8s_test", @@ -59,6 +60,7 @@ def print_pattern(): print_pattern_instance = print_pattern() execute_in_k8s_pod_instance >> print_pattern_instance + # [END howto_operator_kubernetes] from tests.system.utils import get_test_run diff --git a/tests/utils/test_preexisting_python_virtualenv_decorator.py b/tests/utils/test_preexisting_python_virtualenv_decorator.py new file mode 100644 index 0000000000000..97cb396b5c22b --- /dev/null +++ b/tests/utils/test_preexisting_python_virtualenv_decorator.py @@ -0,0 +1,44 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +import unittest + +from airflow.utils.decorators import remove_task_decorator + + +class TestExternalPythonDecorator(unittest.TestCase): + def test_remove_task_decorator(self): + py_source = "@task.external_python(use_dill=True)\ndef f():\nimport funcsigs" + res = remove_task_decorator(python_source=py_source, task_decorator_name="@task.external_python") + assert res == "def f():\nimport funcsigs" + + def test_remove_decorator_no_parens(self): + + py_source = "@task.external_python\ndef f():\nimport funcsigs" + res = remove_task_decorator(python_source=py_source, task_decorator_name="@task.external_python") + assert res == "def f():\nimport funcsigs" + + def test_remove_decorator_nested(self): + + py_source = "@foo\n@task.external_python\n@bar\ndef f():\nimport funcsigs" + res = remove_task_decorator(python_source=py_source, task_decorator_name="@task.external_python") + assert res == "@foo\n@bar\ndef f():\nimport funcsigs" + + py_source = "@foo\n@task.external_python()\n@bar\ndef f():\nimport funcsigs" + res = remove_task_decorator(python_source=py_source, task_decorator_name="@task.external_python") + assert res == "@foo\n@bar\ndef f():\nimport funcsigs" diff --git a/tests/utils/test_python_virtualenv.py b/tests/utils/test_python_virtualenv.py index ed332e1dd7e36..199fd7929281d 100644 --- a/tests/utils/test_python_virtualenv.py +++ b/tests/utils/test_python_virtualenv.py @@ -20,7 +20,8 @@ import unittest from unittest import mock -from airflow.utils.python_virtualenv import prepare_virtualenv, remove_task_decorator +from airflow.utils.decorators import remove_task_decorator +from airflow.utils.python_virtualenv import prepare_virtualenv class TestPrepareVirtualenv(unittest.TestCase):