Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

progressbar cleanup #2436

Merged
merged 15 commits into from
Aug 29, 2019
90 changes: 57 additions & 33 deletions dvc/progress.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@
from __future__ import print_function
import logging
from tqdm import tqdm
from copy import deepcopy
from concurrent.futures import ThreadPoolExecutor
from funcy import merge

logger = logging.getLogger(__name__)


class TqdmThreadPoolExecutor(ThreadPoolExecutor):
Expand Down Expand Up @@ -33,62 +35,84 @@ class Tqdm(tqdm):
maximum-compatibility tqdm-based progressbars
"""

# Default bar format: percentage + 10-char bar, then the description
# padded/truncated to `ncols_desc` characters so the rendered line
# exactly fills the terminal width (`ncols_desc` is injected at render
# time by the `format_dict` property below).
BAR_FMT_DEFAULT = (
    "{percentage:3.0f}%|{bar:10}|"
    "{desc:{ncols_desc}.{ncols_desc}}{n}/{total}"
    " [{elapsed}<{remaining}, {rate_fmt:>11}{postfix}]"
)
# Variant used when `total` is unknown: no percentage or bar, and the
# remaining-time field is rendered as the literal placeholder "??:??".
BAR_FMT_NOTOTAL = (
    "{desc:{ncols_desc}.{ncols_desc}}{n}"
    " [{elapsed}<??:??, {rate_fmt:>11}{postfix}]"
)

def __init__(
self,
iterable=None,
disable=None,
level=logging.ERROR,
desc=None,
leave=False,
bar_format=None,
bytes=False, # pylint: disable=W0622
desc_truncate=None,
leave=None,
**kwargs
):
"""
bytes : shortcut for
`unit='B', unit_scale=True, unit_divisor=1024, miniters=1`
desc_truncate : like `desc` but will truncate to 10 chars
desc : persists after `close()`
level : effective logging level for determining `disable`;
used only if `disable` is unspecified
kwargs : anything accepted by `tqdm.tqdm()`
"""
kwargs = deepcopy(kwargs)
kwargs = kwargs.copy()
kwargs.setdefault("unit_scale", True)
if bytes:
for k, v in dict(
bytes_defaults = dict(
unit="B", unit_scale=True, unit_divisor=1024, miniters=1
).items():
kwargs.setdefault(k, v)
if desc_truncate is not None:
kwargs.setdefault("desc", self.truncate(desc_truncate))
if disable is None:
disable = (
logging.getLogger(__name__).getEffectiveLevel()
>= logging.CRITICAL
)
kwargs = merge(bytes_defaults, kwargs)
self.desc_persist = desc
if disable is None:
disable = logger.getEffectiveLevel() > level
super(Tqdm, self).__init__(
iterable=iterable, disable=disable, leave=leave, **kwargs
iterable=iterable,
disable=disable,
leave=leave,
desc=desc,
bar_format="!",
**kwargs
)
if bar_format is None:
if self.__len__():
self.bar_format = self.BAR_FMT_DEFAULT
else:
self.bar_format = self.BAR_FMT_NOTOTAL
else:
self.bar_format = bar_format
self.refresh()

def update_desc(self, desc, n=1, truncate=True):
def update_desc(self, desc, n=1):
"""
Calls `set_description(truncate(desc))` and `update(n)`
Calls `set_description_str(desc)` and `update(n)`
"""
self.set_description(
self.truncate(desc) if truncate else desc, refresh=False
)
self.set_description_str(desc, refresh=False)
self.update(n)

def update_to(self, current, total=None):
    """
    Progress callback tracking an *absolute* position rather than an
    increment: optionally (re)sets `self.total`, then advances the bar
    by `current - self.n`.

    current : absolute number of units completed so far
    total : if truthy, replaces `self.total` before updating
        (NOTE: falsy values, including 0, are ignored)
    """
    if total:
        self.total = total  # pylint: disable=W0613,W0201
    self.update(current - self.n)

@classmethod
def truncate(cls, s, max_len=25, end=True, fill="..."):
    """
    Shorten `s` so that len(output) <= max_len.

    end : if True, keep the tail of `s` (prepend `fill`);
        otherwise keep the head (append `fill`)
    fill : marker substituted for the removed characters; if `fill`
        itself is longer than `max_len`, a slice of `fill` is returned

    >>> Tqdm.truncate("hello", 4)
    '...o'
    """
    if len(s) <= max_len:
        return s
    if len(fill) > max_len:
        return fill[-max_len:] if end else fill[:max_len]
    i = max_len - len(fill)
    return (fill + s[-i:]) if end else (s[:i] + fill)
def close(self):
if self.desc_persist is not None:
self.set_description_str(self.desc_persist, refresh=False)
casperdcl marked this conversation as resolved.
Show resolved Hide resolved
super(Tqdm, self).close()

@property
def format_dict(self):
    """inject `ncols_desc` to fill the display width (`ncols`)"""
    d = super(Tqdm, self).format_dict
    # fall back to 80 columns when the terminal width is unknown/None
    ncols = d["ncols"] or 80
    # render the meter once with a 1-char description to measure the
    # fixed-width part, then hand the leftover columns to the
    # description (+1 compensates for the 1-char placeholder)
    ncols_desc = ncols - len(self.format_meter(ncols_desc=1, **d)) + 1
    # clamp to >= 0 so the `{desc:{ncols_desc}.{ncols_desc}}` format
    # spec stays valid even when the meter overflows the terminal
    d["ncols_desc"] = max(ncols_desc, 0)
    return d
8 changes: 2 additions & 6 deletions dvc/remote/azure.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,9 +115,7 @@ def list_cache_paths(self):
def _upload(
self, from_file, to_info, name=None, no_progress_bar=False, **_kwargs
):
with Tqdm(
desc_truncate=name, disable=no_progress_bar, bytes=True
) as pbar:
with Tqdm(desc=name, disable=no_progress_bar, bytes=True) as pbar:
self.blob_service.create_blob_from_path(
to_info.bucket,
to_info.path,
Expand All @@ -128,9 +126,7 @@ def _upload(
def _download(
self, from_info, to_file, name=None, no_progress_bar=False, **_kwargs
):
with Tqdm(
desc_truncate=name, disable=no_progress_bar, bytes=True
) as pbar:
with Tqdm(desc=name, disable=no_progress_bar, bytes=True) as pbar:
self.blob_service.get_blob_to_path(
from_info.bucket,
from_info.path,
Expand Down
15 changes: 10 additions & 5 deletions dvc/remote/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -612,7 +612,7 @@ def changed_cache(self, checksum):
return self._changed_dir_cache(checksum)
return self.changed_cache_file(checksum)

def cache_exists(self, checksums, jobs=None):
def cache_exists(self, checksums, jobs=None, name=None):
"""Check if the given checksums are stored in the remote.

