Skip to content

Commit

Permalink
Upload packages in parallel (#5856)
Browse files Browse the repository at this point in the history
* Added required = True to subparsers in order to print error message in Py2 and Py3.

* sync

* basic concurrent upload at reference level with futures

* revert changes

* add line

* add progress bar for file list

* refactor compressing progress bars

* refactor upload progress bar

* name change

* move download code

* put IterableToFileAdapter back

* make file iterable

* file iterator

* fix iterable file

* change function place

* check output exists

* change output to tqdm

* minor changes

* wip

* minor changes

* wip

* add progress function

* change bar output

* wip

* convert list to string

* convert to string before output

* Lock buggy urllib3 (#5808)

* app simplifying (#5806)

* Apply lockfile before updating downstream requires (#5771)

* apply graph_lock before looking for overrides

* first step: get rid of the warning

* cleaner if graph_lock is passed to the function

* only update requires upstream if no lockfile is applied

* fix tests

* Deprecation of CONAN_USERNAME and CONAN_CHANNEL: fix error message (#5756)

* if CONAN_USERNAME and CONAN_CHANNEL are deprecated, the error cannot recommend them

* update tests accordingly

* test client load() file method (#5815)

* no user/channel repr without _ (#5817)

* no user/channel repr without _

* minor fixes

* fix tests

* Remove py34 (#5820)

* fix upload package id (#5824)

* - update macOS, watchOS, tvOS, iOS version numbers (#5823)

* Refresh token client support.  (#5662)

* Refresh token client support. Missing tests. Missing migration

* public method

* WIP

* Refresh almost there

* Removed prints

* Try migrate

* Migration

* Add comment

* Refresh token flow following RFC recommendations

* Refresh ok

* review

* Remove traces

* Refactor capabilities

* Removed tmp file

* Review

* #5819 Show warning message for Python 3.4 (#5829)

* #5819 Show warning message for Python 3.4

- Add new warning message for python 3.4 which is no longer supported
- Added functional tests to validate both python 3.4 and 2.x

Signed-off-by: Uilian Ries <uilianries@gmail.com>

* #5819 Fix broken tests

Signed-off-by: Uilian Ries <uilianries@gmail.com>

* Add cpp_info.name to cmake and pkg_config generators (#5598)

* Add cpp_info.name to cmake generators

* Fix unit tests to mimic real behavior

* cmake_paths test

* add test for cmake generator

* Add cmake_find_package test

* fix test in py3

* Applied cpp_info.name to pkg_config generator

* check different name in pkg_config

* refactor

* refactor download progress

* refactor bars

* check generator type

* minor changes

* put adapter

* change function name

* change upload to write

* fix names

* refactor written_chunks

* remove size query to progress bar

* fix download size return

* sync with develop

* add parallel argument

* fix add argument

* upload with threadpool

* manage bar positions

* fix output

* fix rewrite length

* output changes

* catch exceptions

* remove import

* print all errors

* change error message

* change default number of threads

* upload in parallel test

* minor changes

* adapt output on parallel upload scope

* fix format

* fix format

* fix test

* upload complete parallel

* minor changes

* minor changes

* add parallel upload

* merge changes

* fix test

* fix rest_client

* force non interactive mode with parallel uploads

* test behaviour

* fix test

* reorder

* add func

* minor changes

* minor changes

* add callback

* fix format

* add post_upload hook when no packages to upload

* prevent repeating headers

* minor fixes

* revert capabilities change

* output with tqdm

* remove output messages without revisions

* send messages to log

* add async comment

* number of threads in help

* change exception place

* move slots to progress bars

* remove attribute

* kill the thread when problems uploading recipe

* default_server_user

* remove check

* extra checks

* prevent upload packages if error

* fix test

* minor changes

* dont call hook

* change var name

* fix test

* remove debugging code

* simplify tests

* remove redundant code

* added mutex for hook lazy loading

* remove slots
  • Loading branch information
czoido committed Nov 26, 2019
1 parent 003fdf5 commit b9f82c3
Show file tree
Hide file tree
Showing 8 changed files with 184 additions and 48 deletions.
96 changes: 73 additions & 23 deletions conans/client/cmd/uploader.py
@@ -1,10 +1,13 @@
import os
import stat
import tarfile
import threading
import time
from collections import defaultdict
from multiprocessing.pool import ThreadPool

from conans.util import progress_bar
from conans.util.progress_bar import left_justify_message
from conans.client.remote_manager import is_package_snapshot_complete, calc_files_checksum
from conans.client.source import complete_recipe_sources
from conans.errors import ConanException, NotFoundException
Expand Down Expand Up @@ -73,21 +76,42 @@ def __init__(self, cache, user_io, remote_manager, loader, hook_manager):
self._remote_manager = remote_manager
self._loader = loader
self._hook_manager = hook_manager
self._upload_thread_pool = None
self._exceptions_list = []

def upload(self, reference_or_pattern, remotes, upload_recorder, package_id=None,
all_packages=None, confirm=False, retry=None, retry_wait=None, integrity_check=False,
policy=None, query=None):
policy=None, query=None, parallel_upload=False):
t1 = time.time()
refs, confirm = self._collects_refs_to_upload(package_id, reference_or_pattern, confirm)
refs_by_remote = self._collect_packages_to_upload(refs, confirm, remotes, all_packages,
query, package_id)
# Do the job

if parallel_upload:
self._upload_thread_pool = ThreadPool(8)
self._user_io.disable_input()
else:
self._upload_thread_pool = ThreadPool(1)

for remote, refs in refs_by_remote.items():
self._output.info("Uploading to remote '{}':".format(remote.name))
for (ref, conanfile, prefs) in refs:

def upload_ref(ref_conanfile_prefs):
ref, conanfile, prefs = ref_conanfile_prefs
self._upload_ref(conanfile, ref, prefs, retry, retry_wait,
integrity_check, policy, remote, upload_recorder, remotes)

self._upload_thread_pool.map(upload_ref,
[(ref, conanfile, prefs) for (ref, conanfile, prefs) in
refs])
self._upload_thread_pool.close()
self._upload_thread_pool.join()
for exception in self._exceptions_list:
self._output.error(str(exception))

if len(self._exceptions_list) > 0:
raise ConanException("Errors uploading some packages")

logger.debug("UPLOAD: Time manager upload: %f" % (time.time() - t1))

def _collects_refs_to_upload(self, package_id, reference_or_pattern, confirm):
Expand Down Expand Up @@ -185,27 +209,54 @@ def _upload_ref(self, conanfile, ref, prefs, retry, retry_wait, integrity_check,
# FIXME: because the recipe can have one and the package a different one
self._hook_manager.execute("pre_upload", conanfile_path=conanfile_path,
reference=ref, remote=recipe_remote)

self._output.info("Uploading %s to remote '%s'" % (str(ref), recipe_remote.name))
self._upload_recipe(ref, conanfile, retry, retry_wait, policy, recipe_remote, remotes)
upload_recorder.add_recipe(ref, recipe_remote.name, recipe_remote.url)
msg = "\rUploading %s to remote '%s'" % (str(ref), recipe_remote.name)
self._output.info(left_justify_message(msg))
try:
self._upload_recipe(ref, conanfile, retry, retry_wait, policy, recipe_remote, remotes)
upload_recorder.add_recipe(ref, recipe_remote.name, recipe_remote.url)
except ConanException as exc:
self._exceptions_list.append(exc)
return

# Now the binaries
if prefs:
total = len(prefs)
for index, pref in enumerate(prefs):
p_remote = recipe_remote
msg = ("Uploading package %d/%d: %s to '%s'" % (index+1, total, str(pref.id),
p_remote.name))
self._output.info(msg)
self._upload_package(pref, retry, retry_wait,
integrity_check, policy, p_remote)
upload_recorder.add_package(pref, p_remote.name, p_remote.url)

# FIXME: I think it makes no sense to specify a remote to "post_upload"
# FIXME: because the recipe can have one and the package a different one
self._hook_manager.execute("post_upload", conanfile_path=conanfile_path, reference=ref,
remote=recipe_remote)
p_remote = recipe_remote

def upload_package_index(index_pref):
try:
index, pref = index_pref
up_msg = "\rUploading package %d/%d: %s to '%s'" % (index + 1, total,
str(pref.id),
p_remote.name)
self._output.info(left_justify_message(up_msg))
self._upload_package(pref, retry, retry_wait,
integrity_check, policy, p_remote)
upload_recorder.add_package(pref, p_remote.name, p_remote.url)
return conanfile_path, ref, recipe_remote, None
except ConanException as exc:
return None, None, None, exc

def upload_package_callback(ret):
for cf_path, r_ref, r_rem, exc in ret:
if exc is None:
# FIXME: I think it makes no sense to specify a remote to "post_upload"
# FIXME: because the recipe can have one and the package a different one
self._hook_manager.execute("post_upload", conanfile_path=cf_path,
reference=r_ref, remote=r_rem)
else:
self._exceptions_list.append(exc)
# This doesn't wait for the packages to end, so the function returns
# and the "pool entry" for the recipe is released
self._upload_thread_pool.map_async(upload_package_index,
[(index, pref) for index, pref
in enumerate(prefs)],
callback=upload_package_callback)
else:
# FIXME: I think it makes no sense to specify a remote to "post_upload"
# FIXME: because the recipe can have one and the package a different one
self._hook_manager.execute("post_upload", conanfile_path=conanfile_path, reference=ref,
remote=recipe_remote)

def _upload_recipe(self, ref, conanfile, retry, retry_wait, policy, remote, remotes):

Expand Down Expand Up @@ -408,11 +459,10 @@ def _package_files_to_upload(self, pref, policy, the_files, remote):
return the_files, deleted

def _upload_recipe_end_msg(self, ref, remote):
msg = "Uploaded conan recipe '%s' to '%s'" % (str(ref), remote.name)
msg = "\rUploaded conan recipe '%s' to '%s'" % (str(ref), remote.name)
url = remote.url.replace("https://api.bintray.com/conan", "https://bintray.com")
msg += ": %s" % url
self._output.writeln("")
self._output.info(msg)
self._output.info(left_justify_message(msg))

def _package_integrity_check(self, pref, files, package_folder):
# If package has been modified remove tgz to regenerate it
Expand Down
6 changes: 5 additions & 1 deletion conans/client/command.py
Expand Up @@ -1344,6 +1344,9 @@ def upload(self, *args):
help="Uploads package only if recipe is the same as the remote one")
parser.add_argument("-j", "--json", default=None, action=OnceArgument,
help='json file path where the upload information will be written to')
parser.add_argument("--parallel", action='store_true', default=False,
help='Upload files in parallel using multiple threads '
'The default number of launched threads is 8')

args = parser.parse_args(*args)

Expand Down Expand Up @@ -1398,7 +1401,8 @@ def upload(self, *args):
query=args.query, remote_name=args.remote,
all_packages=args.all, policy=policy,
confirm=args.confirm, retry=args.retry,
retry_wait=args.retry_wait, integrity_check=args.check)
retry_wait=args.retry_wait, integrity_check=args.check,
parallel_upload=args.parallel)

except ConanException as exc:
info = exc.info
Expand Down
6 changes: 4 additions & 2 deletions conans/client/conan_api.py
Expand Up @@ -868,7 +868,8 @@ def search_packages(self, reference, query=None, remote_name=None, outdated=Fals

@api_method
def upload(self, pattern, package=None, remote_name=None, all_packages=False, confirm=False,
retry=None, retry_wait=None, integrity_check=False, policy=None, query=None):
retry=None, retry_wait=None, integrity_check=False, policy=None, query=None,
parallel_upload=False):
""" Uploads a package recipe and the generated binary packages to a specified remote
"""
upload_recorder = UploadRecorder()
Expand All @@ -877,7 +878,8 @@ def upload(self, pattern, package=None, remote_name=None, all_packages=False, co
remotes = self.app.load_remotes(remote_name=remote_name)
try:
uploader.upload(pattern, remotes, upload_recorder, package, all_packages, confirm,
retry, retry_wait, integrity_check, policy, query=query)
retry, retry_wait, integrity_check, policy, query=query,
parallel_upload=parallel_upload)
return upload_recorder.get_info()
except ConanException as exc:
upload_recorder.error = True
Expand Down
23 changes: 14 additions & 9 deletions conans/client/hook_manager.py
Expand Up @@ -3,6 +3,7 @@
import traceback
import uuid
from collections import defaultdict
from threading import Lock

from conans.client.output import ScopedOutput
from conans.client.tools.files import chdir
Expand Down Expand Up @@ -40,15 +41,19 @@ def __init__(self, hooks_folder, hook_names, output):
self.hooks = defaultdict(list)
self.output = output
self._attribute_checker_path = os.path.join(self._hooks_folder, "attribute_checker.py")

def create_default_hooks(self):
save(self._attribute_checker_path, attribute_checker_hook)
self._mutex = Lock()

def execute(self, method_name, **kwargs):
if not os.path.exists(self._attribute_checker_path):
self.create_default_hooks()
if not self.hooks:
self.load_hooks()
# It is necessary to protect the lazy loading of hooks with a mutex, because it can be
# concurrent (e.g. upload --parallel)
self._mutex.acquire()
try:
if not os.path.exists(self._attribute_checker_path):
save(self._attribute_checker_path, attribute_checker_hook)
if not self.hooks:
self.load_hooks()
finally:
self._mutex.release()

assert method_name in valid_hook_methods, \
"Method '{}' not in valid hooks methods".format(method_name)
Expand All @@ -61,9 +66,9 @@ def execute(self, method_name, **kwargs):

def load_hooks(self):
for name in self._hook_names:
self.load_hook(name)
self._load_hook(name)

def load_hook(self, hook_name):
def _load_hook(self, hook_name):
if not hook_name.endswith(".py"):
hook_name = "%s.py" % hook_name
hook_path = os.path.normpath(os.path.join(self._hooks_folder, hook_name))
Expand Down
5 changes: 2 additions & 3 deletions conans/client/rest/rest_client_v1.py
Expand Up @@ -130,10 +130,9 @@ def _upload_package(self, pref, files_to_upload, retry, retry_wait):
url = self.router.package_upload_urls(pref)
file_sizes = {filename: os.stat(abs_path).st_size for filename,
abs_path in files_to_upload.items()}
self._output.rewrite_line("Requesting upload urls...")
logger.debug("Requesting upload urls...")
urls = self._get_file_to_url_dict(url, data=file_sizes)
self._output.rewrite_line("Requesting upload urls...Done!")
self._output.writeln("")
logger.debug("Requesting upload urls...Done!")
self._upload_files(urls, files_to_upload, self._output, retry, retry_wait)

def _upload_files(self, file_urls, files, output, retry, retry_wait):
Expand Down
3 changes: 2 additions & 1 deletion conans/client/rest/uploader_downloader.py
@@ -1,6 +1,7 @@
import os
import traceback
import time
from copy import copy

from conans.util import progress_bar
from conans.client.rest import response_to_str
Expand All @@ -27,7 +28,7 @@ def upload(self, url, abs_path, auth=None, dedup=False, retry=None, retry_wait=N
retry_wait = retry_wait if retry_wait is not None else 5

# Send always the header with the Sha1
headers = headers or {}
headers = copy(headers) or {}
headers["X-Checksum-Sha1"] = sha1sum(abs_path)
if dedup:
dedup_headers = {"X-Checksum-Deploy": "true"}
Expand Down
62 changes: 61 additions & 1 deletion conans/test/functional/command/upload_complete_test.py
Expand Up @@ -17,7 +17,7 @@
from conans.test.utils.test_files import hello_conan_files, hello_source_files, temp_folder, \
uncompress_packaged_files
from conans.test.utils.tools import (NO_SETTINGS_PACKAGE_ID, TestClient, TestRequester, TestServer,
MockedUserIO, TestBufferConanOutput)
MockedUserIO, TestBufferConanOutput, GenConanfile)
from conans.util.files import load, mkdir, save

myconan1 = """
Expand Down Expand Up @@ -59,6 +59,19 @@ def put(self, *args, **kwargs):
return super(BadConnectionUploader, self).put(*args, **kwargs)


class FailOnReferencesUploader(BadConnectionUploader):
    """Requester test double whose ``put`` fails for selected references.

    A ``put`` call whose first positional argument contains any entry of
    ``fail_on`` raises ``ConnectionError``; every other call is forwarded
    up the class hierarchy.
    """
    # Substrings looked for in the first positional argument of ``put``.
    fail_on = ["lib1", "lib3"]

    def __init__(self, *args, **kwargs):
        # super() is anchored at BadConnectionUploader, so the lookup starts
        # *after* that class in the MRO and its own __init__ is not run.
        super(BadConnectionUploader, self).__init__(*args, **kwargs)

    def put(self, *args, **kwargs):
        """Raise ``ConnectionError`` for blacklisted targets, else delegate."""
        target = args[0]
        for name in self.fail_on:
            if name in target:
                # NOTE(review): the text names lib2/lib4 while ``fail_on`` lists
                # lib1/lib3; upload_parallel_error_test asserts this exact text,
                # so it is kept verbatim here.
                raise ConnectionError("Connection fails with lib2 and lib4 references!")
        # Same anchoring as in __init__: BadConnectionUploader.put is bypassed.
        return super(BadConnectionUploader, self).put(*args, **kwargs)


@unittest.skipIf(TestClient().cache.config.revisions_enabled,
"We cannot know the folder of the revision without knowing the hash of "
"the contents")
Expand Down Expand Up @@ -253,6 +266,53 @@ def upload_error_with_config_test(self):
client.run("upload Hello* --confirm --all")
self.assertEqual(str(client.out).count("ERROR: Pair file, error!"), 6)

def upload_parallel_error_test(self):
    """A failing transfer during a parallel upload must be reported to the user."""
    # FailOnReferencesUploader raises ConnectionError on ``put`` for some references
    client = TestClient(requester_class=FailOnReferencesUploader, default_server_user=True)
    client.save({"conanfile.py": GenConanfile()})
    client.run('user -p password -r default user')

    # Create lib0 .. lib3 locally so the 'lib*' pattern matches four references
    total_references = 4
    for number in range(total_references):
        create_command = 'create . lib{}/1.0@user/channel'.format(number)
        client.run(create_command)

    client.run('upload lib* --parallel -c --all -r default', assert_error=True)
    output = client.out
    self.assertIn("Connection fails with lib2 and lib4 references!", output)
    self.assertIn("Execute upload again to retry upload the failed files", output)

def upload_parallel_success_test(self):
    """Two references uploaded with ``--parallel`` must both reach the remote."""
    client = TestClient(default_server_user=True)
    client.save({"conanfile.py": GenConanfile()})
    references = ["lib0/1.0@user/channel", "lib1/1.0@user/channel"]

    # Create each reference locally and check its package was generated
    for reference in references:
        client.run('create . {}'.format(reference))
        created_msg = "{}: Package '{}' created".format(reference, NO_SETTINGS_PACKAGE_ID)
        self.assertIn(created_msg, client.out)

    client.run('user -p password -r default user')
    client.run('upload lib* --parallel -c --all -r default')
    for reference in references:
        self.assertIn("Uploading {} to remote 'default'".format(reference), client.out)

    # Each reference must now be listed when searching the remote
    for reference in references:
        client.run('search {} -r default'.format(reference))
        self.assertIn(reference, client.out)

def upload_parallel_fail_on_interaction_test(self):
    """A parallel upload must fail with the "interactive mode disabled" error.

    The test runs ``user -c`` right before the upload and then expects the
    upload command to end in error instead of succeeding.
    """
    client = TestClient(default_server_user=True)
    client.save({"conanfile.py": GenConanfile()})

    num_references = 2
    references = ["lib{}/1.0@user/channel".format(number)
                  for number in range(num_references)]
    for reference in references:
        client.run('create . {}'.format(reference))
        expected = "{}: Package '{}' created".format(reference, NO_SETTINGS_PACKAGE_ID)
        self.assertIn(expected, client.out)

    # NOTE(review): presumably 'user -c' drops the stored credentials so the
    # upload would have to prompt for them - confirm against the command docs.
    client.run('user -c')
    client.run('upload lib* --parallel -c --all -r default', assert_error=True)
    self.assertIn("ERROR: Conan interactive mode disabled. [Remote: default]", client.out)

def upload_with_pattern_and_package_error_test(self):
files = hello_conan_files("Hello1", "1.2.1")
self.client.save(files)
Expand Down

0 comments on commit b9f82c3

Please sign in to comment.