use PythonVirtualenvOperator with a prebuilt env #15286
Thanks for opening your first issue here! Be sure to follow the issue template!
Edit: I misread the issue. Thanks for the added example.
Hey @uranusjr, yeah, sorry, I hit submit by accident there.
I wonder if this should be made more generic as a new operator that can take any Python installation prefix (e.g. a prebuilt virtualenv or a conda environment).
I agree.
This could work, though in my experience one disadvantage with …
Just one comment: this is fine if you can make sure all your (distributed) venvs are present on all the workers (which might be tricky if you want to update them), and you have to somehow link the "task" definition (expecting a certain venv with certain requirement versions) with the "deployment" (i.e. the worker definition). Any kind of "upgrade" to such an env might be tricky. The "local" installation pattern had the advantage that you always got the requirements in the version you described in the task definition (via the requirements specification). I think a better solution would be to add a caching mechanism to the task and modify the PythonVirtualenv operator to use it. However, this might be tricky to get right when you have multiple tasks of the same type running on the same runner in a Celery deployment.
I thought about this a bit and feel there are two things here to consider. The first is the overhead of `PythonVirtualenvOperator` building a fresh environment on every run. There is another use case surrounding pointing a task at an existing, prebuilt environment. I think two solutions are needed for the two problems. The first is probably more intuitive to design; we can add caching options to `PythonVirtualenvOperator`. Any advice is very welcome!
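For illustration, a minimal sketch of the caching idea above, assuming a per-worker cache directory keyed by a hash of the requirements; `CACHE_DIR` and `get_or_create_venv` are hypothetical, not an existing Airflow API:

```python
import hashlib
import subprocess
from pathlib import Path

CACHE_DIR = Path("/var/cache/airflow-venvs")  # hypothetical per-worker cache location

def get_or_create_venv(requirements: list) -> Path:
    """Reuse a cached venv keyed by a hash of the requirements, or build one."""
    key = hashlib.sha256("\n".join(sorted(requirements)).encode()).hexdigest()[:16]
    venv_path = CACHE_DIR / key
    if not (venv_path / "bin" / "python").exists():
        subprocess.run(["python", "-m", "venv", str(venv_path)], check=True)
        subprocess.run(
            [str(venv_path / "bin" / "pip"), "install", *requirements], check=True
        )
    return venv_path  # the task then runs its callable with venv_path/bin/python
```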
If I may pitch an idea: instead of creating venvs for each task/operator, what about creating a virtual DAG, e.g.

```python
with VenvDAG(
    default_args=default_args,
    schedule_interval=timedelta(days=1),
    start_date=days_ago(1),
    tags=['venv'],
    requirements="requirements.txt",
) as dag:
    t1 = PythonOperator()
    t2 = PythonOperator()
    t1 >> t2
```

such that every task in the DAG runs inside the venv built from `requirements.txt`?
That’s an interesting idea, but it would require much more change since we don’t currently have a hook point for a DAG to do pre-processing before operators are run. Maybe something like this would be easier:

```python
with DAG(...) as dag:
    t0 = CreateVirtualEnvironmentOperator(task_id="init_venv", ...)
    python_prefix_template = "{{ ti.xcom_pull(task_ids='init_venv')['prefix'] }}"
    t1 = ExternalPythonOperator(..., python_prefix=python_prefix_template)
    t2 = ExternalPythonOperator(..., python_prefix=python_prefix_template)
    t0 >> t1 >> t2
```

There are probably abstractions available to make this easier, but that’s the basic idea.
I came back to the discussion after the Summit, and it gave me an idea @uranusjr. The problem with this approach is that you cannot have a separate operator to prepare the env and another to run in the env, because they might run on different workers. However, with the custom XCom backends, I think we are very close to being able to use those backends as a generic caching mechanism (which we could also use to store virtualenv caches). I think it is not that far off to add a mechanism similar to what we see in many CI environments, where we could specify an ID of the cache (with some variations) and pull it from a shared location (if it exists) or push it there after the task succeeds. Then we could make PythonVirtualenvOperator build the venv if it is missing (using the requirements) and push it after completion. We would have to add some basic mechanism for invalidating the hash (for example, when the hash of requirements.txt changes). WDYT @uranusjr?
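A rough sketch of that CI-style pull-or-push flow, assuming a shared mount visible to all workers; the paths and archive naming are assumptions, and note that venvs are not always relocatable across machines:

```python
import hashlib
import shutil
import subprocess
from pathlib import Path

SHARED_CACHE = Path("/mnt/shared/venv-cache")  # assumed shared location across workers

def cache_key(requirements_file: Path) -> str:
    # Hash of requirements.txt; a changed file invalidates the cache entry.
    return hashlib.sha256(requirements_file.read_bytes()).hexdigest()[:16]

def pull_or_build(requirements_file: Path, workdir: Path) -> Path:
    key = cache_key(requirements_file)
    archive = SHARED_CACHE / f"venv-{key}.tar.gz"
    venv = workdir / f"venv-{key}"
    if archive.exists():
        shutil.unpack_archive(str(archive), str(venv))  # cache hit: pull
    else:
        subprocess.run(["python", "-m", "venv", str(venv)], check=True)
        subprocess.run(
            [str(venv / "bin" / "pip"), "install", "-r", str(requirements_file)],
            check=True,
        )
        base = str(archive)[: -len(".tar.gz")]
        shutil.make_archive(base, "gztar", root_dir=str(venv))  # push after success
    return venv
```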
Bump. Selecting my own existing venv, or at the very least reusing existing venvs instead of creating one every time, would be a great feature to have.
@ManikandanUV We are doing it the following way for now:

```python
env = vars_dict.get("conda_env", None)
path_to_python = f"/home/username/.conda{'/envs/' + env if env is not None else ''}/bin/python"

parse_files = BashOperator(
    task_id='parse-files',
    bash_command=f"{path_to_python} {abs_path_code}/my_repo/parse.py {files_to_parse}",
    env={"PATH": os.environ["PATH"],
         "DB_CONN": db_conn}
)
```

We have an environment variable containing the conda-env name, which is used to get the full path to the Python executable. Then, using a BashOperator, we can use the same environment again for different tasks. Additionally, we run an update to the environment if the requirements changed (note that we are using poetry as package manager):

```python
update_repo = BashOperator(
    task_id=f"update-repo-{folder}",
    bash_command=f"cd {abs_path_code}/{folder}; "
                 "git checkout master; git stash; git stash drop; git pull"
)
install_dependencies = BashOperator(
    task_id=f"install-dependencies-{folder}",
    bash_command=f"cd {abs_path_code}/{folder}; conda activate {env_name}; poetry install"
)
update_repo >> install_dependencies
```
You may use `conda run -n env_name python xxx.py`.
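That would look like this with a BashOperator; the env name and script path are placeholders:

```python
from airflow.operators.bash import BashOperator

# conda run executes the command inside the named env without activating it first.
run_in_conda = BashOperator(
    task_id="run_in_conda",
    bash_command="conda run -n env_name python /path/to/xxx.py",
)
```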
One workaround may be to use DockerOperator or KubernetesPodOperator to isolate the env, depending on your deployment.
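For example, a minimal DockerOperator sketch, assuming an image with the task's dependencies already baked in; the image name and command are placeholders:

```python
from airflow.providers.docker.operators.docker import DockerOperator

# The environment travels with the container image instead of living on the worker.
run_isolated = DockerOperator(
    task_id="run_isolated",
    image="my-registry/my-task-image:1.0",  # placeholder: image with deps preinstalled
    command="python /app/task.py",
)
```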
This use case is now covered by ExternalPythonOperator, which was introduced in #25780 and will be part of 2.4.0.
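A minimal sketch of the 2.4+ usage, assuming a prebuilt venv at a placeholder path:

```python
from airflow.operators.python import ExternalPythonOperator

def callable_in_venv():
    # Runs in the prebuilt interpreter; imports must already be installed there.
    import numpy
    print(numpy.__version__)

run_in_prebuilt = ExternalPythonOperator(
    task_id="run_in_prebuilt",
    python="/opt/venvs/my_env/bin/python",  # placeholder path to the prebuilt env
    python_callable=callable_in_venv,
)
```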
Description

Instead of passing in the requirements and relying on Airflow to build the env, in some cases it would be more straightforward and desirable to just make Airflow use a prebuilt env. This could be done with `PythonVirtualenvOperator` with a param like `env_path`.
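For illustration, the proposed usage might look like the following; `env_path` is the hypothetical parameter from this issue, not an existing argument of the operator:

```python
from airflow.operators.python import PythonVirtualenvOperator

def my_task():
    ...

use_prebuilt = PythonVirtualenvOperator(
    task_id="use_prebuilt",
    python_callable=my_task,
    env_path="/opt/venvs/prebuilt",  # hypothetical: point at a prebuilt env instead of building one
)
```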
Use case / motivation
Are you willing to submit a PR?
Perhaps
Related Issues