Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: allow input url passthrough to executor backend #2639

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 5 additions & 1 deletion snakemake/dag.py
Expand Up @@ -351,7 +351,7 @@ async def retrieve_storage_inputs(self):
f
for job in self.needrun_jobs()
for f in job.input
if f.is_storage and self.is_external_input(f, job)
if f.is_storage and self.is_external_input(f, job) and not is_flagged(f, "passthrough")
}

try:
Expand Down Expand Up @@ -990,6 +990,10 @@ async def update_(
producer = dict()
exceptions = dict()
for res in potential_dependencies:
if is_flagged(res.file, "passthrough"):
# passthrough files are not considered for dependency resolution
continue

if create_inventory:
# If possible, obtain inventory information starting from
# given file and store it in the IOCache.
Expand Down
21 changes: 18 additions & 3 deletions snakemake/io.py
Expand Up @@ -18,7 +18,7 @@
import subprocess as sp
import time
from contextlib import contextmanager
from hashlib import sha256
from hashlib import md5, sha256
from inspect import isfunction, ismethod
from itertools import chain, product
from pathlib import Path
Expand Down Expand Up @@ -357,6 +357,14 @@ def open(self, mode="r", buffering=-1, encoding=None, errors=None, newline=None)
def contains_wildcard(self):
return contains_wildcard(self.file)

@property
def is_passthrough(self):
return is_flagged(self._file, "passthrough")

@property
def passthrough_path(self):
return get_flag_value(self._file, "passthrough")

@property
def is_storage(self):
return is_flagged(self._file, "storage_object")
Expand Down Expand Up @@ -918,8 +926,8 @@ async def get_missing():
)
else os.path.exists(f)
if not (
(is_flagged(f, "pipe") or is_flagged(f, "service"))
and ignore_pipe_or_service
((is_flagged(f, "pipe") or is_flagged(f, "service"))
and ignore_pipe_or_service) or is_flagged(f, "passthrough")
)
else True
)
Expand Down Expand Up @@ -1043,6 +1051,13 @@ def get_flag_value(value, flag_type):
else:
return None

def passthrough(value, path=None):
if hasattr(value, "flags"):
raise SyntaxError("Passthrough flag can't be combined with any other flags.")

if path is None:
path = md5(value.encode()).hexdigest()
return flag(path, "passthrough", value)

def ancient(value):
"""
Expand Down
1 change: 1 addition & 0 deletions snakemake/path_modifier.py
Expand Up @@ -105,6 +105,7 @@ def is_annotated_callable(value):
or is_flagged(path, "local")
or is_flagged(path, "sourcecache_entry")
or is_annotated_callable(path)
or is_flagged(path, "passthrough")
):
# no default remote needed
return path
Expand Down
1 change: 1 addition & 0 deletions snakemake/workflow.py
Expand Up @@ -64,6 +64,7 @@
from snakemake.parser import parse
import snakemake.io
from snakemake.io import (
passthrough,
protected,
temp,
temporary,
Expand Down