Skip to content

Commit

Permalink
repofs: use / as root_marker
Browse files Browse the repository at this point in the history
This makes dvcfs/repofs much easier to work with and allows them to support
isabs/abspath/etc. correctly, as previously any full path (with root_marker "")
would be impossible to distinguish from a relative one.

This also detaches external workspaces from the main repo workspace in
DvcFileSystem, which was necessary to be able to distinguish dvcfs abspaths
from local external output paths. Note that currently we still have limited
support for those (e.g. get/api support them, but import doesn't), and this
change keeps it that way.
  • Loading branch information
efiop committed May 8, 2022
1 parent 562e612 commit b14a972
Show file tree
Hide file tree
Showing 16 changed files with 115 additions and 61 deletions.
3 changes: 2 additions & 1 deletion dvc/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,9 @@ def get_url(path, repo=None, rev=None, remote=None):
directory in the remote storage.
"""
with Repo.open(repo, rev=rev, subrepos=True, uninitialized=True) as _repo:
fs_path = _repo.repo_fs.from_os_path(path)
with reraise(FileNotFoundError, PathMissingError(path, repo)):
info = _repo.repo_fs.info(path)
info = _repo.repo_fs.info(fs_path)

dvc_info = info.get("dvc_info")
if not dvc_info:
Expand Down
5 changes: 4 additions & 1 deletion dvc/fs/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,10 @@ def config(self) -> Dict[str, Any]:
def path(self):
from .path import Path

return Path(self.sep)
def _getcwd():
return self.fs.root_marker

return Path(self.sep, getcwd=_getcwd)

@classmethod
def _strip_protocol(cls, path: str) -> str:
Expand Down
37 changes: 24 additions & 13 deletions dvc/fs/dvc.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import logging
import os
import threading
import typing

Expand All @@ -22,31 +21,43 @@ class _DvcFileSystem(AbstractFileSystem): # pylint:disable=abstract-method
repo: DVC repo.
"""

def __init__(self, **kwargs):
root_marker = "/"

def __init__(self, workspace=None, **kwargs):
super().__init__(**kwargs)
self.repo = kwargs["repo"]
self.workspace = workspace or "repo"

@cached_property
def path(self):
    """Return the path helper for this filesystem.

    The helper is built with ``self.sep`` as the separator and a
    ``getcwd`` callback that always reports ``self.root_marker``.
    """
    from .path import Path

    return Path(self.sep, getcwd=lambda: self.root_marker)

@property
def config(self):
    """Not available on this filesystem; always raises NotImplementedError."""
    raise NotImplementedError

def _get_key(self, path):
from dvc.fs.local import LocalFileSystem

from . import get_cloud_fs
if self.workspace != "repo":
from . import get_cloud_fs

cls, kwargs, fs_path = get_cloud_fs(None, url=path)

if cls != LocalFileSystem or os.path.isabs(path):
cls, kwargs, fs_path = get_cloud_fs(None, url=path)
fs = cls(**kwargs)
return (cls.scheme, *fs.path.parts(fs_path))
return (self.workspace, *fs.path.parts(fs_path))

path = self.path.abspath(path)
if path == self.root_marker:
return (self.workspace,)

fs_key = "repo"
key = path.split(self.sep)
if key == ["."] or key == [""]:
key = self.path.relparts(path, self.root_marker)
if key == (".") or key == (""):
key = ()

return (fs_key, *key)
return (self.workspace, *key)

def _get_fs_path(self, path: "AnyPath", remote=None):
from dvc.config import NoRemoteError
Expand Down
6 changes: 6 additions & 0 deletions dvc/fs/git.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,12 @@ def __init__(
}
)

@cached_property
def path(self):
    """Return the path helper for this filesystem.

    The helper is built with ``self.sep`` as the separator and
    ``os.getcwd`` as its ``getcwd`` callback.
    """
    from .path import Path as _PathHelper

    return _PathHelper(self.sep, getcwd=os.getcwd)

@wrap_prop(threading.Lock())
@cached_property
def fs(self) -> "FsspecGitFileSystem":
Expand Down
6 changes: 5 additions & 1 deletion dvc/fs/path.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,11 @@ def overlaps(self, left, right):
return self.isin_or_eq(left, right) or self.isin(right, left)

def relpath(self, path, start):
return self.flavour.relpath(path, start=start)
if not start:
start = "."
return self.flavour.relpath(
self.abspath(path), start=self.abspath(start)
)

def relparts(self, path, base):
    """Return the parts of ``path`` after making it relative to ``base``.

    This is ``self.parts`` applied to ``self.relpath(path, base)``.
    """
    relative = self.relpath(path, base)
    return self.parts(relative)