There are two ways of performing this check:
Expand All @@ -630,23 +630,28 @@ def cache_exists(self, checksums, jobs=None):
take much shorter time to just retrieve everything they have under
a certain prefix (e.g. s3, gs, ssh, hdfs). Other remotes that can
check if particular file exists much quicker, use their own
implementation of cache_exists (see http, local).
implementation of cache_exists (see ssh, local).

Returns:
A list with checksums that were found in the remote
"""
if not self.no_traverse:
return list(set(checksums) & set(self.all()))

with Tqdm(total=len(checksums), unit="md5") as pbar:
with Tqdm(
desc="Querying "
+ ("cache in " + name if name else "remote cache"),
total=len(checksums),
unit="file",
) as pbar:

def exists_with_progress(path_info):
ret = self.exists(path_info)
pbar.update()
pbar.update_desc(str(path_info))
return ret

with ThreadPoolExecutor(max_workers=jobs or self.JOBS) as executor:
path_infos = [self.checksum_to_path_info(x) for x in checksums]
path_infos = map(self.checksum_to_path_info, checksums)
in_remote = executor.map(exists_with_progress, path_infos)
ret = list(itertools.compress(checksums, in_remote))
return ret
Expand Down
2 changes: 1 addition & 1 deletion dvc/remote/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ def _download(self, from_info, to_file, name=None, no_progress_bar=False):
total=None if no_progress_bar else self._content_length(from_info),
leave=False,
bytes=True,
desc_truncate=from_info.url if name is None else name,
desc=from_info.url if name is None else name,
disable=no_progress_bar,
) as pbar:
with open(to_file, "wb") as fd:
Expand Down
25 changes: 17 additions & 8 deletions dvc/remote/local/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -251,10 +251,15 @@ def move(self, from_info, to_info):

move(from_info, to_info, mode=mode)

def cache_exists(self, checksums, jobs=None):
def cache_exists(self, checksums, jobs=None, name=None):
return [
checksum
for checksum in Tqdm(checksums, unit="md5")
for checksum in Tqdm(
checksums,
unit="file",
desc="Querying "
+ ("cache in " + name if name else "local cache"),
)
if not self.changed_cache_file(checksum)
]

Expand Down Expand Up @@ -313,14 +318,14 @@ def status(
show_checksums=False,
download=False,
):
logger.info(
logger.debug(
casperdcl marked this conversation as resolved.
Show resolved Hide resolved
"Preparing to collect status from {}".format(remote.path_info)
casperdcl marked this conversation as resolved.
Show resolved Hide resolved
)
ret = self._group(checksum_infos, show_checksums=show_checksums) or {}
md5s = list(ret)

logger.info("Collecting information from local cache...")
local_exists = self.cache_exists(md5s, jobs=jobs)
logger.debug("Collecting information from local cache...")
local_exists = self.cache_exists(md5s, jobs=jobs, name=self.cache_dir)

# This is a performance optimization. We can safely assume that,
# if the resources that we want to fetch are already cached,
Expand All @@ -329,8 +334,12 @@ def status(
if download and sorted(local_exists) == sorted(md5s):
remote_exists = local_exists
else:
logger.info("Collecting information from remote cache...")
remote_exists = list(remote.cache_exists(md5s, jobs=jobs))
logger.debug("Collecting information from remote cache...")
remote_exists = list(
remote.cache_exists(
md5s, jobs=jobs, name=str(remote.path_info)
)
)

self._fill_statuses(ret, local_exists, remote_exists)

Expand Down Expand Up @@ -377,7 +386,7 @@ def _process(
show_checksums=False,
download=False,
):
logger.info(
logger.debug(
"Preparing to {} '{}'".format(
"download data from" if download else "upload data to",
remote.path_info,
Expand Down
8 changes: 2 additions & 6 deletions dvc/remote/oss.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,19 +107,15 @@ def list_cache_paths(self):
def _upload(
self, from_file, to_info, name=None, no_progress_bar=False, **_kwargs
):
with Tqdm(
desc_truncate=name, disable=no_progress_bar, bytes=True
) as pbar:
with Tqdm(desc=name, disable=no_progress_bar, bytes=True) as pbar:
self.oss_service.put_object_from_file(
to_info.path, from_file, progress_callback=pbar.update_to
)

def _download(
self, from_info, to_file, name=None, no_progress_bar=False, **_kwargs
):
with Tqdm(
desc_truncate=name, disable=no_progress_bar, bytes=True
) as pbar:
with Tqdm(desc=name, disable=no_progress_bar, bytes=True) as pbar:
self.oss_service.get_object_to_file(
from_info.path, to_file, progress_callback=pbar.update_to
)
Expand Down
10 changes: 2 additions & 8 deletions dvc/remote/s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -208,10 +208,7 @@ def exists(self, path_info):
def _upload(self, from_file, to_info, name=None, no_progress_bar=False):
total = os.path.getsize(from_file)
with Tqdm(
disable=no_progress_bar,
total=total,
bytes=True,
desc_truncate=name,
disable=no_progress_bar, total=total, bytes=True, desc=name
) as pbar:
self.s3.upload_file(
from_file,
Expand All @@ -229,10 +226,7 @@ def _download(self, from_info, to_file, name=None, no_progress_bar=False):
Bucket=from_info.bucket, Key=from_info.path
)["ContentLength"]
with Tqdm(
disable=no_progress_bar,
total=total,
bytes=True,
desc_truncate=name,
disable=no_progress_bar, total=total, bytes=True, desc=name
) as pbar:
self.s3.download_file(
from_info.bucket, from_info.path, to_file, Callback=pbar.update
Expand Down
10 changes: 7 additions & 3 deletions dvc/remote/ssh/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ def _exists(chunk_and_channel):

return results

def cache_exists(self, checksums, jobs=None):
def cache_exists(self, checksums, jobs=None, name=None):
"""This is older implementation used in remote/base.py
We are reusing it in RemoteSSH, because SSH's batch_exists proved to be
faster than current approach (relying on exists(path_info)) applied in
Expand All @@ -267,7 +267,12 @@ def cache_exists(self, checksums, jobs=None):
if not self.no_traverse:
return list(set(checksums) & set(self.all()))

