Automatically register DAGs that are used in a context manager #23592

Merged
merged 6 commits into from Sep 7, 2022
2 changes: 1 addition & 1 deletion airflow/example_dags/tutorial.py
@@ -97,7 +97,7 @@
"""
)

dag.doc_md = __doc__ # providing that you have a docstring at the beginning of the DAG
dag.doc_md = __doc__ # providing that you have a docstring at the beginning of the DAG; OR
dag.doc_md = """
This is a documentation placed anywhere
""" # otherwise, type it like this
2 changes: 1 addition & 1 deletion airflow/example_dags/tutorial_taskflow_api.py
@@ -100,7 +100,7 @@ def load(total_order_value: float):


# [START dag_invocation]
tutorial_dag = tutorial_taskflow_api()
tutorial_taskflow_api()
# [END dag_invocation]

# [END tutorial]
38 changes: 26 additions & 12 deletions airflow/models/dag.py
@@ -28,13 +28,15 @@
import traceback
import warnings
import weakref
from collections import deque
from datetime import datetime, timedelta
from inspect import signature
from typing import (
TYPE_CHECKING,
Any,
Callable,
Collection,
Deque,
Dict,
FrozenSet,
Iterable,
@@ -101,6 +103,8 @@
from airflow.utils.types import NOTSET, ArgNotSet, DagRunType, EdgeInfoType

if TYPE_CHECKING:
from types import ModuleType

from airflow.datasets import Dataset
from airflow.decorators import TaskDecoratorCollection
from airflow.models.slamiss import SlaMiss
@@ -329,6 +333,7 @@ class DAG(LoggingMixin):
:param owner_links: Dict of owners and their links, that will be clickable on the DAGs view UI.
Can be used as an HTTP link (for example the link to your Slack channel), or a mailto link.
e.g: {"dag_owner": "https://airflow.apache.org/"}
:param auto_register: Automatically register this DAG when it is used in a ``with`` block
"""

_comps = {
@@ -390,6 +395,7 @@ def __init__(
render_template_as_native_obj: bool = False,
tags: Optional[List[str]] = None,
owner_links: Optional[Dict[str, str]] = None,
auto_register: bool = True,
):
from airflow.utils.task_group import TaskGroup

@@ -565,6 +571,7 @@ def __init__(

self._access_control = DAG._upgrade_outdated_dag_access_control(access_control)
self.is_paused_upon_creation = is_paused_upon_creation
self.auto_register = auto_register

self.jinja_environment_kwargs = jinja_environment_kwargs
self.render_template_as_native_obj = render_template_as_native_obj
@@ -2860,6 +2867,7 @@ def get_serialized_fields(cls):
# has_on_*_callback are only stored if the value is True, as the default is False
'has_on_success_callback',
'has_on_failure_callback',
'auto_register',
}
cls.__serialized_fields = frozenset(vars(DAG(dag_id='test')).keys()) - exclusion_list
return cls.__serialized_fields
@@ -3315,6 +3323,7 @@ def dag(
render_template_as_native_obj: bool = False,
tags: Optional[List[str]] = None,
owner_links: Optional[Dict[str, str]] = None,
auto_register: bool = True,
) -> Callable[[Callable], Callable[..., DAG]]:
"""
Python dag decorator. Wraps a function into an Airflow DAG.
@@ -3367,6 +3376,7 @@ def factory(*args, **kwargs):
tags=tags,
schedule=schedule,
owner_links=owner_links,
auto_register=auto_register,
) as dag_obj:
# Set DAG documentation from function documentation.
if f.__doc__:
@@ -3424,24 +3434,28 @@ class DagContext:

"""

_context_managed_dag: Optional[DAG] = None
_previous_context_managed_dags: List[DAG] = []
_context_managed_dags: Deque[DAG] = deque()
Member:

Any particular reason we need a deque here? From what I can tell a plain list is enough (with the ordering reversed, i.e. we append to / pop from the end and use -1 to access).

Member Author:

No particular reason, no -- mostly thinking that we only ever need to access the head value, or push a new head value, and a deque is more optimized for that case.

But it hardly matters as this is not on the critical path for performance.
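For illustration only (neither snippet is Airflow code, and the identifiers are made up), the two stack shapes discussed above behave the same for this use case:

from collections import deque
from typing import Deque, List, Optional

# Deque form used in the diff: push/pop at the left, the head is index 0.
dag_stack: Deque[str] = deque()
dag_stack.appendleft("dag_a")
head: Optional[str] = dag_stack[0] if dag_stack else None

# Plain-list form suggested in the review: push/pop at the end, the head is index -1.
dag_list: List[str] = []
dag_list.append("dag_a")
head_from_list: Optional[str] = dag_list[-1] if dag_list else None

assert head == head_from_list == "dag_a"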

autoregistered_dags: Set[Tuple[DAG, "ModuleType"]] = set()
current_autoregister_module_name: Optional[str] = None

@classmethod
def push_context_managed_dag(cls, dag: DAG):
if cls._context_managed_dag:
cls._previous_context_managed_dags.append(cls._context_managed_dag)
cls._context_managed_dag = dag
cls._context_managed_dags.appendleft(dag)

@classmethod
def pop_context_managed_dag(cls) -> Optional[DAG]:
old_dag = cls._context_managed_dag
if cls._previous_context_managed_dags:
cls._context_managed_dag = cls._previous_context_managed_dags.pop()
else:
cls._context_managed_dag = None
return old_dag
dag = cls._context_managed_dags.popleft()

# In a few cases around serialization we explicitly push None in to the stack
if cls.current_autoregister_module_name is not None and dag and dag.auto_register:
mod = sys.modules[cls.current_autoregister_module_name]
cls.autoregistered_dags.add((dag, mod))

return dag

@classmethod
def get_current_dag(cls) -> Optional[DAG]:
return cls._context_managed_dag
try:
return cls._context_managed_dags[0]
except IndexError:
return None
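
Taken together, these DagContext changes mean a DAG only has to be used as a context manager to be picked up. A minimal sketch of the resulting behaviour (DAG ids and dates invented; assumes Airflow 2.4+ with this change):

from datetime import datetime

from airflow import DAG
from airflow.operators.empty import EmptyOperator

# Registered automatically when the ``with`` block exits; no module-level
# variable assignment needed.
with DAG(dag_id="ctx_managed_example", start_date=datetime(2022, 1, 1), schedule=None):
    EmptyOperator(task_id="noop")

# Opting out via the flag added in this diff.
with DAG(dag_id="not_registered", start_date=datetime(2022, 1, 1), schedule=None, auto_register=False):
    EmptyOperator(task_id="noop")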
22 changes: 20 additions & 2 deletions airflow/models/dagbag.py
@@ -260,9 +260,12 @@ def process_file(self, filepath, only_if_updated=True, safe_mode=True):
Given a path to a python module or zip file, this method imports
the module and look for dag objects within it.
"""
from airflow.models.dag import DagContext

# if the source file no longer exists in the DB or in the filesystem,
# return an empty list
# todo: raise exception?

if filepath is None or not os.path.isfile(filepath):
return []

@@ -280,6 +283,9 @@
self.log.exception(e)
return []

# Ensure we don't pick up anything else we didn't mean to
DagContext.autoregistered_dags.clear()

if filepath.endswith(".py") or not zipfile.is_zipfile(filepath):
mods = self._load_modules_from_file(filepath, safe_mode)
else:
@@ -291,6 +297,8 @@
return found_dags

def _load_modules_from_file(self, filepath, safe_mode):
from airflow.models.dag import DagContext

if not might_contain_dag(filepath, safe_mode):
# Don't want to spam user with skip messages
if not self.has_logged:
Expand All @@ -306,6 +314,8 @@ def _load_modules_from_file(self, filepath, safe_mode):
if mod_name in sys.modules:
del sys.modules[mod_name]

DagContext.current_autoregister_module_name = mod_name
uranusjr (Member), Sep 6, 2022:

This in-place operation feels a bit weird to me… Perhaps a context manager would be easier to understand and maintain? Something like

with DagContext.enable_autoregister(mod_name):
    parse(...)

This probably needs some refactoring to work though.

Member Author:

Yeah, I agree. This is essentially used as a global variable.

It's not very easy to refactor given its use across three or so different functions, so a context manager would be tricky. (I.e. I can't see an easy refactor to make it work right now.)
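
A hedged sketch of what the proposed context manager might look like; it is hypothetical and not part of this PR, which keeps the plain class attribute instead:

import contextlib

from airflow.models.dag import DagContext


@contextlib.contextmanager
def enable_autoregister(mod_name: str):
    # Scope current_autoregister_module_name to a single parse call, restoring
    # the previous value afterwards instead of mutating it in place.
    previous = DagContext.current_autoregister_module_name
    DagContext.current_autoregister_module_name = mod_name
    try:
        yield
    finally:
        DagContext.current_autoregister_module_name = previous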


def parse(mod_name, filepath):
try:
loader = importlib.machinery.SourceFileLoader(mod_name, filepath)
@@ -344,6 +354,8 @@ def parse(mod_name, filepath):
return parse(mod_name, filepath)

def _load_modules_from_zip(self, filepath, safe_mode):
from airflow.models.dag import DagContext

mods = []
with zipfile.ZipFile(filepath) as current_zip_file:
for zip_info in current_zip_file.infolist():
@@ -372,6 +384,7 @@ def _load_modules_from_zip(self, filepath, safe_mode):
if mod_name in sys.modules:
del sys.modules[mod_name]

DagContext.current_autoregister_module_name = mod_name
try:
sys.path.insert(0, filepath)
current_module = importlib.import_module(mod_name)
@@ -391,9 +404,14 @@ def parse(mod_name, filepath):
return mods

def _process_modules(self, filepath, mods, file_last_changed_on_disk):
from airflow.models.dag import DAG # Avoid circular import
from airflow.models.dag import DAG, DagContext # Avoid circular import

top_level_dags = {(o, m) for m in mods for o in m.__dict__.values() if isinstance(o, DAG)}

top_level_dags.update(DagContext.autoregistered_dags)

top_level_dags = ((o, m) for m in mods for o in m.__dict__.values() if isinstance(o, DAG))
DagContext.current_autoregister_module_name = None
DagContext.autoregistered_dags.clear()

found_dags = []

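As a rough stand-alone sketch (not the real DagBag code) of the collection flow this file implements -- DAGs found as module attributes are merged with the DAGs DagContext recorded when their ``with`` blocks exited:

from types import ModuleType
from typing import List, Set, Tuple


def collect_top_level_dags(mods: List[ModuleType], autoregistered: Set[Tuple[object, ModuleType]]):
    from airflow.models.dag import DAG  # same lazy import the diff uses

    # DAGs assigned to module-level names, as before this change...
    top_level = {(o, m) for m in mods for o in vars(m).values() if isinstance(o, DAG)}
    # ...plus DAGs that were only ever used in a ``with`` block.
    top_level.update(autoregistered)
    return top_level
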
30 changes: 18 additions & 12 deletions docs/apache-airflow/howto/dynamic-dag-generation.rst
@@ -74,10 +74,10 @@ Then you can import and use the ``ALL_TASKS`` constant in all your DAGs like tha
schedule=None,
start_date=datetime(2021, 1, 1),
catchup=False,
) as dag:
):
for task in ALL_TASKS:
# create your operators and relations here
pass
...

Don't forget that in this case you need to add empty ``__init__.py`` file in the ``my_company_utils`` folder
and you should add the ``my_company_utils/.*`` line to ``.airflowignore`` file (if using the regexp ignore
@@ -107,10 +107,11 @@ the meta-data file in your DAG easily. The location of the file to read can be f
# Configuration dict is available here


Dynamic DAGs with ``globals()``
...............................
You can dynamically generate DAGs by working with ``globals()``.
As long as a ``DAG`` object in ``globals()`` is created, Airflow will load it.
Registering dynamic DAGs
........................

You can dynamically generate DAGs when using the ``@dag`` decorator or the ``with DAG(..)`` context manager
and Airflow will automatically register them.

.. code-block:: python

@@ -133,13 +134,18 @@

print_message(config["message"])

globals()[dag_id] = dynamic_generated_dag()
dynamic_generated_dag()

The code below will generate a DAG for each config: ``dynamic_generated_dag_config1`` and ``dynamic_generated_dag_config2``.
Each of them can run separately with related configuration
Each of them can run separately with related configuration.

If you do not wish to have DAGs auto-registered, you can disable the behavior by setting ``auto_register=False`` on your DAG.

.. versionchanged:: 2.4

.. warning::
Using this practice, pay attention to "late binding" behaviour in Python loops. See `that GitHub discussion <https://github.com/apache/airflow/discussions/21278#discussioncomment-2103559>`_ for more details
As of version 2.4, DAGs that are created by calling a ``@dag`` decorated function (or that are used in the
``with DAG(...)`` context manager) are automatically registered, and no longer need to be stored in a
global variable.


Optimizing DAG parsing delays during execution
Expand Down Expand Up @@ -199,5 +205,5 @@ of the context are set to ``None``.
if current_dag_id is not None and current_dag_id != dag_id:
continue # skip generation of non-selected DAG

dag = DAG(dag_id=dag_id, ...)
globals()[dag_id] = dag
with DAG(dag_id=dag_id, ...):
...
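A sketch of the loop-based pattern under the 2.4 behaviour documented above (the config values are invented for illustration); each DAG is registered when its ``with`` block exits, with no ``globals()`` assignment:

from datetime import datetime

from airflow import DAG
from airflow.operators.empty import EmptyOperator

configs = {
    "dynamic_generated_dag_config1": {"message": "first"},
    "dynamic_generated_dag_config2": {"message": "second"},
}

for dag_id, config in configs.items():
    with DAG(dag_id=dag_id, start_date=datetime(2022, 1, 1), schedule=None, catchup=False):
        # One placeholder task per generated DAG; doc_md carries the per-DAG config.
        EmptyOperator(task_id="placeholder", doc_md=config["message"])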
12 changes: 12 additions & 0 deletions docs/apache-airflow/tutorial/taskflow.rst
@@ -62,6 +62,18 @@ as shown below, with the Python function name acting as the DAG identifier.
:start-after: [START instantiate_dag]
:end-before: [END instantiate_dag]

Now to actually enable this to be run as a DAG, we invoke the Python function
``tutorial_taskflow_api`` set up using the ``@dag`` decorator earlier, as shown below.

.. exampleinclude:: /../../airflow/example_dags/tutorial_taskflow_api.py
:language: python
:start-after: [START dag_invocation]
:end-before: [END dag_invocation]

.. versionchanged:: 2.4

It's no longer required to "register" the DAG in a global variable for Airflow to be able to detect it if that DAG is used inside a ``with`` block, or if it is the result of a ``@dag`` decorated function.

Tasks
-----
In this data pipeline, tasks are created based on Python functions using the ``@task`` decorator
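A minimal sketch of the pattern this paragraph describes (names invented; assumes Airflow 2.4+): calling the ``@dag``-decorated function is enough, and the return value no longer needs to be kept:

from datetime import datetime

from airflow.decorators import dag, task


@dag(schedule=None, start_date=datetime(2022, 1, 1), catchup=False)
def taskflow_sketch():
    @task
    def say_hello():
        print("hello")

    say_hello()


# Invoking the decorated function registers the DAG; no variable assignment needed.
taskflow_sketch()
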
40 changes: 40 additions & 0 deletions newsfragments/23592.significant.rst
@@ -0,0 +1,40 @@
DAGs used in a context manager no longer need to be assigned to a module variable

Previously you had to assign a DAG to a module-level variable in order for Airflow to pick it up. For example, this

.. code-block:: python

with DAG(dag_id="example") as dag:
...


@dag
def dag_maker():
...


dag2 = dag_maker()


can become

.. code-block:: python

with DAG(dag_id="example"):
...


@dag
def dag_maker():
...


dag_maker()

If you want to disable the behaviour for any reason then set ``auto_register=False`` on the dag:

.. code-block::

# This dag will not be picked up by Airflow as it's not assigned to a variable
with DAG(dag_id="example", auto_register=False):
...
2 changes: 1 addition & 1 deletion tests/dags/test_subdag.py
@@ -63,7 +63,7 @@ def subdag(parent_dag_name, child_dag_name, args):
max_active_runs=1,
default_args=DEFAULT_TASK_ARGS,
schedule=timedelta(minutes=1),
) as dag:
):

start = EmptyOperator(
task_id='start',
Binary file modified tests/dags/test_zip.zip