Skip to content

Commit

Permalink
fs.get: use async implementation when supported by the filesystem
Browse files Browse the repository at this point in the history
  • Loading branch information
skshetry authored and efiop committed May 5, 2022
1 parent 35d2963 commit 54dd3ae
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 53 deletions.
48 changes: 28 additions & 20 deletions dvc/fs/_callback.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@

from typing_extensions import ParamSpec

from dvc.fs.base import FileSystem
from dvc.progress import Tqdm
from dvc.ui._rich_progress import RichTransferProgress

Expand All @@ -34,26 +33,18 @@ def wrapped(*args: "_P.args", **kwargs: "_P.kwargs") -> "_R":

return wrapped

def wrap_and_branch(
self, fn: "Callable", fs: "FileSystem" = None
) -> "Callable":
def wrap_and_branch(self, fn: "Callable") -> "Callable":
"""
Wraps a function and passes a new child callback to it.
When the function completes, we increment the parent callback by 1.
"""
from .local import localfs

fs = fs or localfs
wrapped = self.wrap_fn(fn)

def make_callback(path1, path2):
# pylint: disable=assignment-from-none
return self.branch(fs.path.name(path1), path2, {})

@wraps(fn)
def func(path1, path2, **kwargs):
with make_callback(path1, path2) as callback:
return wrapped(path1, path2, callback=callback, **kwargs)
def func(path1: str, path2: str):
kw: Dict[str, Any] = {}
with self.branch(path1, path2, kw):
return wrapped(path1, path2, **kw)

return func

Expand Down Expand Up @@ -89,9 +80,14 @@ def as_rich_callback(
return callback or RichCallback(**rich_kwargs)

def branch(
self, path_1: str, path_2: str, kwargs: Dict[str, Any]
self,
path_1: str,
path_2: str,
kwargs: Dict[str, Any],
child: "FsspecCallback" = None,
) -> "FsspecCallback":
return DEFAULT_CALLBACK
child = kwargs["callback"] = child or DEFAULT_CALLBACK
return child


class NoOpCallback(FsspecCallback, fsspec.callbacks.NoOpCallback):
Expand Down Expand Up @@ -126,8 +122,15 @@ def absolute_update(self, value):
self.progress_bar.update_to(value)
super().absolute_update(value)

def branch(self, path_1: str, path_2: str, kwargs):
return TqdmCallback(bytes=True, total=-1, desc=path_1, **kwargs)
def branch(
self,
path_1: str,
path_2: str,
kwargs,
child: Optional[FsspecCallback] = None,
):
child = child or TqdmCallback(bytes=True, total=-1, desc=path_1)
return super().branch(path_1, path_2, kwargs, child=child)


class RichCallback(FsspecCallback):
Expand Down Expand Up @@ -187,8 +190,13 @@ def absolute_update(self, value: int) -> None:
self.progress.update(self.task, completed=value)
super().absolute_update(value)

def branch(self, path_1, path_2, kwargs):
return RichCallback(self.progress, desc=path_1, bytes=True, total=-1)
def branch(
self, path_1, path_2, kwargs, child: Optional[FsspecCallback] = None
):
child = child or RichCallback(
self.progress, desc=path_1, bytes=True, total=-1
)
return super().branch(path_1, path_2, kwargs, child=child)


DEFAULT_CALLBACK = NoOpCallback()
75 changes: 42 additions & 33 deletions dvc/fs/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -414,49 +414,58 @@ def du(

def get(
self,
from_info: AnyFSPath,
to_info: AnyFSPath,
from_info: Union[AnyFSPath, List[AnyFSPath]],
to_info: Union[AnyFSPath, List[AnyFSPath]],
callback: "FsspecCallback" = DEFAULT_CALLBACK,
recursive: bool = False, # pylint: disable=unused-argument
batch_size: int = None,
):
) -> None:
# Currently, the implementation is non-recursive if the paths are
# provided as a list, and recursive if it's a single path.
from .local import localfs

def get_file(from_info, to_info):
localfs.makedirs(localfs.path.parent(to_info), exist_ok=True)
branch = callback.branch(from_info, to_info, {})
self.get_file(from_info, to_info, callback=branch)
callback.relative_update()
def get_file(rpath, lpath, **kwargs):
localfs.makedirs(localfs.path.parent(lpath), exist_ok=True)
self.fs.get_file(rpath, lpath, **kwargs)

if not self.isdir(from_info):
callback.set_size(1)
return get_file(from_info, to_info)
get_file = callback.wrap_and_branch(get_file)

pairs = {
info: localfs.path.join(
to_info, *self.path.relparts(info, from_info)
)
for info in self.find(from_info)
}
if not pairs:
localfs.makedirs(to_info, exist_ok=True)
return
if isinstance(from_info, list) and isinstance(to_info, list):
from_infos: List[AnyFSPath] = from_info
to_infos: List[AnyFSPath] = to_info
else:
assert isinstance(from_info, str)
assert isinstance(to_info, str)

if not self.isdir(from_info):
callback.set_size(1)
return get_file(from_info, to_info)

callback.set_size(len(pairs))
max_workers = batch_size or self.jobs
# NOTE: unlike pulling/fetching cache, where we need to
# download everything we can, not raising an error here might
# turn very ugly, as users might think that they have
# downloaded a complete directory while actually having a partial
# one, which might cause unexpected results in their pipeline.
with ThreadPoolExecutor(
max_workers=max_workers, cancel_on_error=True
) as executor:
list(
executor.imap_unordered(
lambda args: get_file(*args), pairs.items()
from_infos = list(self.find(from_info))
if not from_infos:
return localfs.makedirs(to_info, exist_ok=True)

to_infos = [
localfs.path.join(
to_info, *self.path.relparts(info, from_info)
)
for info in from_infos
]

jobs = batch_size or self.jobs
if self.fs.async_impl:
return self.fs.get(
from_infos,
to_infos,
callback=callback,
batch_size=jobs,
)

callback.set_size(len(from_infos))
executor = ThreadPoolExecutor(max_workers=jobs, cancel_on_error=True)
with executor:
list(executor.imap_unordered(get_file, from_infos, to_infos))

def ukey(self, path: AnyFSPath) -> str:
return self.fs.ukey(path)

Expand Down

0 comments on commit 54dd3ae

Please sign in to comment.