Skip to content

Commit

Permalink
Merge branch 'main' into stage/options
Browse files Browse the repository at this point in the history
  • Loading branch information
jorgeorpinel committed Apr 8, 2022
2 parents 57e8963 + 7804e65 commit 32c8e24
Show file tree
Hide file tree
Showing 24 changed files with 232 additions and 190 deletions.
3 changes: 2 additions & 1 deletion .github/workflows/tests.yaml
Expand Up @@ -30,6 +30,7 @@ jobs:
- name: Install requirements
run: |
pip install --upgrade pip setuptools wheel
pip install collective.checkdocs==0.2
pip install ".[dev]" pre-commit
- name: Check README
run: python setup.py checkdocs
Expand Down Expand Up @@ -74,7 +75,7 @@ jobs:
--cov-report=xml --cov-report=term
${{ env.extra_test_args }}
- name: upload coverage report
uses: codecov/codecov-action@v2.1.0
uses: codecov/codecov-action@v3
with:
file: ./coverage.xml
fail_ci_if_error: false
Expand Down
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
Expand Up @@ -6,7 +6,7 @@ repos:
- id: black
language_version: python3
repo: https://github.com/ambv/black
rev: 22.1.0
rev: 22.3.0
- repo: https://github.com/pre-commit/pre-commit-hooks
rev: v4.1.0
hooks:
Expand Down
4 changes: 4 additions & 0 deletions dvc/api.py
Expand Up @@ -28,6 +28,10 @@ def get_url(path, repo=None, rev=None, remote=None):

cloud = info["repo"].cloud
dvc_path = _repo.fs.path.relpath(fs_path, info["repo"].root_dir)

if not os.path.isabs(path):
dvc_path = dvc_path.replace("\\", "/")

md5 = info["repo"].dvcfs.info(dvc_path)["md5"]
return cloud.get_url_for(remote, checksum=md5)

Expand Down
13 changes: 12 additions & 1 deletion dvc/cli/__init__.py
@@ -1,6 +1,7 @@
"""This module provides an entrypoint to the dvc cli and parsing utils."""

import logging
import sys

# Workaround for CPython bug. See [1] and [2] for more info.
# [1] https://github.com/aws/aws-cli/blob/1.16.277/awscli/clidriver.py#L55
Expand Down Expand Up @@ -48,6 +49,16 @@ def main(argv=None): # noqa: C901
from dvc.exceptions import DvcException, NotDvcRepoError
from dvc.logger import FOOTER, disable_other_loggers

# NOTE: stderr/stdout may be closed if we are running from dvc.daemon.
# On Linux we directly call cli.main after double forking and closing
# the copied parent's standard file descriptors. If we make any logging
# calls in this state it will cause an exception due to writing to a closed
# file descriptor.
if sys.stderr.closed: # pylint: disable=using-constant-test
logging.disable()
elif sys.stdout.closed: # pylint: disable=using-constant-test
logging.disable(logging.INFO)

args = None
disable_other_loggers()

Expand All @@ -68,7 +79,7 @@ def main(argv=None): # noqa: C901

logger.trace(args)

if not args.quiet:
if not sys.stdout.closed and not args.quiet:
from dvc.ui import ui

