forked from iterative/dvc-objects
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
localfs: temporarily patch fsspec's localfs
While waiting for fsspec/filesystem_spec#1053 to be released.
- Loading branch information
Showing
2 changed files
with
374 additions
and
1 deletion.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,371 @@ | ||
# NOTE: temporary fix while waiting for | ||
# https://github.com/fsspec/filesystem_spec/pull/1053 | ||
# pylint: disable=W | ||
import datetime | ||
import io | ||
import logging | ||
import os | ||
import os.path as osp | ||
import posixpath | ||
import re | ||
import shutil | ||
import stat | ||
import tempfile | ||
|
||
from fsspec import AbstractFileSystem | ||
from fsspec.compression import compr | ||
from fsspec.core import get_compression | ||
from fsspec.utils import isfilelike, stringify_path | ||
|
||
logger = logging.getLogger("fsspec.local") | ||
|
||
|
||
class LocalFileSystem(AbstractFileSystem): | ||
"""Interface to files on local storage | ||
Parameters | ||
---------- | ||
auto_mkdir: bool | ||
Whether, when opening a file, the directory containing it should | ||
be created (if it doesn't already exist). This is assumed by pyarrow | ||
code. | ||
""" | ||
|
||
root_marker = "/" | ||
protocol = "file" | ||
local_file = True | ||
|
||
def __init__(self, auto_mkdir=False, **kwargs): | ||
super().__init__(**kwargs) | ||
self.auto_mkdir = auto_mkdir | ||
|
||
def mkdir(self, path, create_parents=True, **kwargs): | ||
path = self._strip_protocol(path) | ||
if self.exists(path): | ||
raise FileExistsError(path) | ||
if create_parents: | ||
self.makedirs(path, exist_ok=True) | ||
else: | ||
os.mkdir(path, **kwargs) | ||
|
||
def makedirs(self, path, exist_ok=False): | ||
path = self._strip_protocol(path) | ||
os.makedirs(path, exist_ok=exist_ok) | ||
|
||
def rmdir(self, path): | ||
path = self._strip_protocol(path) | ||
os.rmdir(path) | ||
|
||
def ls(self, path, detail=False, **kwargs): | ||
path = self._strip_protocol(path) | ||
if detail: | ||
with os.scandir(path) as it: | ||
return [self.info(f) for f in it] | ||
else: | ||
return [posixpath.join(path, f) for f in os.listdir(path)] | ||
|
||
def glob(self, path, **kwargs): | ||
path = self._strip_protocol(path) | ||
return super().glob(path, **kwargs) | ||
|
||
def info(self, path, **kwargs): | ||
if isinstance(path, os.DirEntry): | ||
# scandir DirEntry | ||
out = path.stat(follow_symlinks=False) | ||
link = path.is_symlink() | ||
if path.is_dir(follow_symlinks=False): | ||
t = "directory" | ||
elif path.is_file(follow_symlinks=False): | ||
t = "file" | ||
else: | ||
t = "other" | ||
path = self._strip_protocol(path.path) | ||
else: | ||
# str or path-like | ||
path = self._strip_protocol(path) | ||
out = os.stat(path, follow_symlinks=False) | ||
link = stat.S_ISLNK(out.st_mode) | ||
if link: | ||
out = os.stat(path, follow_symlinks=True) | ||
if stat.S_ISDIR(out.st_mode): | ||
t = "directory" | ||
elif stat.S_ISREG(out.st_mode): | ||
t = "file" | ||
else: | ||
t = "other" | ||
result = { | ||
"name": path, | ||
"size": out.st_size, | ||
"type": t, | ||
"created": out.st_ctime, | ||
"islink": link, | ||
} | ||
for field in ["mode", "uid", "gid", "mtime", "ino", "nlink"]: | ||
result[field] = getattr(out, "st_" + field) | ||
if result["islink"]: | ||
result["destination"] = os.readlink(path) | ||
try: | ||
out2 = os.stat(path, follow_symlinks=True) | ||
result["size"] = out2.st_size | ||
except IOError: | ||
result["size"] = 0 | ||
return result | ||
|
||
def lexists(self, path, **kwargs): | ||
return osp.lexists(path) | ||
|
||
def cp_file(self, path1, path2, **kwargs): | ||
path1 = self._strip_protocol(path1).rstrip("/") | ||
path2 = self._strip_protocol(path2).rstrip("/") | ||
if self.auto_mkdir: | ||
self.makedirs(self._parent(path2), exist_ok=True) | ||
if self.isfile(path1): | ||
shutil.copyfile(path1, path2) | ||
elif self.isdir(path1): | ||
self.mkdirs(path2, exist_ok=True) | ||
else: | ||
raise FileNotFoundError(path1) | ||
|
||
def get_file(self, path1, path2, callback=None, **kwargs): | ||
if isfilelike(path2): | ||
with open(path1, "rb") as f: | ||
shutil.copyfileobj(f, path2) | ||
else: | ||
return self.cp_file(path1, path2, **kwargs) | ||
|
||
def put_file(self, path1, path2, callback=None, **kwargs): | ||
return self.cp_file(path1, path2, **kwargs) | ||
|
||
def mv_file(self, path1, path2, **kwargs): | ||
path1 = self._strip_protocol(path1).rstrip("/") | ||
path2 = self._strip_protocol(path2).rstrip("/") | ||
shutil.move(path1, path2) | ||
|
||
def rm_file(self, path): | ||
os.remove(path) | ||
|
||
def rm(self, path, recursive=False, maxdepth=None): | ||
if isinstance(path, str): | ||
path = [path] | ||
|
||
for p in path: | ||
p = self._strip_protocol(p).rstrip("/") | ||
if recursive and self.isdir(p): | ||
|
||
if osp.abspath(p) == os.getcwd(): | ||
raise ValueError("Cannot delete current working directory") | ||
shutil.rmtree(p) | ||
else: | ||
os.remove(p) | ||
|
||
def _open(self, path, mode="rb", block_size=None, **kwargs): | ||
path = self._strip_protocol(path) | ||
if self.auto_mkdir and "w" in mode: | ||
self.makedirs(self._parent(path), exist_ok=True) | ||
return LocalFileOpener(path, mode, fs=self, **kwargs) | ||
|
||
def touch(self, path, truncate=True, **kwargs): | ||
path = self._strip_protocol(path) | ||
if self.auto_mkdir: | ||
self.makedirs(self._parent(path), exist_ok=True) | ||
if self.exists(path): | ||
os.utime(path, None) | ||
else: | ||
open(path, "a").close() | ||
if truncate: | ||
os.truncate(path, 0) | ||
|
||
def created(self, path): | ||
info = self.info(path=path) | ||
return datetime.datetime.utcfromtimestamp(info["created"]) | ||
|
||
def modified(self, path): | ||
info = self.info(path=path) | ||
return datetime.datetime.utcfromtimestamp(info["mtime"]) | ||
|
||
@classmethod | ||
def _parent(cls, path): | ||
path = cls._strip_protocol(path).rstrip("/") | ||
if "/" in path: | ||
return path.rsplit("/", 1)[0] | ||
else: | ||
return cls.root_marker | ||
|
||
@classmethod | ||
def _strip_protocol(cls, path): | ||
path = stringify_path(path) | ||
if path.startswith("file://"): | ||
path = path[7:] | ||
return make_path_posix(path).rstrip("/") or cls.root_marker | ||
|
||
def _isfilestore(self): | ||
# Inheriting from DaskFileSystem makes this False (S3, etc. were) | ||
# the original motivation. But we are a posix-like file system. | ||
# See https://github.com/dask/dask/issues/5526 | ||
return True | ||
|
||
def chmod(self, path, mode): | ||
path = stringify_path(path) | ||
return os.chmod(path, mode) | ||
|
||
|
||
def make_path_posix(path, sep=os.sep): | ||
"""Make path generic""" | ||
if isinstance(path, (list, set, tuple)): | ||
return type(path)(make_path_posix(p) for p in path) | ||
if "~" in path: | ||
path = osp.expanduser(path) | ||
if sep == "/": | ||
# most common fast case for posix | ||
if path.startswith("/"): | ||
return path | ||
return os.getcwd() + "/" + path | ||
if ( | ||
(sep not in path and "/" not in path) | ||
or (sep == "/" and not path.startswith("/")) | ||
or (sep == "\\" and ":" not in path and not path.startswith("\\\\")) | ||
): | ||
# relative path like "path" or "rel\\path" (win) or rel/path" | ||
if os.sep == "\\": | ||
# abspath made some more '\\' separators | ||
return make_path_posix(osp.abspath(path)) | ||
else: | ||
return os.getcwd() + "/" + path | ||
if re.match("/[A-Za-z]:", path): | ||
# for windows file URI like "file:///C:/folder/file" | ||
# or "file:///C:\\dir\\file" | ||
path = path[1:] | ||
if path.startswith("\\\\"): | ||
# special case for windows UNC/DFS-style paths, do nothing, | ||
# just flip the slashes around (case below does not work!) | ||
return path.replace("\\", "/") | ||
if re.match("[A-Za-z]:", path): | ||
# windows full path like "C:\\local\\path" | ||
return path.lstrip("\\").replace("\\", "/").replace("//", "/") | ||
if path.startswith("\\"): | ||
# windows network path like "\\server\\path" | ||
return "/" + path.lstrip("\\").replace("\\", "/").replace("//", "/") | ||
return path | ||
|
||
|
||
class LocalFileOpener(io.IOBase): | ||
def __init__( | ||
self, path, mode, autocommit=True, fs=None, compression=None, **kwargs | ||
): | ||
logger.debug("open file: %s", path) | ||
self.path = path | ||
self.mode = mode | ||
self.fs = fs | ||
self.f = None | ||
self.autocommit = autocommit | ||
self.compression = get_compression(path, compression) | ||
self.blocksize = io.DEFAULT_BUFFER_SIZE | ||
self._open() | ||
|
||
def _open(self): | ||
if self.f is None or self.f.closed: | ||
if self.autocommit or "w" not in self.mode: | ||
self.f = open(self.path, mode=self.mode) | ||
if self.compression: | ||
compress = compr[self.compression] | ||
self.f = compress(self.f, mode=self.mode) | ||
else: | ||
# TODO: check if path is writable? | ||
i, name = tempfile.mkstemp() | ||
os.close(i) # we want normal open and normal buffered file | ||
self.temp = name | ||
self.f = open(name, mode=self.mode) | ||
if "w" not in self.mode: | ||
self.size = self.f.seek(0, 2) | ||
self.f.seek(0) | ||
self.f.size = self.size | ||
|
||
def _fetch_range(self, start, end): | ||
# probably only used by cached FS | ||
if "r" not in self.mode: | ||
raise ValueError | ||
self._open() | ||
self.f.seek(start) | ||
return self.f.read(end - start) | ||
|
||
def __setstate__(self, state): | ||
self.f = None | ||
loc = state.pop("loc", None) | ||
self.__dict__.update(state) | ||
if "r" in state["mode"]: | ||
self.f = None | ||
self._open() | ||
self.f.seek(loc) | ||
|
||
def __getstate__(self): | ||
d = self.__dict__.copy() | ||
d.pop("f") | ||
if "r" in self.mode: | ||
d["loc"] = self.f.tell() | ||
else: | ||
if not self.f.closed: | ||
raise ValueError("Cannot serialise open write-mode local file") | ||
return d | ||
|
||
def commit(self): | ||
if self.autocommit: | ||
raise RuntimeError( | ||
"Can only commit if not already set to autocommit" | ||
) | ||
shutil.move(self.temp, self.path) | ||
|
||
def discard(self): | ||
if self.autocommit: | ||
raise RuntimeError("Cannot discard if set to autocommit") | ||
os.remove(self.temp) | ||
|
||
def readable(self) -> bool: | ||
return True | ||
|
||
def writable(self) -> bool: | ||
return "r" not in self.mode | ||
|
||
def read(self, *args, **kwargs): | ||
return self.f.read(*args, **kwargs) | ||
|
||
def write(self, *args, **kwargs): | ||
return self.f.write(*args, **kwargs) | ||
|
||
def tell(self, *args, **kwargs): | ||
return self.f.tell(*args, **kwargs) | ||
|
||
def seek(self, *args, **kwargs): | ||
return self.f.seek(*args, **kwargs) | ||
|
||
def seekable(self, *args, **kwargs): | ||
return self.f.seekable(*args, **kwargs) | ||
|
||
def readline(self, *args, **kwargs): | ||
return self.f.readline(*args, **kwargs) | ||
|
||
def readlines(self, *args, **kwargs): | ||
return self.f.readlines(*args, **kwargs) | ||
|
||
def close(self): | ||
return self.f.close() | ||
|
||
@property | ||
def closed(self): | ||
return self.f.closed | ||
|
||
def fileno(self): | ||
return self.raw.fileno() | ||
|
||
def __iter__(self): | ||
return self.f.__iter__() | ||
|
||
def __getattr__(self, item): | ||
return getattr(self.f, item) | ||
|
||
def __enter__(self): | ||
self._incontext = True | ||
return self | ||
|
||
def __exit__(self, exc_type, exc_value, traceback): | ||
self._incontext = False | ||
self.f.__exit__(exc_type, exc_value, traceback) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters