Skip to content

Commit

Permalink
dvc.objects: move hashing functions to dvc.objects
Browse files Browse the repository at this point in the history
This commit moves `HashedStreamReader`/`get_hash_file`/`file_md5`
into `dvc.objects.hash`.

The `get_hash_file` has been renamed to `hash_file`.
This also copies `relpath` and `is_exec` to the `dvc.fs.utils`.
  • Loading branch information
skshetry authored and efiop committed May 12, 2022
1 parent 7a795f7 commit 3f5757a
Show file tree
Hide file tree
Showing 25 changed files with 257 additions and 167 deletions.
59 changes: 5 additions & 54 deletions dvc/data/stage.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@
from dvc.exceptions import DvcIgnoreInCollectedDirError
from dvc.ignore import DvcIgnore
from dvc.objects.file import HashFile
from dvc.objects.hash import hash_file
from dvc.objects.hash_info import HashInfo
from dvc.objects.meta import Meta
from dvc.progress import Tqdm
from dvc.utils import file_md5, is_exec

from .db.reference import ReferenceObjectDB

Expand All @@ -32,7 +32,8 @@
def _upload_file(from_fs_path, fs, odb, upload_odb, callback=None):
from dvc.fs._callback import FsspecCallback
from dvc.utils import tmp_fname
from dvc.utils.stream import HashedStreamReader

from .stream import HashedStreamReader

fs_path = upload_odb.fs.path
tmp_info = fs_path.join(upload_odb.fs_path, tmp_fname())
Expand All @@ -52,59 +53,9 @@ def _upload_file(from_fs_path, fs, odb, upload_odb, callback=None):
return from_fs_path, meta, odb.get(stream.hash_info)


def _adapt_info(info, scheme):
if scheme == "s3" and "ETag" in info:
info["etag"] = info["ETag"].strip('"')
elif scheme == "gs" and "etag" in info:
import base64

info["etag"] = base64.b64decode(info["etag"]).hex()
elif scheme.startswith("http") and (
"ETag" in info or "Content-MD5" in info
):
info["checksum"] = info.get("ETag") or info.get("Content-MD5")
return info


def _get_file_hash(fs_path, fs, name):
info = _adapt_info(fs.info(fs_path), fs.scheme)

if name in info:
assert not info[name].endswith(".dir")
hash_value = info[name]
elif hasattr(fs, name):
func = getattr(fs, name)
hash_value = func(fs_path)
elif name == "md5":
hash_value = file_md5(fs_path, fs)
else:
raise NotImplementedError

meta = Meta(size=info["size"], isexec=is_exec(info.get("mode", 0)))
hash_info = HashInfo(name, hash_value)
return meta, hash_info


def get_file_hash(fs_path, fs, name, state=None):
if state:
meta, hash_info = state.get( # pylint: disable=assignment-from-none
fs_path, fs
)
if hash_info:
return meta, hash_info

meta, hash_info = _get_file_hash(fs_path, fs, name)

if state:
assert ".dir" not in hash_info.value
state.save(fs_path, fs, hash_info)

return meta, hash_info


def _stage_file(fs_path, fs, name, odb=None, upload_odb=None, dry_run=False):
state = odb.state if odb else None
meta, hash_info = get_file_hash(fs_path, fs, name, state=state)
meta, hash_info = hash_file(fs_path, fs, name, state=state)
if upload_odb and not dry_run:
assert odb and name == "md5"
return _upload_file(fs_path, fs, odb, upload_odb)
Expand Down Expand Up @@ -346,7 +297,7 @@ def _stage_external_tree_info(odb, tree, name):

odb.add(tree.fs_path, tree.fs, tree.hash_info)
raw = odb.get(tree.hash_info)
_, hash_info = get_file_hash(raw.fs_path, raw.fs, name, state=odb.state)
_, hash_info = hash_file(raw.fs_path, raw.fs, name, state=odb.state)
tree.fs_path = raw.fs_path
tree.fs = raw.fs
tree.hash_info.name = hash_info.name
Expand Down
2 changes: 1 addition & 1 deletion dvc/utils/stream.py → dvc/data/stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@

from funcy import cached_property

from dvc.objects.hash import dos2unix
from dvc.objects.hash_info import HashInfo
from dvc.objects.istextfile import DEFAULT_CHUNK_SIZE, istextblock
from dvc.utils import dos2unix


class HashedStreamReader(io.IOBase):
Expand Down
5 changes: 2 additions & 3 deletions dvc/data/tree.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,7 @@

from dvc.objects.errors import ObjectFormatError
from dvc.objects.file import HashFile

from .stage import get_file_hash
from dvc.objects.hash import hash_file

if TYPE_CHECKING:
from dvc.objects.db import ObjectDB
Expand Down Expand Up @@ -67,7 +66,7 @@ def digest(self, hash_info: Optional["HashInfo"] = None):
if hash_info:
self.hash_info = hash_info
else:
_, self.hash_info = get_file_hash(fs_path, memfs, "md5")
_, self.hash_info = hash_file(fs_path, memfs, "md5")
assert self.hash_info.value
self.hash_info.value += ".dir"

Expand Down
18 changes: 14 additions & 4 deletions dvc/fs/_callback.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
from contextlib import ExitStack
from functools import wraps
from typing import IO, TYPE_CHECKING, Any, Dict, Optional, TypeVar, cast
from typing import TYPE_CHECKING, Any, Dict, Optional, TypeVar, overload

import fsspec
from funcy import cached_property

if TYPE_CHECKING:
from typing import Callable
from typing import BinaryIO, Callable, TextIO, Union

from typing_extensions import ParamSpec

Expand All @@ -20,11 +20,21 @@
class FsspecCallback(fsspec.Callback):
"""FsspecCallback usable as a context manager, and a few helper methods."""

def wrap_attr(self, fobj: IO, method: str = "read") -> IO:
@overload
def wrap_attr(self, fobj: "BinaryIO", method: str = "read") -> "BinaryIO":
...

@overload
def wrap_attr(self, fobj: "TextIO", method: str = "read") -> "TextIO":
...

def wrap_attr(
self, fobj: "Union[TextIO, BinaryIO]", method: str = "read"
) -> "Union[TextIO, BinaryIO]":
from tqdm.utils import CallbackIOWrapper

wrapped = CallbackIOWrapper(self.relative_update, fobj, method)
return cast(IO, wrapped)
return wrapped

def wrap_fn(self, fn: "Callable[_P, _R]") -> "Callable[_P, _R]":
@wraps(fn)
Expand Down
28 changes: 24 additions & 4 deletions dvc/fs/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
from ._callback import DEFAULT_CALLBACK, FsspecCallback

if TYPE_CHECKING:
from typing import BinaryIO, TextIO

from fsspec.spec import AbstractFileSystem
from typing_extensions import Literal

Expand Down Expand Up @@ -179,12 +181,30 @@ def is_empty(self, path: AnyFSPath) -> bool:
return not self.fs.ls(path)
return entry["size"] == 0

@overload
def open(
self,
path: AnyFSPath,
mode: "Literal['rb', 'br', 'wb']",
**kwargs: Any,
) -> "BinaryIO": # pylint: disable=arguments-differ
return self.open(path, mode, **kwargs)

@overload
def open(
self,
path: AnyFSPath,
mode: "Literal['r', 'rt', 'w']",
**kwargs: Any,
) -> "TextIO": # pylint: disable=arguments-differ
...

def open(
self,
path: AnyFSPath,
mode: str = "r",
**kwargs,
) -> "IO": # pylint: disable=arguments-differ
**kwargs: Any,
) -> "IO[Any]": # pylint: disable=arguments-differ
if "b" in mode:
kwargs.pop("encoding", None)
return self.fs.open(path, mode=mode, **kwargs)
Expand Down Expand Up @@ -354,7 +374,7 @@ def makedirs(self, path: AnyFSPath, **kwargs: Any) -> None:

def put_file(
self,
from_file: Union[AnyFSPath, IO],
from_file: Union[AnyFSPath, "BinaryIO"],
to_info: AnyFSPath,
callback: FsspecCallback = DEFAULT_CALLBACK,
size: int = None,
Expand All @@ -363,7 +383,7 @@ def put_file(
if size:
callback.set_size(size)
if hasattr(from_file, "read"):
stream = callback.wrap_attr(cast("IO", from_file))
stream = callback.wrap_attr(cast("BinaryIO", from_file))
self.upload_fobj(stream, to_info, size=size)
else:
assert isinstance(from_file, str)
Expand Down
2 changes: 1 addition & 1 deletion dvc/fs/dvc.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ def _ls(fs, path):


def _merge_info(repo, fs_info, dvc_info):
from dvc.utils import is_exec
from dvc.fs.utils import is_exec

ret = {"repo": repo}

Expand Down
4 changes: 2 additions & 2 deletions dvc/fs/http.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import threading
from typing import IO, Union
from typing import BinaryIO, Union

from funcy import cached_property, memoize, wrap_with

Expand Down Expand Up @@ -141,7 +141,7 @@ def unstrip_protocol(self, path: str) -> str:

def put_file(
self,
from_file: Union[AnyFSPath, IO],
from_file: Union[AnyFSPath, BinaryIO],
to_info: AnyFSPath,
callback: FsspecCallback = DEFAULT_CALLBACK,
size: int = None,
Expand Down
31 changes: 31 additions & 0 deletions dvc/fs/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
import os
import stat
from typing import TYPE_CHECKING

if TYPE_CHECKING:
from .base import AnyFSPath


def is_exec(mode: int) -> bool:
return bool(mode & (stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH))


def relpath(path: "AnyFSPath", start: "AnyFSPath" = os.curdir) -> "AnyFSPath":
path = os.fspath(path)
start = os.path.abspath(os.fspath(start))

# Windows path on different drive than curdir doesn't have relpath
if os.name == "nt":
# Since python 3.8 os.realpath resolves network shares to their UNC
# path. So, to be certain that relative paths correctly captured,
# we need to resolve to UNC path first. We resolve only the drive
# name so that we don't follow any 'real' symlinks on the path
def resolve_network_drive_windows(path_to_resolve):
drive, tail = os.path.splitdrive(path_to_resolve)
return os.path.join(os.path.realpath(drive), tail)

path = resolve_network_drive_windows(os.path.abspath(path))
start = resolve_network_drive_windows(start)
if not os.path.commonprefix([start, path]):
return path
return os.path.relpath(path, start)
2 changes: 1 addition & 1 deletion dvc/objects/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,5 +57,5 @@ def __init__(
) -> None:
settings.setdefault("disk_pickle_protocol", 4)
super().__init__(
directory=directory, timeout=timeout, disk=Disk, **settings
directory=directory, timeout=timeout, disk=disk, **settings
)
6 changes: 3 additions & 3 deletions dvc/objects/file.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,10 @@ def check(self, odb: "ObjectDB", check_hash: bool = True):
self._check_hash(odb)

def _check_hash(self, odb):
from dvc.data.stage import get_file_hash
from dvc.objects.errors import ObjectFormatError
from .errors import ObjectFormatError
from .hash import hash_file

_, actual = get_file_hash(
_, actual = hash_file(
self.fs_path, self.fs, self.hash_info.name, odb.state
)

Expand Down

0 comments on commit 3f5757a

Please sign in to comment.