ui.enable()
Expand Down
11 changes: 6 additions & 5 deletions dvc/commands/experiments/show.py
Expand Up @@ -426,11 +426,12 @@ def show_experiments(
subset=subset,
)
td.drop_duplicates("rows", subset=subset)
td.column("Experiment")[:] = [
# remove tree characters
str(x).encode("ascii", "ignore").strip().decode()
for x in td.column("Experiment")
]
if "Experiment" in td:
td.column("Experiment")[:] = [
# remove tree characters
str(x).encode("ascii", "ignore").strip().decode()
for x in td.column("Experiment")
]
out = kwargs.get("out") or "dvc_plots"
output_file = os.path.join(out, "index.html")
ui.write(
Expand Down
3 changes: 1 addition & 2 deletions dvc/commands/plots.py
Expand Up @@ -21,8 +21,7 @@ def _show_json(renderers, split=False):
result = {
renderer.name: to_json(renderer, split) for renderer in renderers
}
if result:
ui.write_json(result)
ui.write_json(result)


class CmdPlots(CmdBase):
Expand Down
4 changes: 2 additions & 2 deletions dvc/commands/remote.py
Expand Up @@ -113,8 +113,8 @@ def run(self):
class CmdRemoteList(CmdRemote):
def run(self):
conf = self.config.read(self.args.level)
for name, conf_val in conf["remote"].items():
ui.write(name, conf_val["url"], sep="\t")
for name, remote_conf in conf["remote"].items():
ui.write(name, remote_conf["url"], sep="\t")
return 0


Expand Down
5 changes: 3 additions & 2 deletions dvc/daemon.py
Expand Up @@ -106,8 +106,9 @@ def daemon(args):
cmd = ["daemon", "-q"] + args

env = fix_env()
file_path = os.path.abspath(inspect.stack()[0][1])
env["PYTHONPATH"] = os.path.dirname(os.path.dirname(file_path))
if not is_binary():
file_path = os.path.abspath(inspect.stack()[0][1])
env["PYTHONPATH"] = os.path.dirname(os.path.dirname(file_path))
env[DVC_DAEMON] = "1"

_spawn(cmd, env)
25 changes: 25 additions & 0 deletions dvc/data/tree.py
Expand Up @@ -19,6 +19,10 @@
logger = logging.getLogger(__name__)


class TreeError(Exception):
    """Raised by ``ls`` for a directory entry whose meta has no ``obj``."""


def _try_load(
odbs: Iterable["ObjectDB"],
hash_info: "HashInfo",
Expand Down Expand Up @@ -197,6 +201,27 @@ def get(self, odb, prefix: Tuple[str]) -> Optional[HashFile]:
tree.digest()
return tree

def ls(self, prefix=None):
    """List the names of the entries directly below ``prefix``.

    Args:
        prefix: key of the node to list (an indexable sequence of key
            parts). ``None`` or an empty key lists the top level.

    Returns:
        A list holding the last key component of every direct child of
        ``prefix``.

    Raises:
        TreeError: if the entry stored at ``prefix`` is a directory whose
            meta has no ``obj`` attached.
    """
    # The declared default has to behave like the root key: ``None`` never
    # equals a node key in ``node_factory`` below, so the root node would be
    # treated as a child and ``key[-1]`` evaluated on an empty key.
    prefix = prefix or ()

    # Only a non-empty prefix is forwarded to ``traverse``.
    # NOTE(review): a missing prefix presumably surfaces as KeyError from
    # the trie -- the filesystem caller catches KeyError; confirm.
    kwargs = {"prefix": prefix} if prefix else {}

    meta, hash_info = self._trie.get(prefix, (None, None))
    if hash_info and hash_info.isdir and meta and not meta.obj:
        raise TreeError

    ret = []

    def node_factory(_, key, children, *args):
        if key == prefix:
            # Consume the children of the listed node so the factory is
            # invoked for each of them; the resulting list is not needed.
            list(children)
        else:
            # The children of a child are deliberately left unconsumed,
            # which keeps the listing one level deep.
            ret.append(key[-1])

    self._trie.traverse(node_factory, **kwargs)

    return ret


def du(odb, tree):
try:
Expand Down
4 changes: 2 additions & 2 deletions dvc/dependency/repo.py
Expand Up @@ -99,7 +99,7 @@ def _get_used_and_obj(
) -> Tuple[Dict[Optional["ObjectDB"], Set["HashInfo"]], "HashFile"]:
from dvc.config import NoRemoteError
from dvc.data.stage import stage
from dvc.data.tree import Tree
from dvc.data.tree import Tree, TreeError
from dvc.exceptions import NoOutputOrStageError, PathMissingError

local_odb = self.repo.odb.local
Expand Down Expand Up @@ -136,7 +136,7 @@ def _get_used_and_obj(
repo.repo_fs,
local_odb.fs.PARAM_CHECKSUM,
)
except FileNotFoundError as exc:
except (FileNotFoundError, TreeError) as exc:
raise PathMissingError(
self.def_path, self.def_repo[self.PARAM_URL]
) from exc
Expand Down
7 changes: 6 additions & 1 deletion dvc/fs/__init__.py
Expand Up @@ -114,7 +114,12 @@ def get_cloud_fs(repo, **kwargs):
remote_conf["gdrive_credentials_tmp_dir"] = repo.tmp_dir

url = remote_conf.pop("url")
fs_path = cls._strip_protocol(url) # pylint:disable=protected-access
if issubclass(cls, WebDAVFileSystem):
# For WebDAVFileSystem, the provided URL is itself the base path, so the
# filesystem path is simply the root marker.
fs_path = cls.root_marker
else:
fs_path = cls._strip_protocol(url) # pylint:disable=protected-access

extras = cls._get_kwargs_from_urls(url) # pylint:disable=protected-access
conf = {**extras, **remote_conf} # remote config takes priority
Expand Down
132 changes: 49 additions & 83 deletions dvc/fs/dvc.py
@@ -1,28 +1,27 @@
import logging
import os
import threading
import typing

from fsspec import AbstractFileSystem
from funcy import cached_property, wrap_prop

from ._callback import DEFAULT_CALLBACK
from .base import FileSystem
from .fsspec_wrapper import FSSpecWrapper

if typing.TYPE_CHECKING:
from dvc.types import AnyPath

logger = logging.getLogger(__name__)


class DvcFileSystem(FileSystem): # pylint:disable=abstract-method
class _DvcFileSystem(AbstractFileSystem): # pylint:disable=abstract-method
"""DVC repo fs.
Args:
repo: DVC repo.
"""

sep = os.sep

scheme = "local"
PARAM_CHECKSUM = "md5"

def __init__(self, **kwargs):
super().__init__(**kwargs)
self.repo = kwargs["repo"]
Expand All @@ -43,8 +42,8 @@ def _get_key(self, path):
return (cls.scheme, *fs.path.parts(fs_path))

fs_key = "repo"
key = self.path.parts(path)
if key == (".",) or key == ("",):
key = path.split(self.sep)
if key == ["."] or key == [""]:
key = ()

return (fs_key, *key)
Expand Down Expand Up @@ -74,81 +73,28 @@ def _get_fs_path(self, path: "AnyPath", remote=None):

def open( # type: ignore
self, path: str, mode="r", encoding=None, **kwargs
): # pylint: disable=arguments-renamed
): # pylint: disable=arguments-renamed, arguments-differ
fs, fspath = self._get_fs_path(path, **kwargs)
return fs.open(fspath, mode=mode, encoding=encoding)

def exists(self, path): # pylint: disable=arguments-renamed
try:
self.info(path)
return True
except FileNotFoundError:
return False

def isdir(self, path): # pylint: disable=arguments-renamed
try:
return self.info(path)["type"] == "directory"
except FileNotFoundError:
return False

def isfile(self, path): # pylint: disable=arguments-renamed
try:
return self.info(path)["type"] == "file"
except FileNotFoundError:
return False

def _walk(self, root, topdown=True, **kwargs):
dirs = set()
files = []

root_parts = self._get_key(root)
root_len = len(root_parts)
try:
for key, (meta, hash_info) in self.repo.index.tree.iteritems(
prefix=root_parts
): # noqa: B301
if hash_info and hash_info.isdir and meta and not meta.obj:
raise FileNotFoundError

if key == root_parts:
continue

if hash_info.isdir:
continue

name = key[root_len]
if len(key) > root_len + 1:
dirs.add(name)
continue

files.append(name)
except KeyError:
pass

assert topdown
dirs = list(dirs)
yield root, dirs, files

for dname in dirs:
yield from self._walk(self.path.join(root, dname))
def ls(self, path, detail=True, **kwargs):
info = self.info(path)
if info["type"] != "directory":
return [info] if detail else [path]

def walk(self, top, topdown=True, **kwargs):
assert topdown
root_key = self._get_key(path)
try:
info = self.info(top)
except FileNotFoundError:
return

if info["type"] != "directory":
return
entries = [
self.sep.join((path, name)) if path else name
for name in self.repo.index.tree.ls(prefix=root_key)
]
except KeyError as exc:
raise FileNotFoundError from exc

yield from self._walk(top, topdown=topdown, **kwargs)
if not detail:
return entries

def find(self, path, prefix=None):
for root, _, files in self.walk(path):
for fname in files:
# NOTE: os.path.join is ~5.5 times slower
yield f"{root}{os.sep}{fname}"
return [self.info(epath) for epath in entries]

def isdvc(self, path, recursive=False, strict=True):
try:
Expand All @@ -159,7 +105,7 @@ def isdvc(self, path, recursive=False, strict=True):
recurse = recursive or not strict
return bool(info.get("outs") if recurse else info.get("isout"))

def info(self, path):
def info(self, path, **kwargs):
from dvc.data.meta import Meta

key = self._get_key(path)
Expand All @@ -175,6 +121,7 @@ def info(self, path):
"isexec": False,
"isdvc": False,
"outs": outs,
"name": path,
}

