
Cluster Policies
================

If you want to check or mutate DAGs or Tasks on a cluster-wide level, a cluster policy lets you do that. Cluster policies have three main purposes:

  • Checking that DAGs/Tasks meet a certain standard
  • Setting default arguments on DAGs/Tasks
  • Performing custom routing logic

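There are three types of cluster policy:

  • ``dag_policy``: takes a :class:`~airflow.models.dag.DAG` parameter called ``dag``. It runs when the DAG is loaded into the :class:`~airflow.models.dagbag.DagBag`.
  • ``task_policy``: takes a :class:`~airflow.models.baseoperator.BaseOperator` parameter called ``task``. It runs when the task is created while the DAG file is parsed, so it can alter the whole task definition; it is not tied to any particular DagRun.
  • ``task_instance_mutation_hook``: takes a :class:`~airflow.models.taskinstance.TaskInstance` parameter called ``task_instance``. It applies to one instance of a task in a particular DagRun and is called before that task instance runs.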

The DAG and Task cluster policies can raise the :class:`~airflow.exceptions.AirflowClusterPolicyViolation` exception to indicate that the dag/task they were passed is not compliant and should not be loaded.

Any extra attributes set by a cluster policy take priority over those defined in your DAG file; for example, if you set an ``sla`` on your Task in the DAG file, and then your cluster policy also sets an ``sla``, the cluster policy's value will take precedence.
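
Because the policy's value wins, a policy that should only supply a *default* has to check whether the DAG author already set the attribute. The following is a minimal sketch under two assumptions: the 30-minute ``DEFAULT_SLA`` is a hypothetical value, and ``SimpleNamespace`` objects stand in for real tasks so the snippet runs without Airflow.

.. code-block:: python

    from datetime import timedelta
    from types import SimpleNamespace

    # Hypothetical default; choose a value that suits your deployment.
    DEFAULT_SLA = timedelta(minutes=30)


    def task_policy(task) -> None:
        # Only fill in an SLA when the DAG file did not set one, so this policy
        # acts as a default instead of overriding the DAG author's value.
        if task.sla is None:
            task.sla = DEFAULT_SLA


    # Stand-in tasks, used only to show the behaviour.
    unset = SimpleNamespace(sla=None)
    explicit = SimpleNamespace(sla=timedelta(hours=2))
    task_policy(unset)
    task_policy(explicit)
    print(unset.sla, explicit.sla)  # 0:30:00 2:00:00

Dropping the ``is None`` check turns the same function into an unconditional override.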

To configure cluster policies, create an ``airflow_local_settings.py`` file either in the ``config`` folder under your ``$AIRFLOW_HOME`` or somewhere on your ``$PYTHONPATH``, and then add callables to it whose names match one or more of the cluster policy names above (e.g. ``dag_policy``).
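
As a starting point, a skeleton ``airflow_local_settings.py`` could define one function per policy name. This is a sketch: the bodies are placeholders, and the type hints are quoted and imported only under ``TYPE_CHECKING`` (from ``airflow.models``, to avoid import cycles) so the file loads without importing those classes at runtime.

.. code-block:: python

    # airflow_local_settings.py: skeleton with one callable per cluster policy.
    from typing import TYPE_CHECKING

    if TYPE_CHECKING:
        # Only needed for the type hints; never imported at runtime.
        from airflow.models import DAG, BaseOperator, TaskInstance


    def dag_policy(dag: "DAG") -> None:
        """Check or mutate each DAG as it is loaded into the DagBag."""


    def task_policy(task: "BaseOperator") -> None:
        """Check or mutate each task as it is created during DAG parsing."""


    def task_instance_mutation_hook(task_instance: "TaskInstance") -> None:
        """Mutate a task instance before it runs (for example, its queue)."""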

Examples
--------

DAG policies
~~~~~~~~~~~~

This policy checks if each DAG has at least one tag defined:

.. literalinclude:: /../../tests/cluster_policies/__init__.py
      :language: python
      :start-after: [START example_dag_cluster_policy]
      :end-before: [END example_dag_cluster_policy]

.. note::

    To avoid import cycles, if you use ``DAG`` in type annotations in your cluster policy, be sure to import from ``airflow.models`` and not from ``airflow``.
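
A minimal sketch of a tag-checking ``dag_policy`` that also follows the note above by taking ``DAG`` from ``airflow.models`` for its annotation. In a real ``airflow_local_settings.py`` you would import ``AirflowClusterPolicyViolation`` directly; the ``try``/``except`` fallback and the ``SimpleNamespace`` stand-ins exist only so the snippet runs without Airflow installed, and the error text is illustrative.

.. code-block:: python

    from types import SimpleNamespace
    from typing import TYPE_CHECKING

    try:
        from airflow.exceptions import AirflowClusterPolicyViolation
    except ImportError:  # stand-in so the sketch runs without Airflow installed

        class AirflowClusterPolicyViolation(Exception):
            """Local stand-in for airflow.exceptions.AirflowClusterPolicyViolation."""


    if TYPE_CHECKING:
        from airflow.models import DAG  # not "from airflow import DAG"


    def dag_policy(dag: "DAG") -> None:
        # Refuse to load any DAG that was defined without at least one tag.
        if not dag.tags:
            raise AirflowClusterPolicyViolation(
                f"DAG {dag.dag_id!r} has no tags; at least one tag is required."
            )


    dag_policy(SimpleNamespace(dag_id="tagged", tags=["etl"]))  # passes silently
    try:
        dag_policy(SimpleNamespace(dag_id="untagged", tags=[]))
    except AirflowClusterPolicyViolation as exc:
        print(exc)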

Task policies
~~~~~~~~~~~~~

Here's an example of enforcing a maximum timeout policy on every task:

.. literalinclude:: /../../tests/cluster_policies/__init__.py
        :language: python
        :start-after: [START example_task_cluster_policy]
        :end-before: [END example_task_cluster_policy]
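
A minimal sketch of such a ``task_policy``, assuming a hypothetical 48-hour ceiling and choosing to clamp ``execution_timeout`` rather than reject the task (raising ``AirflowClusterPolicyViolation`` instead would also work). ``SimpleNamespace`` stands in for real operators.

.. code-block:: python

    from datetime import timedelta
    from types import SimpleNamespace

    # Hypothetical ceiling; pick whatever limit suits your cluster.
    MAX_TIMEOUT = timedelta(hours=48)


    def task_policy(task) -> None:
        # Tasks with no timeout, or with one above the ceiling, are clamped to it.
        if task.execution_timeout is None or task.execution_timeout > MAX_TIMEOUT:
            task.execution_timeout = MAX_TIMEOUT


    unbounded = SimpleNamespace(execution_timeout=None)
    short = SimpleNamespace(execution_timeout=timedelta(minutes=10))
    for task in (unbounded, short):
        task_policy(task)
    print(unbounded.execution_timeout, short.execution_timeout)  # 2 days, 0:00:00 0:10:00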

You can also use cluster policies to protect against common mistakes, rather than as technical security controls. For example, you can reject tasks that do not have an owner:

.. literalinclude:: /../../tests/cluster_policies/__init__.py
        :language: python
        :start-after: [START example_cluster_policy_rule]
        :end-before: [END example_cluster_policy_rule]
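
A sketch of such a rule, written as a plain function so it can be combined with other checks. It assumes the stock value of Airflow's ``[operators] default_owner`` option, ``airflow``, and treats that value as "no owner set"; the function name and message are illustrative, and the exception fallback only lets the snippet run without Airflow.

.. code-block:: python

    from types import SimpleNamespace

    try:
        from airflow.exceptions import AirflowClusterPolicyViolation
    except ImportError:  # stand-in so the sketch runs without Airflow installed

        class AirflowClusterPolicyViolation(Exception):
            """Local stand-in for airflow.exceptions.AirflowClusterPolicyViolation."""


    # Assumed to match the [operators] default_owner setting of your deployment.
    DEFAULT_OWNER = "airflow"


    def task_must_have_owners(task) -> None:
        # Catches a common mistake (no real owner); it is not a security control.
        if not task.owner or task.owner.lower() == DEFAULT_OWNER:
            raise AirflowClusterPolicyViolation(
                f"Task {task.task_id!r} needs a non-default owner, got {task.owner!r}."
            )


    task_must_have_owners(SimpleNamespace(task_id="load", owner="data-eng"))  # passes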

If you have multiple checks to apply, it is best practice to keep these rules in a separate Python module and have a single policy or mutation hook that runs all of them and aggregates their error messages, so that a single ``AirflowClusterPolicyViolation`` is reported in the UI (and in the import errors table in the database).

For example, your ``airflow_local_settings.py`` might follow this pattern:

.. literalinclude:: /../../tests/cluster_policies/__init__.py
        :language: python
        :start-after: [START example_list_of_cluster_policy_rules]
        :end-before: [END example_list_of_cluster_policy_rules]
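
The aggregation pattern can be sketched as follows. The two rules, their names, and the rule list are hypothetical; the policy runs every rule, collects each failure instead of stopping at the first, and raises one ``AirflowClusterPolicyViolation`` listing them all. The exception fallback and ``SimpleNamespace`` stand-in only let the snippet run without Airflow.

.. code-block:: python

    from types import SimpleNamespace

    try:
        from airflow.exceptions import AirflowClusterPolicyViolation
    except ImportError:  # stand-in so the sketch runs without Airflow installed

        class AirflowClusterPolicyViolation(Exception):
            """Local stand-in for airflow.exceptions.AirflowClusterPolicyViolation."""


    def task_must_have_owners(task) -> None:  # hypothetical rule
        if not task.owner or task.owner.lower() == "airflow":
            raise AirflowClusterPolicyViolation(f"owner must be set, got {task.owner!r}")


    def task_must_have_retries(task) -> None:  # hypothetical rule
        if task.retries < 1:
            raise AirflowClusterPolicyViolation(f"retries must be >= 1, got {task.retries}")


    TASK_RULES = [task_must_have_owners, task_must_have_retries]


    def task_policy(task) -> None:
        # Run every rule and remember each failure rather than stopping early.
        errors = []
        for rule in TASK_RULES:
            try:
                rule(task)
            except AirflowClusterPolicyViolation as exc:
                errors.append(f"{rule.__name__}: {exc}")
        # Report all failures at once so the import error shows the full picture.
        if errors:
            raise AirflowClusterPolicyViolation(
                f"Task {task.task_id!r} failed {len(errors)} check(s):\n" + "\n".join(errors)
            )


    try:
        task_policy(SimpleNamespace(task_id="extract", owner="airflow", retries=0))
    except AirflowClusterPolicyViolation as exc:
        print(exc)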


Task instance mutation

Here's an example of re-routing tasks that are on their second (or greater) retry to a different queue:

.. literalinclude:: /../../tests/cluster_policies/__init__.py
        :language: python
        :start-after: [START example_task_mutation_hook]
        :end-before: [END example_task_mutation_hook]
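
A minimal sketch of such a hook. It assumes ``try_number`` counts attempts from 1, so attempt 3 is the second retry; how Airflow increments ``try_number`` has changed between releases, so check the threshold against your version. The ``retry_queue`` name is hypothetical and must be a queue your workers actually consume.

.. code-block:: python

    from types import SimpleNamespace

    # Hypothetical values: attempt 1 is the original run, attempt 2 the first
    # retry, attempt 3 the second retry.
    SECOND_RETRY_ATTEMPT = 3
    RETRY_QUEUE = "retry_queue"


    def task_instance_mutation_hook(task_instance) -> None:
        # Send the second and later retries to a separate queue; earlier
        # attempts keep whatever queue the task was defined with.
        if task_instance.try_number >= SECOND_RETRY_ATTEMPT:
            task_instance.queue = RETRY_QUEUE


    first_try = SimpleNamespace(try_number=1, queue="default")
    second_retry = SimpleNamespace(try_number=3, queue="default")
    for ti in (first_try, second_retry):
        task_instance_mutation_hook(ti)
    print(first_try.queue, second_retry.queue)  # default retry_queue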