Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reference improvements #1063

Merged
merged 5 commits into from
Oct 19, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
13 changes: 11 additions & 2 deletions fsspec/asyn.py
Expand Up @@ -438,7 +438,14 @@ async def _cat(
return out[0]

async def _cat_ranges(
self, paths, starts, ends, max_gap=None, batch_size=None, **kwargs
self,
paths,
starts,
ends,
max_gap=None,
batch_size=None,
on_error="return",
**kwargs,
):
# TODO: on_error
if max_gap is not None:
Expand All @@ -457,7 +464,9 @@ async def _cat_ranges(
for p, s, e in zip(paths, starts, ends)
]
batch_size = batch_size or self.batch_size
return await _run_coros_in_chunks(coros, batch_size=batch_size, nofiles=True)
return await _run_coros_in_chunks(
coros, batch_size=batch_size, nofiles=True, return_exceptions=True
)

async def _put_file(self, lpath, rpath, **kwargs):
raise NotImplementedError
Expand Down
178 changes: 111 additions & 67 deletions fsspec/implementations/reference.py
Expand Up @@ -16,7 +16,7 @@
from ..callbacks import _DEFAULT_CALLBACK
from ..core import filesystem, open, split_protocol
from ..spec import AbstractFileSystem
from ..utils import isfilelike
from ..utils import isfilelike, merge_offset_ranges

logger = logging.getLogger("fsspec.reference")

Expand Down Expand Up @@ -76,6 +76,8 @@ def __init__(
fs=None,
template_overrides=None,
simple_templates=True,
max_gap=64_000,
max_block=256_000_000,
loop=None,
**kwargs,
):
Expand All @@ -85,18 +87,19 @@ def __init__(
----------
fo : dict or str
The set of references to use for this instance, with a structure as above.
If str, will use fsspec.open, in conjunction with ref_storage_args to
open and parse JSON at this location.
If str, will use fsspec.open, in conjunction with target_options
and target_protocol to open and parse JSON at this location.
target : str
For any references having target_url as None, this is the default file
target to use
ref_storage_args : dict
If references is a str, use these kwargs for loading the JSON file
If references is a str, use these kwargs for loading the JSON file.
Deprecated: use target_options instead.
target_protocol : str
Used for loading the reference file, if it is a path. If None, protocol
will be derived from the given path
target_options : dict
Extra FS options for loading the reference file, if given as a path
Extra FS options for loading the reference file ``fo``, if given as a path
remote_protocol : str
The protocol of the filesystem on which the references will be evaluated
(unless fs is provided). If not given, will be derived from the first
Expand All @@ -119,6 +122,14 @@ def __init__(
Whether templates can be processed with simple replace (True) or if
jinja is needed (False, much slower). All reference sets produced by
``kerchunk`` are simple in this sense, but the spec allows for complex.
max_gap, max_block: int
For merging multiple concurrent requests to the same remote file.
Neighboring byte ranges will only be merged when their
inter-range gap is <= `max_gap`. Default is 64KB. Set to 0
to only merge when it requires no extra bytes. Pass a negative
number to disable merging, appropriate for local target files.
Neighboring byte ranges will only be merged when the size of
the aggregated range is <= `max_block`. Default is 256MB.
kwargs : passed to parent class
"""
super().__init__(loop=loop, **kwargs)
Expand All @@ -128,6 +139,8 @@ def __init__(
self.simple_templates = simple_templates
self.templates = {}
self.fss = {}
self.max_gap = max_gap
self.max_block = max_block
if hasattr(fo, "read"):
text = fo.read()
elif isinstance(fo, str):
Expand Down Expand Up @@ -156,45 +169,48 @@ def __init__(
)
for k, opts in fs.items()
}
if None not in self.fss:
self.fss[None] = filesystem("file")
return
if fs is not None:
# single remote FS
remote_protocol = (
fs.protocol[0] if isinstance(fs.protocol, tuple) else fs.protocol
)
self.fss[remote_protocol] = fs

if remote_protocol is None:
# get single protocol from any templates
for ref in self.templates.values():
if callable(ref):
ref = ref()
protocol, _ = fsspec.core.split_protocol(ref)
if protocol:
remote_protocol = protocol
break
if protocol and protocol not in self.fss:
fs = filesystem(protocol, loop=loop, **(remote_options or {}))
self.fss[protocol] = fs
if remote_protocol is None:
# get single protocol from references
for ref in self.references.values():
if callable(ref):
ref = ref()
if isinstance(ref, list) and ref[0]:
protocol, _ = fsspec.core.split_protocol(ref[0])
if protocol:
remote_protocol = protocol
break
if remote_protocol is None:
remote_protocol = target_protocol
if protocol and protocol not in self.fss:
fs = filesystem(protocol, loop=loop, **(remote_options or {}))
self.fss[protocol] = fs

if remote_protocol and remote_protocol not in self.fss:
fs = filesystem(remote_protocol, loop=loop, **(remote_options or {}))
self.fss[remote_protocol] = fs

fs = fs or filesystem(remote_protocol, loop=loop, **(remote_options or {}))
self.fss[remote_protocol] = fs
self.fss[None] = fs # default one
self.fss[None] = fs or filesystem("file") # default one

@property
def loop(self):
    """Event loop to use for this filesystem.

    Returns the ``loop`` of the first entry in ``self.fss`` whose
    ``async_impl`` flag is truthy; when no entry is async, falls back to
    this instance's own ``_loop``.
    """
    # Collect every async member's loop (not just the first) so that all of
    # them are touched, exactly as a full scan would do.
    async_loops = [
        member.loop for member in self.fss.values() if member.async_impl
    ]
    if not async_loops:
        return self._loop
    return async_loops[0]

def _cat_common(self, path):
def _cat_common(self, path, start=None, end=None):
path = self._strip_protocol(path)
logger.debug(f"cat: {path}")
part = self.references[path]
Expand All @@ -209,35 +225,44 @@ def _cat_common(self, path):
if len(part) == 1:
logger.debug(f"Reference: {path}, whole file")
url = part[0]
start = None
end = None
start1, end1 = start, end
else:
url, start, size = part
logger.debug(f"Reference: {path}, offset {start}, size {size}")
end = start + size
url, start0, size = part
logger.debug(f"Reference: {path} => {url}, offset {start0}, size {size}")
end0 = start0 + size

if start is not None:
if start >= 0:
start1 = start0 + start
else:
start1 = end0 + start
else:
start1 = start0
if end is not None:
if end >= 0:
end1 = start0 + end
else:
end1 = end0 + end
else:
end1 = end0
if url is None:
url = self.target
return url, start, end
return url, start1, end1

async def _cat_file(self, path, start=None, end=None, **kwargs):
    """Asynchronously read the data behind reference ``path``.

    Parameters
    ----------
    path : str
        Key of the reference to read.
    start, end : int or None
        Optional limits expressed relative to the referenced data. They are
        handed to ``_cat_common``, which converts them into limits on the
        target file for references that carry an offset and size.
    kwargs :
        Accepted for signature compatibility; not forwarded.

    Returns
    -------
    The requested data: a slice of the inline bytes when the reference
    holds data directly, otherwise whatever the target filesystem's
    ``_cat_file`` returns for the resolved range.
    """
    part_or_url, start0, end0 = self._cat_common(path, start=start, end=end)
    if isinstance(part_or_url, bytes):
        # inline data: the reference is the payload, so slice it directly
        return part_or_url[start:end]
    protocol, _ = split_protocol(part_or_url)
    # start0/end0 already include the caller's start/end combined with the
    # reference's own offset. Request exactly that range from the target:
    # passing the raw start/end would ignore the reference offset, and
    # slicing the result with [start:end] again would apply the range twice.
    return await self.fss[protocol]._cat_file(part_or_url, start=start0, end=end0)

def cat_file(self, path, start=None, end=None, **kwargs):
    """Read the data behind reference ``path`` (synchronous variant).

    Parameters
    ----------
    path : str
        Key of the reference to read.
    start, end : int or None
        Optional limits expressed relative to the referenced data. They are
        handed to ``_cat_common``, which converts them into limits on the
        target file for references that carry an offset and size.
    kwargs :
        Accepted for signature compatibility; not forwarded.

    Returns
    -------
    The requested data: a slice of the inline bytes when the reference
    holds data directly, otherwise whatever the target filesystem's
    ``cat_file`` returns for the resolved range.
    """
    part_or_url, start0, end0 = self._cat_common(path, start=start, end=end)
    if isinstance(part_or_url, bytes):
        # inline data: the reference is the payload, so slice it directly
        return part_or_url[start:end]
    protocol, _ = split_protocol(part_or_url)
    # start0/end0 already account for the caller's start/end, so the result
    # must not be sliced with [start:end] a second time.
    return self.fss[protocol].cat_file(part_or_url, start=start0, end=end0)

def pipe_file(self, path, value, **_):
"""Temporarily add binary data or reference as a file"""
Expand Down Expand Up @@ -277,45 +302,64 @@ def get(self, rpath, lpath, recursive=False, **kwargs):
)

def cat(self, path, recursive=False, on_error="raise", **kwargs):
if isinstance(path, str) and recursive:
raise NotImplementedError
if isinstance(path, list) and (recursive or any("*" in p for p in path)):
raise NotImplementedError
proto_dict = _protocol_groups(path, self.references)
out = {}
for proto, paths in proto_dict.items():
if proto is None:
# binary/string
for p in paths:
try:
out[p] = AbstractFileSystem.cat_file(self, p, **kwargs)
except Exception as e:
if on_error == "raise":
raise
if on_error == "return":
out[p] = e

elif self.fss[proto].async_impl:
# TODO: asyncio.gather on multiple async FSs
out.update(
sync(
self.loop,
self._cat,
paths,
recursive,
on_error=on_error,
**kwargs,
)
)
elif isinstance(paths, list):
if recursive or any("*" in p for p in paths):
raise NotImplementedError
for p in paths:
try:
out[p] = AbstractFileSystem.cat_file(self, p, **kwargs)
except Exception as e:
if on_error == "raise":
raise
if on_error == "return":
out[p] = e
fs = self.fss[proto]
urls, starts, ends = zip(*[self._cat_common(p) for p in paths])
urls2 = []
starts2 = []
ends2 = []
paths2 = []
whole_files = set()
for u, s, e, p in zip(urls, starts, ends, paths):
if isinstance(u, bytes):
# data
out[p] = u
elif s is None:
# whole file - limits are None, None, but no further
# entries take for this file
whole_files.add(u)
urls2.append(u)
starts2.append(s)
ends2.append(e)
paths2.append(p)
for u, s, e, p in zip(urls, starts, ends, paths):
if s is not None and u not in whole_files:
urls2.append(u)
starts2.append(s)
ends2.append(e)
paths2.append(p)
new_paths, new_starts, new_ends = merge_offset_ranges(
list(urls2),
list(starts2),
list(ends2),
sort=False,
max_gap=self.max_gap,
max_block=self.max_block,
)
bytes_out = fs.cat_ranges(new_paths, new_starts, new_ends)
if len(urls2) == len(bytes_out):
# we didn't do any merging
for p, b in zip(paths2, bytes_out):
out[p] = b
else:
out.update(AbstractFileSystem.cat_file(self, paths))
# unbundle from merged bytes - simple approach
for u, s, e, p in zip(urls, starts, ends, paths):
if p in out:
continue # was bytes, already handled
for np, ns, ne, b in zip(
new_paths, new_starts, new_ends, bytes_out
):
if np == u and (ns is None or ne is None):
out[p] = b[s:e]
elif np == u and s >= ns and e <= ne:
out[p] = b[s - ns : (e - ne) or None]

if len(out) == 1 and isinstance(path, str) and "*" not in path:
return _first(out)
return out
Expand Down
69 changes: 68 additions & 1 deletion fsspec/implementations/tests/test_reference.py
Expand Up @@ -78,7 +78,11 @@ def test_mutable(server, m):
def test_defaults(server): # noqa: F811
refs = {"a": b"data", "b": (None, 0, 5)}
fs = fsspec.filesystem(
"reference", fo=refs, target_protocol="http", target=realfile
"reference",
fo=refs,
target_protocol="http",
target=realfile,
remote_protocol="http",
)

assert fs.cat("a") == b"data"
Expand Down Expand Up @@ -337,3 +341,66 @@ def test_missing_nonasync(m):

a = zarr.open_array(m)
assert str(a[0]) == "nan"


def test_fss_has_defaults(m):
    """Check what ends up in ``fs.fss`` for a range of constructor inputs.

    ``m`` is a filesystem fixture supplied by the test suite; it is used
    both directly (``fs=m``) and inside a protocol-to-filesystem dict.
    """
    # nothing specified: a default (None) entry must still be present
    ref_fs = fsspec.filesystem("reference", fo={})
    assert None in ref_fs.fss

    # explicit remote protocol: registered under its name and as the default
    ref_fs = fsspec.filesystem("reference", fo={}, remote_protocol="memory")
    registry = ref_fs.fss
    assert registry[None].protocol == "memory"
    assert registry["memory"].protocol == "memory"

    # a single filesystem instance becomes the default
    ref_fs = fsspec.filesystem("reference", fs=m, fo={})
    assert ref_fs.fss[None] is m

    # dict of filesystems without a None key: the default is a "file" one
    ref_fs = fsspec.filesystem("reference", fs={"memory": m}, fo={})
    registry = ref_fs.fss
    assert registry["memory"] is m
    assert registry[None].protocol == "file"

    # dict of filesystems that supplies the None key itself
    ref_fs = fsspec.filesystem("reference", fs={None: m}, fo={})
    assert ref_fs.fss[None] is m

    # protocol taken from the references themselves
    ref_fs = fsspec.filesystem("reference", fo={"key": ["memory://a"]})
    registry = ref_fs.fss
    assert registry[None] is registry["memory"]

    # a protocol-less reference next to a "memory://" one gives the same result
    ref_fs = fsspec.filesystem(
        "reference", fo={"key": ["memory://a"], "blah": ["path"]}
    )
    registry = ref_fs.fss
    assert registry[None] is registry["memory"]


def test_merging(m):
    """``cat`` on several references into the same target files.

    Keys "a" and "b" are one-byte ranges of the same file, while "c" (whole
    file) and "d" (6 bytes at offset 4) share another; every key must come
    back with exactly its own bytes.
    """
    payload_b = b"other test data"
    m.pipe("/a", b"test data")
    m.pipe("/b", payload_b)

    references = {
        "a": ["memory://a", 1, 1],
        "b": ["memory://a", 2, 1],
        "c": ["memory://b"],
        "d": ["memory://b", 4, 6],
    }
    ref_fs = fsspec.filesystem("reference", fo=references)

    expected = {"a": b"e", "b": b"s", "c": payload_b, "d": payload_b[4:10]}
    result = ref_fs.cat(["a", "b", "c", "d"])
    assert result == expected


def test_cat_file_ranges(m):
    """``cat_file`` start/end handling for whole-file and offset references.

    For both kinds of reference the limits are expected to behave like
    slice indices applied to the referenced data, including negative values.
    """
    payload = b"other test data"
    m.pipe("/b", payload)
    references = {
        "c": ["memory://b"],
        "d": ["memory://b", 4, 6],
    }
    ref_fs = fsspec.filesystem("reference", fo=references)

    # "c" points at the entire target file
    assert ref_fs.cat_file("c") == payload
    assert ref_fs.cat_file("c", start=1) == payload[1:]
    assert ref_fs.cat_file("c", start=-5) == payload[-5:]
    assert ref_fs.cat_file("c", 1, -5) == payload[1:-5]

    # "d" points at 6 bytes starting at offset 4; limits are relative to that
    window = payload[4:10]
    assert ref_fs.cat_file("d") == window
    assert ref_fs.cat_file("d", start=1) == window[1:]
    assert ref_fs.cat_file("d", start=-5) == window[-5:]
    assert ref_fs.cat_file("d", 1, -3) == window[1:-3]