if len(outs) > 1 and outs[0][0] != key:
Expand Down Expand Up @@ -206,12 +153,10 @@ def info(self, path):
ret["type"] = "directory"
return ret

def get_file(
self, from_info, to_file, callback=DEFAULT_CALLBACK, **kwargs
):
fs, path = self._get_fs_path(from_info)
def get_file(self, rpath, lpath, callback=DEFAULT_CALLBACK, **kwargs):
fs, path = self._get_fs_path(rpath)
fs.get_file( # pylint: disable=protected-access
path, to_file, callback=callback, **kwargs
path, lpath, callback=callback, **kwargs
)

def checksum(self, path):
Expand All @@ -220,3 +165,24 @@ def checksum(self, path):
if md5:
return md5
raise NotImplementedError


class DvcFileSystem(FSSpecWrapper):
    """``FSSpecWrapper`` facade over the fsspec-based ``_DvcFileSystem``."""

    scheme = "local"

    PARAM_CHECKSUM = "md5"

    def _prepare_credentials(self, **config):
        """Return the received config keyword arguments unchanged."""
        return config

    @wrap_prop(threading.Lock())
    @cached_property
    def fs(self):
        """Wrapped ``_DvcFileSystem`` instance built from ``self.fs_args``.

        The property is cached and its access is wrapped with a lock.
        """
        underlying = _DvcFileSystem(**self.fs_args)
        return underlying

    def isdvc(self, path, **kwargs):
        """Forward the ``isdvc`` query to the wrapped filesystem."""
        wrapped = self.fs
        return wrapped.isdvc(path, **kwargs)

    @property
    def repo(self):
        """The ``repo`` attribute of the wrapped filesystem."""
        wrapped = self.fs
        return wrapped.repo

0 comments on commit 32c8e24

Please sign in to comment.