
Cannot update non-DAG packages imported from DAGs using git-sync #39203

Open
1 of 2 tasks
NBardelot opened this issue Apr 23, 2024 · 3 comments

@NBardelot
Contributor

NBardelot commented Apr 23, 2024

Apache Airflow version

Other Airflow 2 version (please specify below)

If "Other Airflow 2 version" selected, which one?

2.8.3

What happened?

We use a structure of git submodules, synchronized by git-sync sidecars, as described in the Typical Structure of Packages documentation.

Some git submodules contain DAG files; others contain utility libraries imported by the DAGs. The directory containing the utility libraries is added to the PYTHONPATH so that the DAGs can import them. At startup, everything works as expected.

The problem appears when a new module is added and a DAG is modified to import it (both modifications are pushed to git along with the submodule updates, and the git-sync sidecar synchronizes the files). The DAG Processor then reprocesses the DAG Bag, but the importlib cache does not seem to be invalidated, and the new module is not found.

The following error appears in the DAG Processor logs, in the import errors DB table, and therefore in the UI:

ModuleNotFoundError: No module named 'ournewlib'

Note that this is not an issue with the PYTHONPATH, as restarting the dag-processor and worker containers fixes the error.
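Since a restart fixes the error, one way to double-check that the PYTHONPATH itself is correct while the long-running processes keep failing is to attempt the import from a brand-new interpreter, for example inside the dag-processor container. A minimal sketch; the helper name `importable_in_fresh_interpreter` is hypothetical:

```python
import os
import subprocess
import sys


def importable_in_fresh_interpreter(module_name: str, libs_dir: str) -> bool:
    """Try `import <module_name>` in a brand-new Python process.

    A new interpreter builds its import caches from scratch, so success here
    while a long-running process keeps failing points at stale caches rather
    than at a wrong PYTHONPATH.
    """
    env = dict(os.environ)
    # Prepend libs_dir while keeping any PYTHONPATH entries already set.
    existing = env.get("PYTHONPATH")
    env["PYTHONPATH"] = libs_dir if not existing else libs_dir + os.pathsep + existing
    result = subprocess.run(
        [sys.executable, "-c", f"import {module_name}"],
        env=env,
        capture_output=True,
        text=True,
    )
    return result.returncode == 0
```

For example, `importable_in_fresh_interpreter("ournewlib", "/opt/airflow/dags/repo/src/libs")` would use the module and directory names from this issue.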

What you think should happen instead?

The importlib documentation states that invalidate_caches() may be needed:

If you are dynamically importing a module that was created since the interpreter began execution (e.g., created a Python source file), you may need to call invalidate_caches() in order for the new module to be noticed by the import system.
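The documented pattern can be reproduced in isolation with the standard library. In this minimal sketch (the module name `freshly_created_mod` and the helper name are illustrative), a directory is put on `sys.path`, a first failed lookup makes the import system cache a `FileFinder` for it, a module file is then created, and `importlib.invalidate_caches()` is called before importing it:

```python
import importlib
import os
import sys
import tempfile


def import_module_created_at_runtime(module_name: str = "freshly_created_mod") -> int:
    """Create a module after its directory was scanned, then import it."""
    with tempfile.TemporaryDirectory() as libs_dir:
        sys.path.insert(0, libs_dir)
        try:
            # First lookup: the path-based finder creates and caches a FileFinder
            # for libs_dir, along with a listing of the (still empty) directory.
            try:
                importlib.import_module(module_name)
            except ModuleNotFoundError:
                pass  # expected, the file does not exist yet

            # The module appears after the listing was cached.
            with open(os.path.join(libs_dir, module_name + ".py"), "w") as f:
                f.write("ANSWER = 42\n")

            # Documented fix: discard cached listings so the new file is seen even
            # if it was created within the directory's mtime granularity.
            importlib.invalidate_caches()
            return importlib.import_module(module_name).ANSWER
        finally:
            sys.path.remove(libs_dir)
            sys.path_importer_cache.pop(libs_dir, None)
            sys.modules.pop(module_name, None)
```

On filesystems with fine-grained timestamps the import will often succeed even without the invalidation call, which would make this kind of failure intermittent.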

It seems that the Airflow processes should call invalidate_caches() when the git ref of the linked repository changes, since that means the code may have changed and should be reprocessed with fresh imports.
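A sketch of what "invalidate when the ref changes" could look like. It assumes that the synced checkout is reached through a symlink (for example /opt/airflow/dags/repo) which git-sync re-points to a new worktree when the ref changes; `GitSyncRefWatcher` and `check()` are hypothetical names, not an existing Airflow or git-sync API:

```python
import importlib
import os


class GitSyncRefWatcher:
    """Hypothetical helper: invalidate import caches when a symlinked checkout moves."""

    def __init__(self, link_path: str) -> None:
        self.link_path = link_path
        # Remember where the link points now, as the baseline for check().
        self._last_target = os.path.realpath(link_path)

    def check(self) -> bool:
        """Return True (after invalidating caches) if the link now points elsewhere."""
        target = os.path.realpath(self.link_path)
        if target == self._last_target:
            return False
        self._last_target = target
        # New checkout (assumed new ref): drop cached directory listings so
        # modules added in this ref can be found by subsequent imports.
        importlib.invalidate_caches()
        return True
```

A long-running process would call `check()` periodically, for example once per parsing loop, before importing DAG files.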

How to reproduce

  • Create an Airflow instance, for example with the Airflow Helm chart.
  • Ensure that the PYTHONPATH contains the DAGs folder.
  • Use git-sync to load DAGs from a git project.
  • Create a simple DAG.
  • See that the DAG is correctly processed.
  • At runtime, push the following:
    • a new non-DAG module in the git project, with a mock class/function
    • and an import of this new module in the DAG, relative to the DAG folder (consistent with the PYTHONPATH)
  • See that the module is not imported and that an ImportError is raised in the DAGFileProcessorProcess

Operating System

Kubernetes

Versions of Apache Airflow Providers

apache-airflow[celery,kubernetes,statsd,password,ldap,otel]

apache-airflow-providers-amazon
apache-airflow-providers-common-sql
apache-airflow-providers-elasticsearch
apache-airflow-providers-hashicorp
apache-airflow-providers-http
apache-airflow-providers-microsoft-winrm
apache-airflow-providers-microsoft-azure
apache-airflow-providers-opsgenie
apache-airflow-providers-postgres
apache-airflow-providers-redis
apache-airflow-providers-sftp
apache-airflow-providers-smtp
apache-airflow-providers-ssh

We also use the constraints file from Apache Airflow for Python 3.10 and version 2.8.3.

Deployment

Official Apache Airflow Helm Chart

Deployment details

No response

Anything else?

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

NBardelot added the area:core, kind:bug and needs-triage labels on Apr 23, 2024
@NBardelot
Contributor Author

See also #118 (and especially this reply).

@NBardelot
Contributor Author

After further analysis, we think this may be a race condition, as described in the importlib documentation of FileFinder:

The finder will cache the directory contents as necessary, making stat calls for each module search to verify the cache is not outdated. Because cache staleness relies upon the granularity of the operating system’s state information of the file system, there is a potential race condition of searching for a module, creating a new file, and then searching for the module the new file represents. If the operations happen fast enough to fit within the granularity of stat calls, then the module search will fail. To prevent this from happening, when you create a module dynamically, make sure to call importlib.invalidate_caches().

Here is the state of our import cache:

$ python
Python 3.10.13 (main, Mar 12 2024, 12:22:40) [GCC 12.2.0] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> import sys
>>> sys.path_importer_cache
{'/opt/airflow/dags/repo/src/libs': FileFinder('/opt/airflow/dags/repo/src/libs'), '/usr/local/lib/python310.zip': None, '/usr/local/lib/python3.10': FileFinder('/usr/local/lib/python3.10'), '/usr/local/lib/python3.10/encodings': FileFinder('/usr/local/lib/python3.10/encodings'), '/usr/local/lib/python3.10/importlib': FileFinder('/usr/local/lib/python3.10/importlib'), '/home/airflow/.local/lib/python3.10/site-packages': FileFinder('/home/airflow/.local/lib/python3.10/site-packages'), '/usr/local/lib/python3.10/lib-dynload': FileFinder('/usr/local/lib/python3.10/lib-dynload'), '/usr/local/lib/python3.10/site-packages': FileFinder('/usr/local/lib/python3.10/site-packages'), '/opt/airflow': FileFinder('/opt/airflow'), '/usr/local/lib/python3.10/collections': FileFinder('/usr/local/lib/python3.10/collections')}
>>>

... where /opt/airflow/dags/repo/src/libs contains our utility modules. Because git-sync synchronizes both the DAG and the imported module in quick succession, the race condition described in the importlib documentation might be the cause of our issue. Here again, the documentation recommends importlib.invalidate_caches() as the way to prevent it.
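A narrower variant of that documented fix, sketched under the assumption that only the utility-library entry needs refreshing: `sys.path_importer_cache` maps each path entry to its finder (as in the output above), and `importlib.machinery.FileFinder` has its own `invalidate_caches()` method. The helper name `invalidate_path_entry` is hypothetical:

```python
import importlib.machinery
import sys


def invalidate_path_entry(path_entry: str) -> bool:
    """Invalidate the cached finder for a single sys.path entry, if any.

    Returns True when a FileFinder was found and its cached listing reset.
    """
    finder = sys.path_importer_cache.get(path_entry)
    if isinstance(finder, importlib.machinery.FileFinder):
        # Forces a fresh directory listing on the next lookup in this entry.
        finder.invalidate_caches()
        return True
    return False
```

For example, `invalidate_path_entry("/opt/airflow/dags/repo/src/libs")` would reset only the libs directory, whereas `importlib.invalidate_caches()` calls `invalidate_caches()` on every cached path entry finder.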

@potiuk
Member

potiuk commented Apr 25, 2024

I would be surprised if invalidate_caches were the solution. When a DAG is parsed in Airflow, it is always parsed in a newly created fork of the parent process, and any imports done inside the fork are lost together with the fork when parsing completes.
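The fork behaviour described here can be observed with the standard library alone. The following is a POSIX-only sketch (it relies on the "fork" start method of `multiprocessing`); the module and function names are illustrative, not Airflow's. The parent caches a `FileFinder` for a directory, a forked child imports a module from it, and the parent then checks its own `sys.modules`:

```python
import importlib
import multiprocessing as mp
import os
import sys
import tempfile


def _child(libs_dir: str, module_name: str, queue) -> None:
    # The forked child starts from a copy of the parent's import state.
    inherited = libs_dir in sys.path_importer_cache
    importlib.import_module(module_name)
    queue.put((inherited, module_name in sys.modules))


def fork_and_import(module_name: str = "forked_only_mod") -> tuple:
    ctx = mp.get_context("fork")  # POSIX only
    with tempfile.TemporaryDirectory() as libs_dir:
        with open(os.path.join(libs_dir, module_name + ".py"), "w") as f:
            f.write("VALUE = 1\n")
        sys.path.insert(0, libs_dir)
        try:
            # Parent: a failed lookup creates and caches a FileFinder for libs_dir.
            try:
                importlib.import_module("module_that_does_not_exist_xyz")
            except ModuleNotFoundError:
                pass
            queue = ctx.Queue()
            proc = ctx.Process(target=_child, args=(libs_dir, module_name, queue))
            proc.start()
            inherited, imported_in_child = queue.get(timeout=30)
            proc.join()
            # (finder inherited by child, imported in child, leaked into parent)
            return inherited, imported_in_child, module_name in sys.modules
        finally:
            sys.path.remove(libs_dir)
            sys.path_importer_cache.pop(libs_dir, None)
```

On Linux this returns `(True, True, False)`: the child's import does not leak back into the parent, but the child does start from a copy of the parent's import caches, including any directory listing the parent cached before the fork.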

But maybe that's a side effect of https://airflow.apache.org/docs/apache-airflow/stable/configurations-ref.html#parsing-pre-import-modules. This option (True by default since 2.6.0) pre-imports the "airflow" module in the parent process before the fork happens. It might well be that your local settings, imported along with it, pull in other imports when the "airflow" module is imported.

An easy way to check this is to set the flag to False; I'd recommend doing so. And if your hypothesis is right, you should also be able to test it by adding importlib.invalidate_caches() as the first thing in your DAG files, right? Can you run such tests?
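The second check suggested here boils down to calling `importlib.invalidate_caches()` at the very top of each DAG file, before any import from the synced libraries. A self-contained simulation under stated assumptions: `ournewlib` and `mock_function` are the hypothetical names from the reproduction steps, the DAG file omits the actual DAG definition, and `runpy.run_path` merely stands in for the parsing step; it is not how the DAG processor loads files.

```python
import os
import runpy
import sys
import tempfile

# Illustrative top of a DAG file (real DAG definition omitted).
DAG_FILE_SOURCE = '''\
import importlib
importlib.invalidate_caches()  # must run before any import from the synced libs

from ournewlib import mock_function  # hypothetical module from the reproduction

RESULT = mock_function()
'''


def parse_dag_file_with_new_lib() -> str:
    """Write a new lib and a DAG-like file, then execute the file like a parser would."""
    with tempfile.TemporaryDirectory() as dags_dir:
        with open(os.path.join(dags_dir, "ournewlib.py"), "w") as f:
            f.write("def mock_function():\n    return 'ok'\n")
        dag_path = os.path.join(dags_dir, "my_dag.py")
        with open(dag_path, "w") as f:
            f.write(DAG_FILE_SOURCE)
        sys.path.insert(0, dags_dir)  # mirrors the DAG folder being on PYTHONPATH
        try:
            return runpy.run_path(dag_path)["RESULT"]
        finally:
            sys.path.remove(dags_dir)
            sys.path_importer_cache.pop(dags_dir, None)
            sys.modules.pop("ournewlib", None)
```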

RNHTTR added the area:dev-tools label and removed the needs-triage label on May 1, 2024