From b9f82c3c81d163aaf69b84bea0f79a47c39273b3 Mon Sep 17 00:00:00 2001
From: Carlos Zoido
Date: Tue, 26 Nov 2019 10:00:14 +0100
Subject: [PATCH] Upload packages in parallel (#5856)

* Added required = True to subparsers in order to print error message in Py2 and Py3.
* sync
* basic concurrent upload at reference level with futures
* revert changes
* add line
* add progress bar for file list
* refactor compressing progress bars
* refactor upload progress bar
* name change
* move download code
* put IterableToFileAdapter back
* make file iterable
* file iterator
* fix iterable file
* change function place
* check output exists
* change output to tqdm
* minor changes
* wip
* minor changes
* wip
* add progress function
* change bar output
* wip
* convert list to string
* convert to string before output
* Lock buggy urllib3 (#5808)
* app simplifying (#5806)
* Apply lockfile before updating downstream requires (#5771)
* apply graph_lock before looking for overrides
* first step: get rid of the warning
* cleaner if graph_lock is passed to the function
* only update requires upstream if no lockfile is applied
* fix tests
* Deprecation of CONAN_USERNAME and CONAN_CHANNEL: fix error message (#5756)
* if CONAN_USERNAME and CONAN_CHANNEL are deprecated, the error cannot recommend them
* update tests accordingly
* test client load() file method (#5815)
* no user/channel repr without _ (#5817)
* no user/channel repr without _
* minor fixes
* fix tests
* Remove py34 (#5820)
* fix upload package id (#5824)
* - update macOS, watchOS, tvOS, iOS version numbers (#5823)
* Refresh token client support. (#5662)
* Refresh token client support. Missing tests. Missing migration
* public method
* WIP
* Refresh almost there
* Removed prints
* Try migrate
* Migration
* Add comment
* Refresh token flow following RFC recommendations
* Refresh ok
* review
* Remove traces
* Refactor capabilities
* Removed tmp file
* Review
* #5819 Show warning message for Python 3.4 (#5829)
* #5819 Show warning message for Python 3.4
  - Add new warning message for python 3.4 which is no longer supported
  - Added functional tests to validate both python 3.4 and 2.x
  Signed-off-by: Uilian Ries
* #5819 Fix broken tests
  Signed-off-by: Uilian Ries
* Add cpp_info.name to cmake and pkg_config generators (#5598)
* Add cpp_info.name to cmake generators
* Fix unit tests to mimic real behavior
* cmake_paths test
* add test for cmake generator
* Add cmake_find_package test
* fix test in py3
* Applied cpp_info.name to pkg_config generator
* check different name in pkg_config
* refactor
* refactor download progress
* refactor bars
* check generator type
* minor changes
* put adapter
* change function name
* change upload to write
* fix names
* refactor written_chunks
* remove size query to progress bar
* fix download size return
* sync with develop
* add parallel argument
* fix add argument
* upload with threadpool
* manage bar positions
* fix output
* fix rewrite length
* output changes
* catch exceptions
* remove import
* print all errors
* change error message
* change default number of threads
* upload in parallel test
* minor changes
* adapt output on parallel upload scope
* fix format
* fix format
* fix test
* upload complete parallel
* minor changes
* minor changes
* add parallel upload
* merge changes
* fix test
* fix rest_client
* force non interactive mode with parallel uploads
* test behaviour
* fix test
* reorder
* add func
* minor changes
* minor changes
* add callback
* fix format
* add post_upload hook when no packages to upload
* prevent repeating headers
* minor fixes
* revert capabilities change
* output with tqdm
* remove output messages without revisions
* send messages to log
* add async comment
* number of threads in help
* change exception place
* move slots to progress bars
* remove attribute
* kill the thread when problems uploading recipe
* default_server_user
* remove check
* extra checks
* prevent upload packages if error
* fix test
* minor changes
* dont call hook
* change var name
* fix test
* remove debugging code
* simplify tests
* remove redundant code
* added mutex for hook lazy loading
* remove slots
---
 conans/client/cmd/uploader.py             | 96 ++++++++++++++-----
 conans/client/command.py                  |  6 +-
 conans/client/conan_api.py                |  6 +-
 conans/client/hook_manager.py             | 23 +++--
 conans/client/rest/rest_client_v1.py      |  5 +-
 conans/client/rest/uploader_downloader.py |  3 +-
 .../command/upload_complete_test.py       | 62 +++++++++++-
 conans/util/progress_bar.py               | 31 ++++--
 8 files changed, 184 insertions(+), 48 deletions(-)

diff --git a/conans/client/cmd/uploader.py b/conans/client/cmd/uploader.py
index 2e9bb1c6011..95e08ac6d09 100644
--- a/conans/client/cmd/uploader.py
+++ b/conans/client/cmd/uploader.py
@@ -1,10 +1,13 @@
 import os
 import stat
 import tarfile
+import threading
 import time
 from collections import defaultdict
+from multiprocessing.pool import ThreadPool
 
 from conans.util import progress_bar
+from conans.util.progress_bar import left_justify_message
 from conans.client.remote_manager import is_package_snapshot_complete, calc_files_checksum
 from conans.client.source import complete_recipe_sources
 from conans.errors import ConanException, NotFoundException
@@ -73,21 +76,42 @@ def __init__(self, cache, user_io, remote_manager, loader, hook_manager):
         self._remote_manager = remote_manager
         self._loader = loader
         self._hook_manager = hook_manager
+        self._upload_thread_pool = None
+        self._exceptions_list = []
 
     def upload(self, reference_or_pattern, remotes, upload_recorder, package_id=None,
                all_packages=None, confirm=False, retry=None, retry_wait=None, integrity_check=False,
-               policy=None, query=None):
+               policy=None, query=None, parallel_upload=False):
         t1 = time.time()
         refs, confirm = self._collects_refs_to_upload(package_id, reference_or_pattern, confirm)
         refs_by_remote = self._collect_packages_to_upload(refs, confirm, remotes, all_packages,
                                                           query, package_id)
-        # Do the job
+
+        if parallel_upload:
+            self._upload_thread_pool = ThreadPool(8)
+            self._user_io.disable_input()
+        else:
+            self._upload_thread_pool = ThreadPool(1)
+
         for remote, refs in refs_by_remote.items():
             self._output.info("Uploading to remote '{}':".format(remote.name))
-            for (ref, conanfile, prefs) in refs:
+
+            def upload_ref(ref_conanfile_prefs):
+                ref, conanfile, prefs = ref_conanfile_prefs
                 self._upload_ref(conanfile, ref, prefs, retry, retry_wait, integrity_check,
                                  policy, remote, upload_recorder, remotes)
 
+            self._upload_thread_pool.map(upload_ref,
+                                         [(ref, conanfile, prefs) for (ref, conanfile, prefs) in
+                                          refs])
+        self._upload_thread_pool.close()
+        self._upload_thread_pool.join()
+        for exception in self._exceptions_list:
+            self._output.error(str(exception))
+
+        if len(self._exceptions_list) > 0:
+            raise ConanException("Errors uploading some packages")
+
         logger.debug("UPLOAD: Time manager upload: %f" % (time.time() - t1))
 
     def _collects_refs_to_upload(self, package_id, reference_or_pattern, confirm):
@@ -185,27 +209,54 @@ def _upload_ref(self, conanfile, ref, prefs, retry, retry_wait, integrity_check,
         # FIXME: because the recipe can have one and the package a different one
         self._hook_manager.execute("pre_upload", conanfile_path=conanfile_path, reference=ref,
                                    remote=recipe_remote)
-
-        self._output.info("Uploading %s to remote '%s'" % (str(ref), recipe_remote.name))
-        self._upload_recipe(ref, conanfile, retry, retry_wait, policy, recipe_remote, remotes)
-        upload_recorder.add_recipe(ref, recipe_remote.name, recipe_remote.url)
+        msg = "\rUploading %s to remote '%s'" % (str(ref), recipe_remote.name)
+        self._output.info(left_justify_message(msg))
+        try:
+            self._upload_recipe(ref, conanfile, retry, retry_wait, policy, recipe_remote, remotes)
+            upload_recorder.add_recipe(ref, recipe_remote.name, recipe_remote.url)
+        except ConanException as exc:
+            self._exceptions_list.append(exc)
+            return
 
         # Now the binaries
         if prefs:
             total = len(prefs)
-            for index, pref in enumerate(prefs):
-                p_remote = recipe_remote
-                msg = ("Uploading package %d/%d: %s to '%s'" % (index+1, total, str(pref.id),
-                                                                p_remote.name))
-                self._output.info(msg)
-                self._upload_package(pref, retry, retry_wait,
-                                     integrity_check, policy, p_remote)
-                upload_recorder.add_package(pref, p_remote.name, p_remote.url)
-
-        # FIXME: I think it makes no sense to specify a remote to "post_upload"
-        # FIXME: because the recipe can have one and the package a different one
-        self._hook_manager.execute("post_upload", conanfile_path=conanfile_path, reference=ref,
-                                   remote=recipe_remote)
+            p_remote = recipe_remote
+
+            def upload_package_index(index_pref):
+                try:
+                    index, pref = index_pref
+                    up_msg = "\rUploading package %d/%d: %s to '%s'" % (index + 1, total,
+                                                                        str(pref.id),
+                                                                        p_remote.name)
+                    self._output.info(left_justify_message(up_msg))
+                    self._upload_package(pref, retry, retry_wait,
+                                         integrity_check, policy, p_remote)
+                    upload_recorder.add_package(pref, p_remote.name, p_remote.url)
+                    return conanfile_path, ref, recipe_remote, None
+                except ConanException as exc:
+                    return None, None, None, exc
+
+            def upload_package_callback(ret):
+                for cf_path, r_ref, r_rem, exc in ret:
+                    if exc is None:
+                        # FIXME: I think it makes no sense to specify a remote to "post_upload"
+                        # FIXME: because the recipe can have one and the package a different one
+                        self._hook_manager.execute("post_upload", conanfile_path=cf_path,
+                                                   reference=r_ref, remote=r_rem)
+                    else:
+                        self._exceptions_list.append(exc)
+            # This doesn't wait for the packages to end, so the function returns
+            # and the "pool entry" for the recipe is released
+            self._upload_thread_pool.map_async(upload_package_index,
+                                               [(index, pref) for index, pref
+                                                in enumerate(prefs)],
+                                               callback=upload_package_callback)
+        else:
+            # FIXME: I think it makes no sense to specify a remote to "post_upload"
+            # FIXME: because the recipe can have one and the package a different one
+            self._hook_manager.execute("post_upload", conanfile_path=conanfile_path, reference=ref,
+                                       remote=recipe_remote)
 
     def _upload_recipe(self, ref, conanfile, retry, retry_wait, policy, remote, remotes):
@@ -408,11 +459,10 @@ def _package_files_to_upload(self, pref, policy, the_files, remote):
         return the_files, deleted
 
     def _upload_recipe_end_msg(self, ref, remote):
-        msg = "Uploaded conan recipe '%s' to '%s'" % (str(ref), remote.name)
+        msg = "\rUploaded conan recipe '%s' to '%s'" % (str(ref), remote.name)
         url = remote.url.replace("https://api.bintray.com/conan", "https://bintray.com")
         msg += ": %s" % url
-        self._output.writeln("")
-        self._output.info(msg)
+        self._output.info(left_justify_message(msg))
 
     def _package_integrity_check(self, pref, files, package_folder):
         # If package has been modified remove tgz to regenerate it
diff --git a/conans/client/command.py b/conans/client/command.py
index 334fb41eb29..e0cc756b3cf 100644
--- a/conans/client/command.py
+++ b/conans/client/command.py
@@ -1344,6 +1344,9 @@ def upload(self, *args):
                             help="Uploads package only if recipe is the same as the remote one")
         parser.add_argument("-j", "--json", default=None, action=OnceArgument,
                             help='json file path where the upload information will be written to')
+        parser.add_argument("--parallel", action='store_true', default=False,
+                            help='Upload files in parallel using multiple threads. '
+                                 'The default number of launched threads is 8')
 
         args = parser.parse_args(*args)
 
@@ -1398,7 +1401,8 @@ def upload(self, *args):
                                       query=args.query, remote_name=args.remote,
                                       all_packages=args.all, policy=policy,
                                       confirm=args.confirm, retry=args.retry,
-                                      retry_wait=args.retry_wait, integrity_check=args.check)
+                                      retry_wait=args.retry_wait, integrity_check=args.check,
+                                      parallel_upload=args.parallel)
         except ConanException as exc:
             info = exc.info
diff --git a/conans/client/conan_api.py b/conans/client/conan_api.py
index bc5824e6003..2d3df0d106a 100644
--- a/conans/client/conan_api.py
+++ b/conans/client/conan_api.py
@@ -868,7 +868,8 @@ def search_packages(self, reference, query=None, remote_name=None, outdated=Fals
 
     @api_method
     def upload(self, pattern, package=None, remote_name=None, all_packages=False, confirm=False,
-               retry=None, retry_wait=None, integrity_check=False, policy=None, query=None):
+               retry=None, retry_wait=None, integrity_check=False, policy=None, query=None,
+               parallel_upload=False):
         """ Uploads a package recipe and the generated binary packages to a
         specified remote """
         upload_recorder = UploadRecorder()
@@ -877,7 +878,8 @@ def upload(self, pattern, package=None, remote_name=None, all_packages=False, co
         remotes = self.app.load_remotes(remote_name=remote_name)
         try:
             uploader.upload(pattern, remotes, upload_recorder, package, all_packages, confirm,
-                            retry, retry_wait, integrity_check, policy, query=query)
+                            retry, retry_wait, integrity_check, policy, query=query,
+                            parallel_upload=parallel_upload)
             return upload_recorder.get_info()
         except ConanException as exc:
             upload_recorder.error = True
diff --git a/conans/client/hook_manager.py b/conans/client/hook_manager.py
index c46eda859a3..de22e9ff44c 100644
--- a/conans/client/hook_manager.py
+++ b/conans/client/hook_manager.py
@@ -3,6 +3,7 @@
 import traceback
 import uuid
 from collections import defaultdict
+from threading import Lock
 
 from conans.client.output import ScopedOutput
 from conans.client.tools.files import chdir
@@ -40,15 +41,19 @@ def __init__(self, hooks_folder, hook_names, output):
         self.hooks = defaultdict(list)
         self.output = output
         self._attribute_checker_path = os.path.join(self._hooks_folder, "attribute_checker.py")
-
-    def create_default_hooks(self):
-        save(self._attribute_checker_path, attribute_checker_hook)
+        self._mutex = Lock()
 
     def execute(self, method_name, **kwargs):
-        if not os.path.exists(self._attribute_checker_path):
-            self.create_default_hooks()
-        if not self.hooks:
-            self.load_hooks()
+        # It is necessary to protect the lazy loading of hooks with a mutex, because it can be
+        # concurrent (e.g. upload --parallel)
+        self._mutex.acquire()
+        try:
+            if not os.path.exists(self._attribute_checker_path):
+                save(self._attribute_checker_path, attribute_checker_hook)
+            if not self.hooks:
+                self.load_hooks()
+        finally:
+            self._mutex.release()
 
         assert method_name in valid_hook_methods, \
             "Method '{}' not in valid hooks methods".format(method_name)
@@ -61,9 +66,9 @@ def execute(self, method_name, **kwargs):
 
     def load_hooks(self):
         for name in self._hook_names:
-            self.load_hook(name)
+            self._load_hook(name)
 
-    def load_hook(self, hook_name):
+    def _load_hook(self, hook_name):
         if not hook_name.endswith(".py"):
             hook_name = "%s.py" % hook_name
         hook_path = os.path.normpath(os.path.join(self._hooks_folder, hook_name))
diff --git a/conans/client/rest/rest_client_v1.py b/conans/client/rest/rest_client_v1.py
index 73499657a4a..aa3807156d3 100644
--- a/conans/client/rest/rest_client_v1.py
+++ b/conans/client/rest/rest_client_v1.py
@@ -130,10 +130,9 @@ def _upload_package(self, pref, files_to_upload, retry, retry_wait):
         url = self.router.package_upload_urls(pref)
         file_sizes = {filename: os.stat(abs_path).st_size for filename, abs_path
                       in files_to_upload.items()}
-        self._output.rewrite_line("Requesting upload urls...")
+        logger.debug("Requesting upload urls...")
         urls = self._get_file_to_url_dict(url, data=file_sizes)
-        self._output.rewrite_line("Requesting upload urls...Done!")
-        self._output.writeln("")
+        logger.debug("Requesting upload urls...Done!")
         self._upload_files(urls, files_to_upload, self._output, retry, retry_wait)
 
     def _upload_files(self, file_urls, files, output, retry, retry_wait):
diff --git a/conans/client/rest/uploader_downloader.py b/conans/client/rest/uploader_downloader.py
index f7704c8b3a1..a903d1d2fb7 100644
--- a/conans/client/rest/uploader_downloader.py
+++ b/conans/client/rest/uploader_downloader.py
@@ -1,6 +1,7 @@
 import os
 import traceback
 import time
+from copy import copy
 
 from conans.util import progress_bar
 from conans.client.rest import response_to_str
@@ -27,7 +28,7 @@ def upload(self, url, abs_path, auth=None, dedup=False, retry=None, retry_wait=N
         retry_wait = retry_wait if retry_wait is not None else 5
 
         # Send always the header with the Sha1
-        headers = headers or {}
+        headers = copy(headers) or {}
         headers["X-Checksum-Sha1"] = sha1sum(abs_path)
         if dedup:
             dedup_headers = {"X-Checksum-Deploy": "true"}
diff --git a/conans/test/functional/command/upload_complete_test.py b/conans/test/functional/command/upload_complete_test.py
index d6aadd78a7f..e7f28e16923 100644
--- a/conans/test/functional/command/upload_complete_test.py
+++ b/conans/test/functional/command/upload_complete_test.py
@@ -17,7 +17,7 @@
 from conans.test.utils.test_files import hello_conan_files, hello_source_files, temp_folder, \
     uncompress_packaged_files
 from conans.test.utils.tools import (NO_SETTINGS_PACKAGE_ID, TestClient, TestRequester, TestServer,
-                                     MockedUserIO, TestBufferConanOutput)
+                                     MockedUserIO, TestBufferConanOutput, GenConanfile)
 from conans.util.files import load, mkdir, save
 
 myconan1 = """
@@ -59,6 +59,19 @@ def put(self, *args, **kwargs):
         return super(BadConnectionUploader, self).put(*args, **kwargs)
 
 
+class FailOnReferencesUploader(BadConnectionUploader):
+    fail_on = ["lib1", "lib3"]
+
+    def __init__(self, *args, **kwargs):
+        super(BadConnectionUploader, self).__init__(*args, **kwargs)
+
+    def put(self, *args, **kwargs):
+        if any(ref in args[0] for ref in self.fail_on):
+            raise ConnectionError("Connection fails with lib2 and lib4 references!")
+        else:
+            return super(BadConnectionUploader, self).put(*args, **kwargs)
+
+
 @unittest.skipIf(TestClient().cache.config.revisions_enabled,
                  "We cannot know the folder of the revision without knowing the hash of "
                  "the contents")
@@ -253,6 +266,53 @@ def upload_error_with_config_test(self):
         client.run("upload Hello* --confirm --all")
         self.assertEqual(str(client.out).count("ERROR: Pair file, error!"), 6)
 
+    def upload_parallel_error_test(self):
+        """Cause an error in the parallel transfer and see some message"""
+        client = TestClient(requester_class=FailOnReferencesUploader, default_server_user=True)
+        client.save({"conanfile.py": GenConanfile()})
+        client.run('user -p password -r default user')
+        for index in range(4):
+            client.run('create . lib{}/1.0@user/channel'.format(index))
+        client.run('upload lib* --parallel -c --all -r default', assert_error=True)
+        self.assertIn("Connection fails with lib2 and lib4 references!", client.out)
+        self.assertIn("Execute upload again to retry upload the failed files", client.out)
+
+    def upload_parallel_success_test(self):
+        """Upload 2 packages in parallel with success"""
+
+        client = TestClient(default_server_user=True)
+        client.save({"conanfile.py": GenConanfile()})
+        client.run('create . lib0/1.0@user/channel')
+        self.assertIn("lib0/1.0@user/channel: Package '{}' created".format(NO_SETTINGS_PACKAGE_ID),
+                      client.out)
+        client.run('create . lib1/1.0@user/channel')
+        self.assertIn("lib1/1.0@user/channel: Package '{}' created".format(NO_SETTINGS_PACKAGE_ID),
+                      client.out)
+        client.run('user -p password -r default user')
+        client.run('upload lib* --parallel -c --all -r default')
+        self.assertIn("Uploading lib0/1.0@user/channel to remote 'default'", client.out)
+        self.assertIn("Uploading lib1/1.0@user/channel to remote 'default'", client.out)
+        client.run('search lib0/1.0@user/channel -r default')
+        self.assertIn("lib0/1.0@user/channel", client.out)
+        client.run('search lib1/1.0@user/channel -r default')
+        self.assertIn("lib1/1.0@user/channel", client.out)
+
+    def upload_parallel_fail_on_interaction_test(self):
+        """Upload 2 packages in parallel and fail because non_interactive forced"""
+
+        client = TestClient(default_server_user=True)
+        client.save({"conanfile.py": GenConanfile()})
+        num_references = 2
+        for index in range(num_references):
+            client.run('create . lib{}/1.0@user/channel'.format(index))
+            self.assertIn("lib{}/1.0@user/channel: Package '{}' created".format(
+                index,
+                NO_SETTINGS_PACKAGE_ID),
+                client.out)
+        client.run('user -c')
+        client.run('upload lib* --parallel -c --all -r default', assert_error=True)
+        self.assertIn("ERROR: Conan interactive mode disabled. [Remote: default]", client.out)
+
     def upload_with_pattern_and_package_error_test(self):
         files = hello_conan_files("Hello1", "1.2.1")
         self.client.save(files)
diff --git a/conans/util/progress_bar.py b/conans/util/progress_bar.py
index 9515413cf63..1f92e7adb55 100644
--- a/conans/util/progress_bar.py
+++ b/conans/util/progress_bar.py
@@ -8,6 +8,16 @@
 
 TIMEOUT_BEAT_SECONDS = 30
 TIMEOUT_BEAT_CHARACTER = '.'
+LEFT_JUSTIFY_DESC = 28
+LEFT_JUSTIFY_MESSAGE = 90
+
+
+def left_justify_message(msg):
+    return msg.ljust(LEFT_JUSTIFY_MESSAGE)
+
+
+def left_justify_description(msg):
+    return msg.ljust(LEFT_JUSTIFY_DESC)
 
 
 class ProgressOutput(ConanOutput):
@@ -34,8 +44,9 @@ def __init__(self, length, output, description, print_dot):
         if self._print_dot:
             self._last_time = time.time()
         if self._output and self._output.is_terminal and self._description:
-            self._tqdm_bar = tqdm(total=self._total_length, desc=self._description,
-                                  file=self._output, unit="B", leave=True, dynamic_ncols=False,
+            self._tqdm_bar = tqdm(total=self._total_length,
+                                  desc=left_justify_description(self._description),
+                                  file=self._output, unit="B", leave=False, dynamic_ncols=False,
                                   ascii=True, unit_scale=True, unit_divisor=1024)
 
     def pb_update(self, chunk_size):
@@ -62,6 +73,9 @@ def update(self, chunks, chunk_size=1024):
     def pb_close(self):
         if self._tqdm_bar is not None:
             self._tqdm_bar.close()
+            msg = "\r{} completed [{:1.2f}k]".format(self._description,
+                                                     self._processed_size / 1024.0)
+            tqdm.write(left_justify_message(msg), file=self._output, end="\n")
 
 
 class FileWrapper(Progress):
@@ -95,15 +109,14 @@ def __init__(self, files_list, output, desc=None):
         self._last_progress = None
         self._i_file = 0
         self._output = output
-        self._desc = desc
+        self._description = desc
         if self._output and not self._output.is_terminal:
             output.write("[")
         elif self._output:
-            self._tqdm_bar = tqdm(total=len(files_list), desc=desc, file=self._output, unit="files ",
-                                  leave=True, dynamic_ncols=False, ascii=True)
-
-    def description(self):
-        return self._desc
+            self._tqdm_bar = tqdm(total=len(files_list),
+                                  desc=left_justify_description(self._description),
+                                  file=self._output, unit="files ", leave=False, dynamic_ncols=False,
+                                  ascii=True)
 
     def update(self):
         self._i_file = self._i_file + 1
@@ -118,6 +131,8 @@ def update(self):
 
     def pb_close(self):
         if self._output and self._output.is_terminal:
             self._tqdm_bar.close()
+            msg = "\r{} completed [{} files]".format(self._description, self._total_length)
+            tqdm.write(left_justify_message(msg), file=self._output, end="\n")
         elif self._output:
             self._output.writeln("]")
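
Note on the concurrency pattern used above (an illustrative sketch, not code from the
patch): the uploader keeps one ThreadPool whose workers handle a recipe each and hand
that recipe's binary packages back to the same pool through map_async with a callback,
so the worker frees its slot as soon as the package jobs are queued; close()/join()
then drains everything, and exceptions are collected in a shared list instead of being
raised from worker threads. The standalone Python sketch below reproduces that shape
with hypothetical upload_recipe/upload_package stand-ins and made-up reference names:

from multiprocessing.pool import ThreadPool

errors = []           # plays the role of self._exceptions_list in the patch
pool = ThreadPool(8)  # 8 threads, the default chosen for --parallel

def upload_recipe(ref):
    # Recipe-level work runs on a pool worker (one task per reference)
    print("uploading recipe %s" % ref)
    packages = ["%s-pkg%d" % (ref, i) for i in range(3)]

    def upload_package(pkg):
        try:
            print("uploading package %s" % pkg)   # package-level work
            return pkg, None
        except Exception as exc:                  # the real code catches ConanException
            return None, exc

    def on_done(results):
        # Runs once all packages of this recipe finished; only collects errors
        for _pkg, exc in results:
            if exc is not None:
                errors.append(exc)

    # map_async returns immediately: the package jobs are queued on the same pool
    # and this worker is released to pick up the next recipe
    pool.map_async(upload_package, packages, callback=on_done)

pool.map(upload_recipe, ["lib%d/1.0" % i for i in range(4)])  # blocks for the recipes
pool.close()
pool.join()   # waits for the package jobs queued by map_async
if errors:
    raise RuntimeError("Errors uploading some packages")

With the patch applied, the same behaviour is exercised from the CLI as in the tests
above, e.g. "conan upload lib* --all --confirm --parallel -r default"; without
--parallel the pool is created with a single thread, so uploads stay effectively
sequential.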