Expand Down
27 changes: 15 additions & 12 deletions dvc/fs/repo.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ class _RepoFileSystem(AbstractFileSystem): # pylint:disable=abstract-method
kwargs: Additional keyword arguments passed to the `DvcFileSystem()`.
"""

root_marker = "/"

PARAM_REPO_URL = "repo_url"
PARAM_REPO_ROOT = "repo_root"
PARAM_REV = "rev"
Expand Down Expand Up @@ -103,7 +105,10 @@ def __init__(
else:
self.repo_factory = repo_factory

self.path = Path(self.sep)
def _getcwd():
return self.root_marker

self.path = Path(self.sep, getcwd=_getcwd)
self.repo = repo
self.hash_jobs = repo.fs.hash_jobs
self._traverse_subrepos = subrepos
Expand Down Expand Up @@ -258,17 +263,7 @@ def _get_fs_pair(
"""
Returns a pair of fss based on repo the path falls in, using prefix.
"""
from dvc.utils import as_posix

if os.path.isabs(path):
if self.repo.fs.path.isin_or_eq(path, self.repo.root_dir):
path = self.repo.fs.path.relpath(path, self.repo.root_dir)
else:
return None, None, self.repo.dvcfs, path

path = as_posix(path)

parts = self.path.parts(path)
parts = self.path.relparts(path, self.root_marker)
if parts and parts[0] == os.curdir:
parts = parts[1:]

Expand Down Expand Up @@ -461,3 +456,11 @@ def repo_url(self):
@property
def config(self):
return self.fs.config

def from_os_path(self, path):
    """Convert an OS path into the posix form used by this filesystem.

    An absolute ``path`` is first rewritten relative to
    ``self.repo.root_dir`` with ``os.path.relpath``; a relative ``path``
    is left as given. The result is then passed through ``as_posix``.
    """
    from dvc.utils import as_posix

    os_path = (
        os.path.relpath(path, self.repo.root_dir)
        if os.path.isabs(path)
        else path
    )
    return as_posix(os_path)
14 changes: 12 additions & 2 deletions dvc/repo/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -491,12 +491,22 @@ def index_db_dir(self):
@contextmanager
def open_by_relpath(self, path, remote=None, mode="r", encoding=None):
"""Opens a specified resource as a file descriptor"""
from dvc.fs.dvc import DvcFileSystem
from dvc.fs.repo import RepoFileSystem

fs = RepoFileSystem(repo=self, subrepos=True)
if os.path.isabs(path):
fs = DvcFileSystem(repo=self, workspace="local")
fs_path = path
else:
fs = RepoFileSystem(repo=self, subrepos=True)
fs_path = fs.from_os_path(path)

try:
with fs.open(
path, mode=mode, encoding=encoding, remote=remote
fs_path,
mode=mode,
encoding=encoding,
remote=remote,
) as fobj:
yield fobj
except FileNotFoundError as exc:
Expand Down
8 changes: 4 additions & 4 deletions dvc/repo/collect.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import logging
import os
from typing import TYPE_CHECKING, Callable, Iterable, List, Tuple

from dvc.types import AnyPath
Expand Down Expand Up @@ -34,8 +33,8 @@ def _collect_paths(
from dvc.fs.repo import RepoFileSystem
from dvc.utils import relpath

fs_paths = [os.path.abspath(target) for target in targets]
fs = RepoFileSystem(repo=repo)
fs_paths = [fs.from_os_path(target) for target in targets]

target_paths: StrPaths = []
for fs_path in fs_paths:
Expand All @@ -60,10 +59,11 @@ def _filter_duplicates(
fs_res_paths = fs_paths

for out in outs:
if out.fs_path in fs_paths:
fs_path = out.repo.repo_fs.from_os_path(out.fs_path)
if fs_path in fs_paths:
res_outs.append(out)
# MUTATING THE SAME LIST!!
fs_res_paths.remove(out.fs_path)
fs_res_paths.remove(fs_path)

return res_outs, fs_res_paths

Expand Down
3 changes: 2 additions & 1 deletion dvc/repo/diff.py
Original file line number Diff line number Diff line change
Expand Up @@ -186,8 +186,9 @@ def _dir_output_paths(fs, fs_path, obj, targets=None):

def _filter_missing(repo_fs, paths):
for path in paths:
fs_path = repo_fs.from_os_path(path)
try:
info = repo_fs.info(path)
info = repo_fs.info(fs_path)
dvc_info = info.get("dvc_info")
if (
dvc_info
Expand Down
16 changes: 14 additions & 2 deletions dvc/repo/get.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,14 +50,26 @@ def get(url, path, out=None, rev=None, jobs=None):
with external_repo(
url=url, rev=rev, cache_dir=tmp_dir, cache_types=cache_types
) as repo:
fs = repo.repo_fs

if os.path.isabs(path):
from dvc.fs.dvc import DvcFileSystem

fs = DvcFileSystem(repo=repo, workspace="local")
fs_path = path
else:
fs = repo.repo_fs
fs_path = fs.from_os_path(path)

with FsspecCallback.as_tqdm_callback(
total=-1,
desc=f"Downloading {fs.path.name(path)}",
unit="files",
) as cb:
fs.get(
path, os.path.abspath(out), batch_size=jobs, callback=cb
fs_path,
os.path.abspath(out),
batch_size=jobs,
callback=cb,
)
finally:
remove(tmp_dir)
4 changes: 3 additions & 1 deletion dvc/repo/ls.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,10 @@ def ls(url, path=None, rev=None, recursive=None, dvc_only=False):


def _ls(fs, path, recursive=None, dvc_only=False):
fs_path = fs.from_os_path(path)

try:
fs_path = fs.info(path)["name"]
fs_path = fs.info(fs_path)["name"]
except FileNotFoundError:
return {}

Expand Down
10 changes: 7 additions & 3 deletions dvc/repo/metrics/show.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,11 @@ def _to_fs_paths(metrics: List[Output]) -> StrPaths:
result = []
for out in metrics:
if out.metric:
result.append(out.fs_path)
result.append(out.repo.repo_fs.from_os_path(out.fs_path))
elif out.live:
fs_path = summary_fs_path(out)
if fs_path:
result.append(fs_path)
result.append(out.repo.repo_fs.from_os_path(fs_path))
return result


Expand Down Expand Up @@ -78,12 +78,16 @@ def _read_metric(path, fs, rev, **kwargs):
def _read_metrics(repo, metrics, rev, onerror=None):
fs = RepoFileSystem(repo=repo)

relpath = ""
if repo.root_dir != repo.fs.path.getcwd():
relpath = repo.fs.path.relpath(repo.root_dir, repo.fs.path.getcwd())

res = {}
for metric in metrics:
if not fs.isfile(metric):
continue

res[os.path.relpath(metric, os.getcwd())] = _read_metric(
res[os.path.join(relpath, *fs.path.parts(metric))] = _read_metric(
metric, fs, rev, onerror=onerror
)

Expand Down
5 changes: 4 additions & 1 deletion dvc/repo/plots/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,10 @@ def _collect_plots(
recursive=recursive,
)

result = {plot.fs_path: _plot_props(plot) for plot in plots}
result = {
repo.repo_fs.from_os_path(plot.fs_path): _plot_props(plot)
for plot in plots
}
result.update({fs_path: {} for fs_path in fs_paths})
return result

Expand Down
1 change: 1 addition & 0 deletions tests/unit/fs/test_dvc.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
[
("", ("repo",)),
(".", ("repo",)),
("/", ("repo",)),
("foo", ("repo", "foo")),
("dir/foo", ("repo", "dir", "foo")),
],
Expand Down
24 changes: 12 additions & 12 deletions tests/unit/fs/test_repo.py
Original file line number Diff line number Diff line change
Expand Up @@ -474,16 +474,16 @@ def test_repo_fs_no_subrepos(tmp_dir, dvc, scm):
dvc._reset()
fs = RepoFileSystem(repo=dvc)
expected = [
".dvcignore",
".gitignore",
"lorem",
"lorem.dvc",
"dir",
"dir/repo.txt",
"/.dvcignore",
"/.gitignore",
"/lorem",
"/lorem.dvc",
"/dir",
"/dir/repo.txt",
]

actual = []
for root, dirs, files in fs.walk("", dvcfiles=True):
for root, dirs, files in fs.walk("/", dvcfiles=True):
for entry in dirs + files:
actual.append(posixpath.join(root, entry))

Expand Down Expand Up @@ -657,9 +657,9 @@ def dvc_structure(suffix):

if traverse_subrepos or repo_dir == tmp_dir:
repo_dir_path = (
repo_dir.relative_to(tmp_dir).as_posix()
"/" + repo_dir.relative_to(tmp_dir).as_posix()
if repo_dir != tmp_dir
else ""
else "/"
)
expected[repo_dir_path] = set(
scm_files.keys() | dvc_files.keys() | extras
Expand All @@ -675,13 +675,13 @@ def dvc_structure(suffix):

if traverse_subrepos:
# update subrepos
expected[""].update(["subrepo1", "subrepo2"])
expected["subrepo1"].add("subrepo3")
expected["/"].update(["subrepo1", "subrepo2"])
expected["/subrepo1"].add("subrepo3")

actual = {}
fs = RepoFileSystem(repo=dvc)
for root, dirs, files in fs.walk(
"", ignore_subrepos=not traverse_subrepos
"/", ignore_subrepos=not traverse_subrepos
):
actual[root] = set(dirs + files)
assert expected == actual
7 changes: 0 additions & 7 deletions tests/unit/fs/test_repo_info.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,10 +137,3 @@ def test_info_on_subrepos(make_tmp_dir, tmp_dir, dvc, scm, repo_fs):
assert info["repo"].root_dir == str(
subrepo
), f"repo root didn't match for {path}"

# supports external outputs on top-level DVC repo
external_dir = make_tmp_dir("external-output")
external_dir.gen("bar", "bar")
dvc.add(str(external_dir / "bar"), external=True)
info = repo_fs.info((external_dir / "bar").fs_path)
assert info["repo"].root_dir == str(tmp_dir)

0 comments on commit b14a972

Please sign in to comment.