Skip to content

Commit

Permalink
Merge pull request #2436 from casperdcl/ui-tidy
Browse files Browse the repository at this point in the history
progressbar cleanup
  • Loading branch information
efiop committed Aug 29, 2019
2 parents 5d05e9a + 6c3029d commit 1ed3645
Show file tree
Hide file tree
Showing 12 changed files with 108 additions and 82 deletions.
90 changes: 57 additions & 33 deletions dvc/progress.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@
from __future__ import print_function
import logging
from tqdm import tqdm
from copy import deepcopy
from concurrent.futures import ThreadPoolExecutor
from funcy import merge

logger = logging.getLogger(__name__)


class TqdmThreadPoolExecutor(ThreadPoolExecutor):
Expand Down Expand Up @@ -33,62 +35,84 @@ class Tqdm(tqdm):
maximum-compatibility tqdm-based progressbars
"""

BAR_FMT_DEFAULT = (
"{percentage:3.0f}%|{bar:10}|"
"{desc:{ncols_desc}.{ncols_desc}}{n}/{total}"
" [{elapsed}<{remaining}, {rate_fmt:>11}{postfix}]"
)
BAR_FMT_NOTOTAL = (
"{desc:{ncols_desc}.{ncols_desc}}{n}"
" [{elapsed}<??:??, {rate_fmt:>11}{postfix}]"
)

def __init__(
self,
iterable=None,
disable=None,
level=logging.ERROR,
desc=None,
leave=False,
bar_format=None,
bytes=False, # pylint: disable=W0622
desc_truncate=None,
leave=None,
**kwargs
):
"""
bytes : shortcut for
`unit='B', unit_scale=True, unit_divisor=1024, miniters=1`
desc_truncate : like `desc` but will truncate to 10 chars
desc : persists after `close()`
level : effective logging level for determining `disable`;
used only if `disable` is unspecified
kwargs : anything accepted by `tqdm.tqdm()`
"""
kwargs = deepcopy(kwargs)
kwargs = kwargs.copy()
kwargs.setdefault("unit_scale", True)
if bytes:
for k, v in dict(
bytes_defaults = dict(
unit="B", unit_scale=True, unit_divisor=1024, miniters=1
).items():
kwargs.setdefault(k, v)
if desc_truncate is not None:
kwargs.setdefault("desc", self.truncate(desc_truncate))
if disable is None:
disable = (
logging.getLogger(__name__).getEffectiveLevel()
>= logging.CRITICAL
)
kwargs = merge(bytes_defaults, kwargs)
self.desc_persist = desc
if disable is None:
disable = logger.getEffectiveLevel() > level
super(Tqdm, self).__init__(
iterable=iterable, disable=disable, leave=leave, **kwargs
iterable=iterable,
disable=disable,
leave=leave,
desc=desc,
bar_format="!",
**kwargs
)
if bar_format is None:
if self.__len__():
self.bar_format = self.BAR_FMT_DEFAULT
else:
self.bar_format = self.BAR_FMT_NOTOTAL
else:
self.bar_format = bar_format
self.refresh()

def update_desc(self, desc, n=1, truncate=True):
def update_desc(self, desc, n=1):
"""
Calls `set_description(truncate(desc))` and `update(n)`
Calls `set_description_str(desc)` and `update(n)`
"""
self.set_description(
self.truncate(desc) if truncate else desc, refresh=False
)
self.set_description_str(desc, refresh=False)
self.update(n)

def update_to(self, current, total=None):
    """
    Set the counter to the absolute position `current`
    (tqdm's own `update()` is relative), optionally refreshing `total`.
    Suitable as a progress callback for APIs that report cumulative bytes.
    """
    if total:
        self.total = total  # pylint: disable=W0613,W0201
    delta = current - self.n
    self.update(delta)

@classmethod
def truncate(cls, s, max_len=25, end=True, fill="..."):
    """
    Guarantee len(output) <= max_len, marking any removed text with `fill`.

    s : string to shorten
    max_len : maximum allowed length of the result
    end : if True keep the end of `s` (cut the start), else keep the start
    fill : marker placed where text was removed

    >>> Tqdm.truncate("hello", 4)
    '...o'
    """
    if len(s) <= max_len:
        return s
    if len(fill) > max_len:
        # not even the whole fill marker fits; return a slice of it
        # (explicit start index: fill[-max_len:] is wrong for max_len == 0)
        return fill[len(fill) - max_len:] if end else fill[:max_len]
    i = max_len - len(fill)
    # explicit start index: s[-i:] would return all of `s` when i == 0
    # (i.e. when max_len == len(fill)), breaking the length guarantee
    return (fill + s[len(s) - i:]) if end else (s[:i] + fill)
def close(self):
    """Swap in the persistent description (if one was given) before closing."""
    persist = self.desc_persist
    if persist is not None:
        self.set_description_str(persist, refresh=False)
    super(Tqdm, self).close()

@property
def format_dict(self):
    """Extend tqdm's format dict with `ncols_desc`, the character budget
    left for the description once the rest of the bar is rendered, so the
    bar format can pad/truncate the description to fill the width."""
    meter = super(Tqdm, self).format_dict
    width = meter["ncols"] or 80  # fall back to 80 columns when unknown
    spare = width - len(self.format_meter(ncols_desc=1, **meter)) + 1
    meter["ncols_desc"] = max(spare, 0)
    return meter
8 changes: 2 additions & 6 deletions dvc/remote/azure.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,9 +115,7 @@ def list_cache_paths(self):
def _upload(
self, from_file, to_info, name=None, no_progress_bar=False, **_kwargs
):
with Tqdm(
desc_truncate=name, disable=no_progress_bar, bytes=True
) as pbar:
with Tqdm(desc=name, disable=no_progress_bar, bytes=True) as pbar:
self.blob_service.create_blob_from_path(
to_info.bucket,
to_info.path,
Expand All @@ -128,9 +126,7 @@ def _upload(
def _download(
self, from_info, to_file, name=None, no_progress_bar=False, **_kwargs
):
with Tqdm(
desc_truncate=name, disable=no_progress_bar, bytes=True
) as pbar:
with Tqdm(desc=name, disable=no_progress_bar, bytes=True) as pbar:
self.blob_service.get_blob_to_path(
from_info.bucket,
from_info.path,
Expand Down
15 changes: 10 additions & 5 deletions dvc/remote/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -612,7 +612,7 @@ def changed_cache(self, checksum):
return self._changed_dir_cache(checksum)
return self.changed_cache_file(checksum)

def cache_exists(self, checksums, jobs=None):
def cache_exists(self, checksums, jobs=None, name=None):
"""Check if the given checksums are stored in the remote.
There are two ways of performing this check:
Expand All @@ -630,23 +630,28 @@ def cache_exists(self, checksums, jobs=None):
take much shorter time to just retrieve everything they have under
a certain prefix (e.g. s3, gs, ssh, hdfs). Other remotes that can
check if particular file exists much quicker, use their own
implementation of cache_exists (see http, local).
implementation of cache_exists (see ssh, local).
Returns:
A list with checksums that were found in the remote
"""
if not self.no_traverse:
return list(set(checksums) & set(self.all()))

with Tqdm(total=len(checksums), unit="md5") as pbar:
with Tqdm(
desc="Querying "
+ ("cache in " + name if name else "remote cache"),
total=len(checksums),
unit="file",
) as pbar:

def exists_with_progress(path_info):
ret = self.exists(path_info)
pbar.update()
pbar.update_desc(str(path_info))
return ret

with ThreadPoolExecutor(max_workers=jobs or self.JOBS) as executor:
path_infos = [self.checksum_to_path_info(x) for x in checksums]
path_infos = map(self.checksum_to_path_info, checksums)
in_remote = executor.map(exists_with_progress, path_infos)
ret = list(itertools.compress(checksums, in_remote))
return ret
Expand Down
2 changes: 1 addition & 1 deletion dvc/remote/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ def _download(self, from_info, to_file, name=None, no_progress_bar=False):
total=None if no_progress_bar else self._content_length(from_info),
leave=False,
bytes=True,
desc_truncate=from_info.url if name is None else name,
desc=from_info.url if name is None else name,
disable=no_progress_bar,
) as pbar:
with open(to_file, "wb") as fd:
Expand Down
25 changes: 17 additions & 8 deletions dvc/remote/local/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -251,10 +251,15 @@ def move(self, from_info, to_info):

move(from_info, to_info, mode=mode)

def cache_exists(self, checksums, jobs=None):
def cache_exists(self, checksums, jobs=None, name=None):
return [
checksum
for checksum in Tqdm(checksums, unit="md5")
for checksum in Tqdm(
checksums,
unit="file",
desc="Querying "
+ ("cache in " + name if name else "local cache"),
)
if not self.changed_cache_file(checksum)
]

Expand Down Expand Up @@ -313,14 +318,14 @@ def status(
show_checksums=False,
download=False,
):
logger.info(
logger.debug(
"Preparing to collect status from {}".format(remote.path_info)
)
ret = self._group(checksum_infos, show_checksums=show_checksums) or {}
md5s = list(ret)

logger.info("Collecting information from local cache...")
local_exists = self.cache_exists(md5s, jobs=jobs)
logger.debug("Collecting information from local cache...")
local_exists = self.cache_exists(md5s, jobs=jobs, name=self.cache_dir)

# This is a performance optimization. We can safely assume that,
# if the resources that we want to fetch are already cached,
Expand All @@ -329,8 +334,12 @@ def status(
if download and sorted(local_exists) == sorted(md5s):
remote_exists = local_exists
else:
logger.info("Collecting information from remote cache...")
remote_exists = list(remote.cache_exists(md5s, jobs=jobs))
logger.debug("Collecting information from remote cache...")
remote_exists = list(
remote.cache_exists(
md5s, jobs=jobs, name=str(remote.path_info)
)
)

self._fill_statuses(ret, local_exists, remote_exists)

Expand Down Expand Up @@ -377,7 +386,7 @@ def _process(
show_checksums=False,
download=False,
):
logger.info(
logger.debug(
"Preparing to {} '{}'".format(
"download data from" if download else "upload data to",
remote.path_info,
Expand Down
8 changes: 2 additions & 6 deletions dvc/remote/oss.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,19 +107,15 @@ def list_cache_paths(self):
def _upload(
self, from_file, to_info, name=None, no_progress_bar=False, **_kwargs
):
with Tqdm(
desc_truncate=name, disable=no_progress_bar, bytes=True
) as pbar:
with Tqdm(desc=name, disable=no_progress_bar, bytes=True) as pbar:
self.oss_service.put_object_from_file(
to_info.path, from_file, progress_callback=pbar.update_to
)

def _download(
self, from_info, to_file, name=None, no_progress_bar=False, **_kwargs
):
with Tqdm(
desc_truncate=name, disable=no_progress_bar, bytes=True
) as pbar:
with Tqdm(desc=name, disable=no_progress_bar, bytes=True) as pbar:
self.oss_service.get_object_to_file(
from_info.path, to_file, progress_callback=pbar.update_to
)
Expand Down
10 changes: 2 additions & 8 deletions dvc/remote/s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -208,10 +208,7 @@ def exists(self, path_info):
def _upload(self, from_file, to_info, name=None, no_progress_bar=False):
total = os.path.getsize(from_file)
with Tqdm(
disable=no_progress_bar,
total=total,
bytes=True,
desc_truncate=name,
disable=no_progress_bar, total=total, bytes=True, desc=name
) as pbar:
self.s3.upload_file(
from_file,
Expand All @@ -229,10 +226,7 @@ def _download(self, from_info, to_file, name=None, no_progress_bar=False):
Bucket=from_info.bucket, Key=from_info.path
)["ContentLength"]
with Tqdm(
disable=no_progress_bar,
total=total,
bytes=True,
desc_truncate=name,
disable=no_progress_bar, total=total, bytes=True, desc=name
) as pbar:
self.s3.download_file(
from_info.bucket, from_info.path, to_file, Callback=pbar.update
Expand Down
10 changes: 7 additions & 3 deletions dvc/remote/ssh/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ def _exists(chunk_and_channel):

return results

def cache_exists(self, checksums, jobs=None):
def cache_exists(self, checksums, jobs=None, name=None):
"""This is older implementation used in remote/base.py
We are reusing it in RemoteSSH, because SSH's batch_exists proved to be
faster than current approach (relying on exists(path_info)) applied in
Expand All @@ -267,7 +267,12 @@ def cache_exists(self, checksums, jobs=None):
if not self.no_traverse:
return list(set(checksums) & set(self.all()))

with Tqdm(total=len(checksums), unit="md5") as pbar:
with Tqdm(
desc="Querying "
+ ("cache in " + name if name else "remote cache"),
total=len(checksums),
unit="file",
) as pbar:

def exists_with_progress(chunks):
return self.batch_exists(chunks, callback=pbar.update_desc)
Expand All @@ -278,5 +283,4 @@ def exists_with_progress(chunks):
results = executor.map(exists_with_progress, chunks)
in_remote = itertools.chain.from_iterable(results)
ret = list(itertools.compress(checksums, in_remote))
pbar.update_desc("", 0) # clear path name description
return ret
4 changes: 2 additions & 2 deletions dvc/remote/ssh/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ def remove(self, path):

def download(self, src, dest, no_progress_bar=False, progress_title=None):
with Tqdm(
desc_truncate=progress_title or os.path.basename(src),
desc=progress_title or os.path.basename(src),
disable=no_progress_bar,
bytes=True,
) as pbar:
Expand All @@ -186,7 +186,7 @@ def upload(self, src, dest, no_progress_bar=False, progress_title=None):
progress_title = posixpath.basename(dest)

with Tqdm(
desc_truncate=progress_title, disable=no_progress_bar, bytes=True
desc=progress_title, disable=no_progress_bar, bytes=True
) as pbar:
self.sftp.put(src, tmp_file, callback=pbar.update_to)

Expand Down
3 changes: 2 additions & 1 deletion dvc/repo/checkout.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ def checkout(self, target=None, with_deps=False, force=False, recursive=False):
with self.state:
_cleanup_unused_links(self, all_stages)
total = get_all_files_numbers(stages)
if total == 0:
logger.info("Nothing to do")
with Tqdm(
total=total, unit="file", desc="Checkout", disable=total == 0
) as pbar:
Expand All @@ -51,4 +53,3 @@ def checkout(self, target=None, with_deps=False, force=False, recursive=False):
)

stage.checkout(force=force, progress_callback=pbar.update_desc)
pbar.update_desc("Checkout", 0) # clear path name description

0 comments on commit 1ed3645

Please sign in to comment.