Use the @task decorator to execute Python callables.
Warning
The @task decorator is recommended over the classic PythonOperator to execute Python callables.
See the example DAG in airflow/example_dags/example_python_operator.py.
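A minimal sketch of such a task, modeled on the referenced example DAG (the task_id and the returned string are illustrative):

import json

from airflow.decorators import task


@task(task_id="print_the_context")
def print_context(ds=None, **kwargs):
    """Print the Airflow context and the ds template variable."""
    print(json.dumps(kwargs, indent=4, default=str))
    print(ds)
    return "Whatever you return gets printed in the logs"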
Pass extra arguments to the @task-decorated function as you would with a normal Python function.
See the example DAG in airflow/example_dags/example_python_operator.py.
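For instance, a sketch along these lines (the function and argument names are made up for illustration):

from airflow.decorators import task


@task
def log_sql(sql, parameters=None):
    # Extra arguments arrive exactly like normal keyword arguments.
    print(f"sql: {sql!r}, parameters: {parameters!r}")


# Inside a DAG body:
log_sql(sql="SELECT 1", parameters={"limit": 10})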
Airflow passes in an additional set of keyword arguments: one for each of the Jinja template variables and a templates_dict argument. The templates_dict argument is templated, so each value in the dictionary is evaluated as a Jinja template.
See the example DAG in airflow/example_dags/example_python_operator.py.
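A hedged sketch of both mechanisms, assuming a task that declares the ds template variable as a parameter and also passes a templated dictionary:

from airflow.decorators import task


@task(templates_dict={"run_date": "{{ ds }}"})
def consume_templates(ds=None, templates_dict=None):
    # ds is injected from the Jinja template variables; each value in
    # templates_dict has already been rendered as a Jinja template.
    print(ds)
    print(templates_dict["run_date"])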
Use the @task.virtualenv decorator to execute Python callables inside a new Python virtual environment. The virtualenv package needs to be installed in the environment that runs Airflow (as an optional dependency: pip install apache-airflow[virtualenv] --constraint ...).
Warning
The @task.virtualenv decorator is recommended over the classic PythonVirtualenvOperator to execute Python callables inside new Python virtual environments.
See the example DAG in airflow/example_dags/example_python_operator.py.
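A sketch in the spirit of the referenced example (the colorama pin is illustrative):

from airflow.decorators import task


@task.virtualenv(
    task_id="virtualenv_python",
    requirements=["colorama==0.4.0"],
    system_site_packages=False,
)
def callable_virtualenv():
    """Run a function in a freshly created virtualenv."""
    from colorama import Fore, Style

    print(Fore.GREEN + "hello from inside the virtualenv" + Style.RESET_ALL)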
Pass extra arguments to the @task.virtualenv-decorated function as you would with a normal Python function. Unfortunately, Airflow does not support serializing var, ti and task_instance due to incompatibilities with the underlying library. For Airflow context variables, make sure that you either have access to Airflow through setting system_site_packages to True or add apache-airflow to the requirements argument. Otherwise you won't have access to most of Airflow's context variables in op_kwargs. If you want the context related to datetime objects like data_interval_start, you can add pendulum and lazy_object_proxy.
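For example, a hedged sketch of the datetime case described above (assuming the requirements shown suffice for your Airflow version):

from airflow.decorators import task


@task.virtualenv(requirements=["pendulum", "lazy_object_proxy"])
def print_interval_start(data_interval_start=None):
    # data_interval_start deserializes correctly because pendulum and
    # lazy_object_proxy are installed inside the virtualenv.
    print(data_interval_start)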
If additional parameters for package installation are needed, pass them in requirements.txt as in the example below:
SomePackage==0.2.1 --pre --index-url http://some.archives.com/archives
AnotherPackage==1.4.3 --no-index --find-links /my/local/archives
All supported options are listed in pip's requirements file format documentation.
The ExternalPythonOperator can help you run some of your tasks with a different set of Python libraries than other tasks (and than the main Airflow environment). Use the ExternalPythonOperator to execute Python callables inside a pre-defined virtual environment. The virtualenv should be preinstalled in the environment where Python is run, and if dill is used, it has to be preinstalled in the virtualenv (the same version that is installed in the main Airflow environment).
See the example DAG in airflow/example_dags/example_python_operator.py.
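A minimal sketch, assuming a virtualenv has already been created at a known path (PATH_TO_PYTHON_BINARY is a placeholder):

import sys

from airflow.decorators import task

# Placeholder: point this at the Python binary of your pre-built virtualenv.
PATH_TO_PYTHON_BINARY = "/path/to/venv/bin/python"


@task.external_python(task_id="external_python", python=PATH_TO_PYTHON_BINARY)
def callable_external_python():
    import sys

    # Runs with whatever libraries are installed in the pre-existing environment.
    print(f"Running in {sys.executable}")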
Pass extra arguments to the @task.external_python-decorated function as you would with a normal Python function. Unfortunately, Airflow does not support serializing var and ti / task_instance due to incompatibilities with the underlying library. For Airflow context variables, make sure that Airflow is also installed as part of the virtualenv environment, in the same version as the Airflow version the task is run on. Otherwise you won't have access to most of Airflow's context variables in op_kwargs. If you want the context related to datetime objects like data_interval_start, you can add pendulum and lazy_object_proxy to your virtualenv.
Use the @task.short_circuit decorator to control whether a pipeline continues if a condition is satisfied or a truthy value is obtained.
Warning
The @task.short_circuit decorator is recommended over the classic ShortCircuitOperator to short-circuit pipelines via Python callables.
The evaluation of this condition and truthy value is done via the output of the decorated function. If the decorated function returns True or a truthy value, the pipeline is allowed to continue and an XCom of the output will be pushed. If the output is False or a falsy value, the pipeline will be short-circuited based on the configured short-circuiting (more on this later). In the example below, the tasks that follow the "condition_is_true" task will execute while the tasks downstream of the "condition_is_false" task will be skipped.
See the example DAG in airflow/example_dags/example_short_circuit_decorator.py.
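A condensed sketch of that pattern (the task names follow the example; the downstream EmptyOperator tasks are illustrative):

from airflow.decorators import task
from airflow.operators.empty import EmptyOperator


@task.short_circuit()
def condition_is_true():
    return True  # truthy: downstream tasks run and the value is pushed to XCom


@task.short_circuit()
def condition_is_false():
    return False  # falsy: downstream tasks are skipped


# Inside a DAG body:
condition_is_true() >> EmptyOperator(task_id="runs")
condition_is_false() >> EmptyOperator(task_id="skipped")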
The "short-circuiting" can be configured to either respect or ignore the trigger rule <concepts:trigger-rules>
defined for downstream tasks. If ignore_downstream_trigger_rules
is set to True, the default configuration, all downstream tasks are skipped without considering the trigger_rule
defined for tasks. If this parameter is set to False, the direct downstream tasks are skipped but the specified trigger_rule
for other subsequent downstream tasks are respected. In this short-circuiting configuration, the operator assumes the direct downstream task(s) were purposely meant to be skipped but perhaps not other subsequent tasks. This configuration is especially useful if only part of a pipeline should be short-circuited rather than all tasks which follow the short-circuiting task.
In the example below, notice that the "short_circuit" task is configured to respect downstream trigger rules. This means while the tasks that follow the "short_circuit" task will be skipped since the decorated function returns False, "task_7" will still execute as its set to execute when upstream tasks have completed running regardless of status (i.e. the TriggerRule.ALL_DONE
trigger rule).
See the example DAG in airflow/example_dags/example_short_circuit_decorator.py.
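A hedged sketch of that configuration (the task ids mirror the example's naming):

from airflow.decorators import task
from airflow.operators.empty import EmptyOperator
from airflow.utils.trigger_rule import TriggerRule


@task.short_circuit(ignore_downstream_trigger_rules=False)
def short_circuit():
    # Falsy output: only the direct downstream task is skipped, because
    # the trigger rules of later tasks are still respected.
    return False


# Inside a DAG body:
task_6 = EmptyOperator(task_id="task_6")
task_7 = EmptyOperator(task_id="task_7", trigger_rule=TriggerRule.ALL_DONE)
short_circuit() >> task_6 >> task_7  # task_6 is skipped; task_7 still runs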
Pass extra arguments to the @task.short_circuit-decorated function as you would with a normal Python function.
Jinja templating can be used in the same way as described for the PythonOperator.
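For instance, a brief sketch combining both (the names are illustrative):

from airflow.decorators import task


@task.short_circuit(templates_dict={"run_date": "{{ ds }}"})
def greater_than(value, threshold, templates_dict=None):
    # Extra arguments and rendered templates behave just as with @task.
    print("run date:", templates_dict["run_date"])
    return value > threshold


# Inside a DAG body:
greater_than(value=10, threshold=5)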