Augment xcom docs #20755

Merged
merged 15 commits into from Feb 3, 2022
56 changes: 53 additions & 3 deletions docs/apache-airflow/concepts/xcoms.rst
@@ -42,13 +42,63 @@ XComs are a relative of :doc:`variables`, with the main difference being that XC

Note: If the first task run does not succeed, the task's XComs are cleared on every retry to make the task run idempotent.

Custom XCom Backends
--------------------

The XCom system has interchangeable backends, and you can set which backend is being used via the ``xcom_backend`` configuration option.

If you want to implement your own backend, you should subclass :class:`~airflow.models.xcom.BaseXCom`, and override the ``serialize_value`` and ``deserialize_value`` methods.

There is also an ``orm_deserialize_value`` method that is called whenever the XCom objects are rendered for UI or reporting purposes; if you have large or expensive-to-retrieve values in your XComs, you should override this method to avoid calling that code (and instead return a lighter, incomplete representation) so the UI remains responsive.
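
As an illustration of the methods described above, the following is a minimal sketch of a custom backend, not a production implementation. The class name ``JsonXComBackend`` is hypothetical, the ``**kwargs`` parameter is an assumption made so that the sketch tolerates Airflow versions that pass extra context to ``serialize_value``, and the ``try``/``except`` stand-in exists only so the snippet can be exercised where Airflow is not installed::

    import json
    from types import SimpleNamespace
    from typing import Any

    try:
        from airflow.models.xcom import BaseXCom
    except ImportError:
        # Stand-in used only when Airflow is not installed, so the sketch still runs.
        class BaseXCom:
            value: Any = None


    class JsonXComBackend(BaseXCom):
        """Hypothetical backend that stores every value as UTF-8 encoded JSON."""

        @staticmethod
        def serialize_value(value: Any, **kwargs: Any) -> bytes:
            # Convert the pushed value into the bytes that get stored.
            return json.dumps(value).encode("utf-8")

        @staticmethod
        def deserialize_value(result: Any) -> Any:
            # ``result`` is the stored XCom row; ``result.value`` holds the bytes.
            return json.loads(result.value.decode("utf-8"))

        def orm_deserialize_value(self) -> Any:
            # Return a cheap summary for the UI instead of decoding the payload.
            return f"<JSON payload, {len(self.value)} bytes>"


    # Round trip outside of Airflow, using a simple object in place of an XCom row.
    stored = JsonXComBackend.serialize_value({"rows": 3})
    restored = JsonXComBackend.deserialize_value(SimpleNamespace(value=stored))

To try such a class, ``xcom_backend`` would be set to its import path, for example ``my_package.JsonXComBackend``, where ``my_package`` is a placeholder module name.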

You can also override the ``clear`` method and use it when clearing results for given DAGs and tasks. This makes it easier for the custom XCom backend to manage the data lifecycle.

Working with Custom XCom Backends in Containers
-----------------------------------------------

Depending on where Airflow is deployed (locally, in Docker, on K8s, etc.), it can be useful to verify that a custom XCom backend is actually being initialized. For example, the complexity of a container environment can make it harder to determine whether your backend is loaded correctly during deployment. The following guidance can help you build confidence in your custom XCom implementation.

First, if you can exec into a terminal in the container, you should be able to run::

    from airflow.models.xcom import XCom
    print(XCom.__name__)

which prints the name of the class that is actually being used.

Depending on how you have configured the backend, you can also examine the Airflow
configuration::

    from airflow.settings import conf
    conf.get("core", "xcom_backend")

If you configure Airflow through environment variables, check with ``env | grep AIRFLOW__CORE__XCOM``.
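
The environment variable check can also be done from Python, which may help when the container image does not ship ``grep``. This is a minimal sketch using only the standard library; the helper name ``xcom_env_settings`` is hypothetical and not part of Airflow::

    import os
    from typing import Dict, Mapping, Optional


    def xcom_env_settings(env: Optional[Mapping[str, str]] = None) -> Dict[str, str]:
        """Return the variables whose names start with AIRFLOW__CORE__XCOM."""
        source = os.environ if env is None else env
        return {k: v for k, v in source.items() if k.startswith("AIRFLOW__CORE__XCOM")}


    # An explicit mapping is passed here only to keep the example reproducible;
    # calling xcom_env_settings() with no argument reads the real environment.
    sample = {
        "AIRFLOW__CORE__XCOM_BACKEND": "xcom_custom_backend.S3XComBackend",
        "PATH": "/usr/bin",
    }
    found = xcom_env_settings(sample)

An empty result suggests the backend is not being set through the environment in that container.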

Working with Custom Backends in K8s via Helm
--------------------------------------------

Running custom XCom backends in K8s introduces even more complexity to your Airflow deployment. Put simply, things sometimes go wrong in ways that are difficult to debug.

For example, if you define a custom XCom backend in the Chart ``values.yaml`` (via the ``xcom_backend`` configuration) and Airflow fails to load the class, the entire Chart deployment will fail with each pod container attempting to restart time and time again.

When deploying in K8s, your custom XCom backend needs to reside in a ``config`` directory; otherwise it cannot be located during Chart deployment.

One observed problem is that logs are very difficult to acquire from the container, because the window in which the trace can be obtained is very small. If you are fortunate enough to query the container logs at the right time, and assuming that the custom backend value used is ``xcom_custom_backend.S3XComBackend``, you may see something similar to the following::

    Traceback (most recent call last):
      File "/home/airflow/.local/bin/airflow", line 8, in <module>
        sys.exit(main())
      File "/home/airflow/.local/lib/python3.9/site-packages/airflow/__main__.py", line 48, in main
        args.func(args)
      File "/home/airflow/.local/lib/python3.9/site-packages/airflow/cli/cli_parser.py", line 47, in command
      ...
        from airflow.models.xcom import XCOM_RETURN_KEY, XCom
      File "/home/airflow/.local/lib/python3.9/site-packages/airflow/models/xcom.py", line 379, in <module>
        XCom = resolve_xcom_backend()
      File "/home/airflow/.local/lib/python3.9/site-packages/airflow/models/xcom.py", line 369, in resolve_xcom_backend
        clazz = conf.getimport("core", "xcom_backend", fallback=f"airflow.models.xcom.{BaseXCom.__name__}")
      File "/home/airflow/.local/lib/python3.9/site-packages/airflow/configuration.py", line 485, in getimport
        raise AirflowConfigException(
    airflow.exceptions.AirflowConfigException: The object could not be loaded. Please check "xcom_backend" key in "core" section. Current value: "xcom_custom_backend.S3XComBackend".
    [2022-01-06 00:02:16,880] {settings.py:331} DEBUG - Disposing DB connection pool (PID 214)

As you can see, in this example the path to the custom XCom backend is incorrect, so Airflow cannot import the class. This in turn prevents the entire Helm chart from deploying successfully.
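
One way to catch an unresolvable path before deploying the chart is to try importing it in the same image, for example from a shell in a throwaway container. The sketch below uses only the standard library; ``can_import`` is a hypothetical helper and is not how Airflow itself resolves the backend, it merely performs a comparable module-plus-attribute lookup::

    import importlib


    def can_import(dotted_path: str) -> bool:
        """Return True if ``module.ClassName`` resolves from the current sys.path."""
        module_name, _, attr = dotted_path.rpartition(".")
        if not module_name:
            # A bare name without a module part cannot be a valid backend path.
            return False
        try:
            module = importlib.import_module(module_name)
        except ImportError:
            return False
        return hasattr(module, attr)


    # ``json.JSONDecoder`` exists in the standard library; the second path is the
    # value from the traceback and resolves only if that module is on sys.path.
    stdlib_ok = can_import("json.JSONDecoder")
    backend_ok = can_import("xcom_custom_backend.S3XComBackend")

If the check returns ``False`` for your configured value, the module is likely missing from the Python path inside the container.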