with Tqdm(total=len(checksums), unit="md5") as pbar:
with Tqdm(
desc="Querying "
+ ("cache in " + name if name else "remote cache"),
total=len(checksums),
unit="file",
) as pbar:

def exists_with_progress(chunks):
return self.batch_exists(chunks, callback=pbar.update_desc)
Expand All @@ -278,5 +283,4 @@ def exists_with_progress(chunks):
results = executor.map(exists_with_progress, chunks)
in_remote = itertools.chain.from_iterable(results)
ret = list(itertools.compress(checksums, in_remote))
pbar.update_desc("", 0) # clear path name description
return ret
4 changes: 2 additions & 2 deletions dvc/remote/ssh/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ def remove(self, path):

def download(self, src, dest, no_progress_bar=False, progress_title=None):
with Tqdm(
desc_truncate=progress_title or os.path.basename(src),
desc=progress_title or os.path.basename(src),
disable=no_progress_bar,
bytes=True,
) as pbar:
Expand All @@ -186,7 +186,7 @@ def upload(self, src, dest, no_progress_bar=False, progress_title=None):
progress_title = posixpath.basename(dest)

with Tqdm(
desc_truncate=progress_title, disable=no_progress_bar, bytes=True
desc=progress_title, disable=no_progress_bar, bytes=True
) as pbar:
self.sftp.put(src, tmp_file, callback=pbar.update_to)

Expand Down
3 changes: 2 additions & 1 deletion dvc/repo/checkout.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ def checkout(self, target=None, with_deps=False, force=False, recursive=False):
with self.state:
_cleanup_unused_links(self, all_stages)
total = get_all_files_numbers(stages)
if total == 0:
logger.info("Nothing to do")
with Tqdm(
total=total, unit="file", desc="Checkout", disable=total == 0
) as pbar:
Expand All @@ -51,4 +53,3 @@ def checkout(self, target=None, with_deps=False, force=False, recursive=False):
)

stage.checkout(force=force, progress_callback=pbar.update_desc)
pbar.update_desc("Checkout", 0) # clear path name description