diff --git a/bentoml/_internal/bento/build_dev_bentoml_whl.py b/bentoml/_internal/bento/build_dev_bentoml_whl.py index 2b576623145..25f29118dfd 100644 --- a/bentoml/_internal/bento/build_dev_bentoml_whl.py +++ b/bentoml/_internal/bento/build_dev_bentoml_whl.py @@ -28,9 +28,8 @@ def build_bentoml_editable_wheel(target_path: str) -> None: return try: - from build.env import IsolatedEnvBuilder - from build import ProjectBuilder + from build.env import IsolatedEnvBuilder except ModuleNotFoundError as e: raise MissingDependencyException(_exc_message) from e diff --git a/bentoml/_internal/yatai_client/__init__.py b/bentoml/_internal/yatai_client/__init__.py index d8d304e38a9..e0c84895974 100644 --- a/bentoml/_internal/yatai_client/__init__.py +++ b/bentoml/_internal/yatai_client/__init__.py @@ -4,11 +4,13 @@ import typing as t import tarfile import tempfile +import threading from typing import TYPE_CHECKING from pathlib import Path from tempfile import NamedTemporaryFile from functools import wraps from contextlib import contextmanager +from concurrent.futures import Future from concurrent.futures import ThreadPoolExecutor import fs @@ -51,13 +53,19 @@ from ..yatai_rest_api_client.schemas import CreateModelSchema from ..yatai_rest_api_client.schemas import ModelUploadStatus from ..yatai_rest_api_client.schemas import UpdateBentoSchema +from ..yatai_rest_api_client.schemas import CompletePartSchema from ..yatai_rest_api_client.schemas import BentoManifestSchema from ..yatai_rest_api_client.schemas import ModelManifestSchema +from ..yatai_rest_api_client.schemas import TransmissionStrategy from ..yatai_rest_api_client.schemas import FinishUploadBentoSchema from ..yatai_rest_api_client.schemas import FinishUploadModelSchema from ..yatai_rest_api_client.schemas import BentoRunnerResourceSchema from ..yatai_rest_api_client.schemas import CreateBentoRepositorySchema from ..yatai_rest_api_client.schemas import CreateModelRepositorySchema +from ..yatai_rest_api_client.schemas import CompleteMultipartUploadSchema +from ..yatai_rest_api_client.schemas import PreSignMultipartUploadUrlSchema + +FILE_CHUNK_SIZE = 100 * 1024 * 1024 # 100Mb class ObjectWrapper(object): @@ -185,12 +193,12 @@ def spin(self, *, text: str): self.spinner_progress.stop_task(task_id) self.spinner_progress.update(task_id, visible=False) - def push_bento(self, bento: "Bento", *, force: bool = False): + def push_bento(self, bento: "Bento", *, force: bool = False, threads: int = 10): with Live(self.progress_group): upload_task_id = self.transmission_progress.add_task( f'Pushing Bento "{bento.tag}"', start=False, visible=False ) - self._do_push_bento(bento, upload_task_id, force=force) + self._do_push_bento(bento, upload_task_id, force=force, threads=threads) def _do_push_bento( self, @@ -198,6 +206,7 @@ def _do_push_bento( upload_task_id: TaskID, *, force: bool = False, + threads: int = 10, ): yatai_rest_client = get_current_yatai_rest_api_client() name = bento.tag.name @@ -206,17 +215,19 @@ def _do_push_bento( raise BentoMLException(f"Bento {bento.tag} version cannot be None") info = bento.info model_tags = [m.tag for m in info.models] - model_store = bento._model_store + model_store = bento._model_store # type: ignore models = (model_store.get(name) for name in model_tags) with ThreadPoolExecutor(max_workers=max(len(model_tags), 1)) as executor: - def push_model(model: "Model"): + def push_model(model: "Model") -> None: model_upload_task_id = self.transmission_progress.add_task( f'Pushing model "{model.tag}"', start=False, visible=False ) - self._do_push_model(model, model_upload_task_id, force=force) + self._do_push_model( + model, model_upload_task_id, force=force, threads=threads + ) - futures = executor.map(push_model, models) + futures: t.Iterator[None] = executor.map(push_model, models) list(futures) with self.spin(text=f'Fetching Bento repository "{name}"'): bento_repository = yatai_rest_client.get_bento_repository( @@ -270,7 +281,7 @@ def push_model(model: "Model"): ) if not remote_bento: with self.spin(text=f'Registering Bento "{bento.tag}" with Yatai..'): - yatai_rest_client.create_bento( + remote_bento = yatai_rest_client.create_bento( bento_repository_name=bento_repository.name, req=CreateBentoSchema( description="", @@ -282,7 +293,7 @@ def push_model(model: "Model"): ) else: with self.spin(text=f'Updating Bento "{bento.tag}"..'): - yatai_rest_client.update_bento( + remote_bento = yatai_rest_client.update_bento( bento_repository_name=bento_repository.name, version=version, req=UpdateBentoSchema( @@ -290,15 +301,28 @@ def push_model(model: "Model"): labels=labels, ), ) - with self.spin(text=f'Getting a presigned upload url for "{bento.tag}" ..'): - remote_bento = yatai_rest_client.presign_bento_upload_url( - bento_repository_name=bento_repository.name, version=version - ) + + transmission_strategy: TransmissionStrategy = "proxy" + presigned_upload_url: str | None = None + + if remote_bento.transmission_strategy is not None: + transmission_strategy = remote_bento.transmission_strategy + else: + with self.spin( + text=f'Getting a presigned upload url for bento "{bento.tag}" ..' + ): + remote_bento = yatai_rest_client.presign_bento_upload_url( + bento_repository_name=bento_repository.name, version=version + ) + if remote_bento.presigned_upload_url: + transmission_strategy = "presigned_url" + presigned_upload_url = remote_bento.presigned_upload_url + with io.BytesIO() as tar_io: bento_dir_path = bento.path if bento_dir_path is None: raise BentoMLException(f'Bento "{bento}" path cannot be None') - with self.spin(text=f'Creating tar archive for Bento "{bento.tag}"..'): + with self.spin(text=f'Creating tar archive for bento "{bento.tag}"..'): with tarfile.open(fileobj=tar_io, mode="w:gz") as tar: def filter_( @@ -312,11 +336,11 @@ def filter_( tar.add(bento_dir_path, arcname="./", filter=filter_) tar_io.seek(0, 0) - if not remote_bento.presigned_urls_deprecated: - with self.spin(text=f'Start uploading Bento "{bento.tag}"..'): - yatai_rest_client.start_upload_bento( - bento_repository_name=bento_repository.name, version=version - ) + + with self.spin(text=f'Start uploading bento "{bento.tag}"..'): + yatai_rest_client.start_upload_bento( + bento_repository_name=bento_repository.name, version=version + ) file_size = tar_io.getbuffer().nbytes @@ -325,15 +349,19 @@ def filter_( ) self.transmission_progress.start_task(upload_task_id) + io_mutex = threading.Lock() + def io_cb(x: int): - self.transmission_progress.update(upload_task_id, advance=x) + with io_mutex: + self.transmission_progress.update(upload_task_id, advance=x) wrapped_file = CallbackIOWrapper( io_cb, tar_io, "read", ) - if remote_bento.presigned_urls_deprecated: + + if transmission_strategy == "proxy": try: yatai_rest_client.upload_bento( bento_repository_name=bento_repository.name, @@ -342,11 +370,11 @@ def io_cb(x: int): ) except Exception as e: # pylint: disable=broad-except self.log_progress.add_task( - f'[bold red]Failed to upload Bento "{bento.tag}"' + f'[bold red]Failed to upload bento "{bento.tag}"' ) raise e self.log_progress.add_task( - f'[bold green]Successfully pushed Bento "{bento.tag}"' + f'[bold green]Successfully pushed bento "{bento.tag}"' ) return finish_req = FinishUploadBentoSchema( @@ -354,14 +382,124 @@ def io_cb(x: int): reason="", ) try: - resp = requests.put( - remote_bento.presigned_upload_url, data=wrapped_file - ) - if resp.status_code != 200: - finish_req = FinishUploadBentoSchema( - status=BentoUploadStatus.FAILED, - reason=resp.text, - ) + if presigned_upload_url is not None: + resp = requests.put(presigned_upload_url, data=wrapped_file) + if resp.status_code != 200: + finish_req = FinishUploadBentoSchema( + status=BentoUploadStatus.FAILED, + reason=resp.text, + ) + else: + with self.spin( + text=f'Start multipart uploading Bento "{bento.tag}"...' + ): + remote_bento = yatai_rest_client.start_bento_multipart_upload( + bento_repository_name=bento_repository.name, + version=version, + ) + if not remote_bento.upload_id: + raise BentoMLException( + f'Failed to start multipart upload for Bento "{bento.tag}", upload_id is empty' + ) + + upload_id: str = remote_bento.upload_id + + chunks_count = file_size // FILE_CHUNK_SIZE + 1 + + def chunk_upload( + upload_id: str, chunk_number: int + ) -> FinishUploadBentoSchema | t.Tuple[str, int]: + with self.spin( + text=f'({chunk_number}/{chunks_count}) Presign multipart upload url of Bento "{bento.tag}"...' + ): + remote_bento = ( + yatai_rest_client.presign_bento_multipart_upload_url( + bento_repository_name=bento_repository.name, + version=version, + req=PreSignMultipartUploadUrlSchema( + upload_id=upload_id, + part_number=chunk_number, + ), + ) + ) + with self.spin( + text=f'({chunk_number}/{chunks_count}) Uploading chunk of Bento "{bento.tag}"...' + ): + + chunk = ( + tar_io.getbuffer()[ + (chunk_number - 1) + * FILE_CHUNK_SIZE : chunk_number + * FILE_CHUNK_SIZE + ] + if chunk_number < chunks_count + else tar_io.getbuffer()[ + (chunk_number - 1) * FILE_CHUNK_SIZE : + ] + ) + + with io.BytesIO(chunk) as chunk_io: + wrapped_file = CallbackIOWrapper( + io_cb, + chunk_io, + "read", + ) + + resp = requests.put( + remote_bento.presigned_upload_url, data=wrapped_file + ) + if resp.status_code != 200: + return FinishUploadBentoSchema( + status=BentoUploadStatus.FAILED, + reason=resp.text, + ) + return resp.headers["ETag"], chunk_number + + futures_: t.List[ + Future[FinishUploadBentoSchema | t.Tuple[str, int]] + ] = [] + + with ThreadPoolExecutor( + max_workers=min(max(chunks_count, 1), threads) + ) as executor: + for i in range(1, chunks_count + 1): + future = executor.submit( + chunk_upload, + upload_id, + i, + ) + futures_.append(future) + + parts: t.List[CompletePartSchema] = [] + + for future in futures_: + result = future.result() + if isinstance(result, FinishUploadBentoSchema): + finish_req = result + break + else: + etag, chunk_number = result + parts.append( + CompletePartSchema( + part_number=chunk_number, + etag=etag, + ) + ) + + with self.spin( + text=f'Completing multipart upload of Bento "{bento.tag}"...' + ): + remote_bento = ( + yatai_rest_client.complete_bento_multipart_upload( + bento_repository_name=bento_repository.name, + version=version, + req=CompleteMultipartUploadSchema( + upload_id=upload_id, + parts=parts, + ), + ) + ) + except Exception as e: # pylint: disable=broad-except finish_req = FinishUploadBentoSchema( status=BentoUploadStatus.FAILED, @@ -459,20 +597,40 @@ def pull_model(model_tag: Tag): futures = executor.map(pull_model, remote_bento.manifest.models) list(futures) + # Download bento files from yatai - with self.spin(text=f'Getting a presigned download url for bento "{_tag}"'): - remote_bento = yatai_rest_client.presign_bento_download_url( - name, version - ) - if not remote_bento.presigned_urls_deprecated: - response = requests.get( - remote_bento.presigned_download_url, stream=True - ) + transmission_strategy: TransmissionStrategy = "proxy" + presigned_download_url: str | None = None + + if remote_bento.transmission_strategy is not None: + transmission_strategy = remote_bento.transmission_strategy else: + with self.spin( + text=f'Getting a presigned download url for bento "{_tag}"' + ): + remote_bento = yatai_rest_client.presign_bento_download_url( + name, version + ) + if remote_bento.presigned_download_url: + presigned_download_url = remote_bento.presigned_download_url + transmission_strategy = "presigned_url" + + if transmission_strategy == "proxy": response = yatai_rest_client.download_bento( bento_repository_name=name, version=version, ) + else: + if presigned_download_url is None: + with self.spin( + text=f'Getting a presigned download url for bento "{_tag}"' + ): + remote_bento = yatai_rest_client.presign_bento_download_url( + name, version + ) + presigned_download_url = remote_bento.presigned_download_url + response = requests.get(presigned_download_url, stream=True) + if response.status_code != 200: raise BentoMLException( f'Failed to download bento "{_tag}": {response.text}' @@ -497,38 +655,46 @@ def pull_model(model_tag: Tag): ) tar_file.seek(0, 0) tar = tarfile.open(fileobj=tar_file, mode="r:gz") - with fs.open_fs("temp://") as temp_fs: - for member in tar.getmembers(): - f = tar.extractfile(member) - if f is None: - continue - p = Path(member.name) - if p.parent != Path("."): - temp_fs.makedirs(str(p.parent), recreate=True) - temp_fs.writebytes(member.name, f.read()) - bento = Bento.from_fs(temp_fs) - for model_tag in remote_bento.manifest.models: - with self.spin(text=f'Copying model "{model_tag}" to bento'): - copy_model( - model_tag, - src_model_store=model_store, - target_model_store=bento._model_store, # type: ignore - ) - bento = bento.save(bento_store) - self.log_progress.add_task( - f'[bold green]Successfully pulled bento "{_tag}"' - ) - return bento + with self.spin(text=f'Extracting bento "{_tag}" tar file'): + with fs.open_fs("temp://") as temp_fs: + for member in tar.getmembers(): + f = tar.extractfile(member) + if f is None: + continue + p = Path(member.name) + if p.parent != Path("."): + temp_fs.makedirs(str(p.parent), recreate=True) + temp_fs.writebytes(member.name, f.read()) + bento = Bento.from_fs(temp_fs) + for model_tag in remote_bento.manifest.models: + with self.spin( + text=f'Copying model "{model_tag}" to bento' + ): + copy_model( + model_tag, + src_model_store=model_store, + target_model_store=bento._model_store, # type: ignore + ) + bento = bento.save(bento_store) + self.log_progress.add_task( + f'[bold green]Successfully pulled bento "{_tag}"' + ) + return bento - def push_model(self, model: "Model", *, force: bool = False): + def push_model(self, model: "Model", *, force: bool = False, threads: int = 10): with Live(self.progress_group): upload_task_id = self.transmission_progress.add_task( f'Pushing model "{model.tag}"', start=False, visible=False ) - self._do_push_model(model, upload_task_id, force=force) + self._do_push_model(model, upload_task_id, force=force, threads=threads) def _do_push_model( - self, model: "Model", upload_task_id: TaskID, *, force: bool = False + self, + model: "Model", + upload_task_id: TaskID, + *, + force: bool = False, + threads: int = 10, ): yatai_rest_client = get_current_yatai_rest_api_client() name = model.tag.name @@ -564,7 +730,7 @@ def _do_push_model( for key, value in info.labels.items() ] with self.spin(text=f'Registering model "{model.tag}" with Yatai..'): - yatai_rest_client.create_model( + remote_model = yatai_rest_client.create_model( model_repository_name=model_repository.name, req=CreateModelSchema( description="", @@ -582,23 +748,33 @@ def _do_push_model( labels=labels, ), ) - with self.spin( - text=f'Getting a presigned upload url for model "{model.tag}"..' - ): - remote_model = yatai_rest_client.presign_model_upload_url( - model_repository_name=model_repository.name, version=version - ) + + transmission_strategy: TransmissionStrategy = "proxy" + presigned_upload_url: str | None = None + + if remote_model.transmission_strategy is not None: + transmission_strategy = remote_model.transmission_strategy + else: + with self.spin( + text=f'Getting a presigned upload url for Model "{model.tag}" ..' + ): + remote_model = yatai_rest_client.presign_model_upload_url( + model_repository_name=model_repository.name, version=version + ) + if remote_model.presigned_upload_url: + transmission_strategy = "presigned_url" + presigned_upload_url = remote_model.presigned_upload_url + with io.BytesIO() as tar_io: bento_dir_path = model.path with self.spin(text=f'Creating tar archive for model "{model.tag}"..'): with tarfile.open(fileobj=tar_io, mode="w:gz") as tar: tar.add(bento_dir_path, arcname="./") tar_io.seek(0, 0) - if not remote_model.presigned_urls_deprecated: - with self.spin(text=f'Start uploading model "{model.tag}"..'): - yatai_rest_client.start_upload_model( - model_repository_name=model_repository.name, version=version - ) + with self.spin(text=f'Start uploading model "{model.tag}"..'): + yatai_rest_client.start_upload_model( + model_repository_name=model_repository.name, version=version + ) file_size = tar_io.getbuffer().nbytes self.transmission_progress.update( upload_task_id, @@ -608,15 +784,18 @@ def _do_push_model( ) self.transmission_progress.start_task(upload_task_id) + io_mutex = threading.Lock() + def io_cb(x: int): - self.transmission_progress.update(upload_task_id, advance=x) + with io_mutex: + self.transmission_progress.update(upload_task_id, advance=x) wrapped_file = CallbackIOWrapper( io_cb, tar_io, "read", ) - if remote_model.presigned_urls_deprecated: + if transmission_strategy == "proxy": try: yatai_rest_client.upload_model( model_repository_name=model_repository.name, @@ -637,14 +816,124 @@ def io_cb(x: int): reason="", ) try: - resp = requests.put( - remote_model.presigned_upload_url, data=wrapped_file - ) - if resp.status_code != 200: - finish_req = FinishUploadModelSchema( - status=ModelUploadStatus.FAILED, - reason=resp.text, - ) + if presigned_upload_url is not None: + resp = requests.put(presigned_upload_url, data=wrapped_file) + if resp.status_code != 200: + finish_req = FinishUploadModelSchema( + status=ModelUploadStatus.FAILED, + reason=resp.text, + ) + else: + with self.spin( + text=f'Start multipart uploading Model "{model.tag}"...' + ): + remote_model = yatai_rest_client.start_model_multipart_upload( + model_repository_name=model_repository.name, + version=version, + ) + if not remote_model.upload_id: + raise BentoMLException( + f'Failed to start multipart upload for model "{model.tag}", upload_id is empty' + ) + + upload_id: str = remote_model.upload_id + + chunks_count = file_size // FILE_CHUNK_SIZE + 1 + + def chunk_upload( + upload_id: str, chunk_number: int + ) -> FinishUploadModelSchema | t.Tuple[str, int]: + with self.spin( + text=f'({chunk_number}/{chunks_count}) Presign multipart upload url of model "{model.tag}"...' + ): + remote_model = ( + yatai_rest_client.presign_model_multipart_upload_url( + model_repository_name=model_repository.name, + version=version, + req=PreSignMultipartUploadUrlSchema( + upload_id=upload_id, + part_number=chunk_number, + ), + ) + ) + + with self.spin( + text=f'({chunk_number}/{chunks_count}) Uploading chunk of model "{model.tag}"...' + ): + chunk = ( + tar_io.getbuffer()[ + (chunk_number - 1) + * FILE_CHUNK_SIZE : chunk_number + * FILE_CHUNK_SIZE + ] + if chunk_number < chunks_count + else tar_io.getbuffer()[ + (chunk_number - 1) * FILE_CHUNK_SIZE : + ] + ) + + with io.BytesIO(chunk) as chunk_io: + wrapped_file = CallbackIOWrapper( + io_cb, + chunk_io, + "read", + ) + + resp = requests.put( + remote_model.presigned_upload_url, data=wrapped_file + ) + if resp.status_code != 200: + return FinishUploadModelSchema( + status=ModelUploadStatus.FAILED, + reason=resp.text, + ) + return resp.headers["ETag"], chunk_number + + futures_: t.List[ + Future[FinishUploadModelSchema | t.Tuple[str, int]] + ] = [] + + with ThreadPoolExecutor( + max_workers=min(max(chunks_count, 1), threads) + ) as executor: + for i in range(1, chunks_count + 1): + future = executor.submit( + chunk_upload, + upload_id, + i, + ) + futures_.append(future) + + parts: t.List[CompletePartSchema] = [] + + for future in futures_: + result = future.result() + if isinstance(result, FinishUploadModelSchema): + finish_req = result + break + else: + etag, chunk_number = result + parts.append( + CompletePartSchema( + part_number=chunk_number, + etag=etag, + ) + ) + + with self.spin( + text=f'Completing multipart upload of model "{model.tag}"...' + ): + remote_model = ( + yatai_rest_client.complete_model_multipart_upload( + model_repository_name=model_repository.name, + version=version, + req=CompleteMultipartUploadSchema( + upload_id=upload_id, + parts=parts, + ), + ) + ) + except Exception as e: # pylint: disable=broad-except finish_req = FinishUploadModelSchema( status=ModelUploadStatus.FAILED, @@ -716,16 +1005,41 @@ def _do_pull_model( if not remote_model: raise BentoMLException(f'Model "{_tag}" not found on Yatai') - if not remote_model.presigned_urls_deprecated: - response = requests.get(remote_model.presigned_download_url, stream=True) - if response.status_code != 200: - raise BentoMLException( - f'Failed to download model "{_tag}": {response.text}' - ) + + # Download model files from yatai + transmission_strategy: TransmissionStrategy = "proxy" + presigned_download_url: str | None = None + + if remote_model.transmission_strategy is not None: + transmission_strategy = remote_model.transmission_strategy else: + with self.spin(text=f'Getting a presigned download url for model "{_tag}"'): + remote_model = yatai_rest_client.presign_model_download_url( + name, version + ) + if remote_model.presigned_download_url: + presigned_download_url = remote_model.presigned_download_url + transmission_strategy = "presigned_url" + + if transmission_strategy == "proxy": response = yatai_rest_client.download_model( model_repository_name=name, version=version ) + else: + if presigned_download_url is None: + with self.spin( + text=f'Getting a presigned download url for model "{_tag}"' + ): + remote_model = yatai_rest_client.presign_model_download_url( + name, version + ) + presigned_download_url = remote_model.presigned_download_url + + response = requests.get(presigned_download_url, stream=True) + if response.status_code != 200: + raise BentoMLException( + f'Failed to download model "{_tag}": {response.text}' + ) total_size_in_bytes = int(response.headers.get("content-length", 0)) block_size = 1024 # 1 Kibibyte @@ -745,20 +1059,21 @@ def _do_pull_model( ) tar_file.seek(0, 0) tar = tarfile.open(fileobj=tar_file, mode="r:gz") - with fs.open_fs("temp://") as temp_fs: - for member in tar.getmembers(): - f = tar.extractfile(member) - if f is None: - continue - p = Path(member.name) - if p.parent != Path("."): - temp_fs.makedirs(str(p.parent), recreate=True) - temp_fs.writebytes(member.name, f.read()) - model = Model.from_fs(temp_fs).save(model_store) - self.log_progress.add_task( - f'[bold green]Successfully pulled model "{_tag}"' - ) - return model + with self.spin(text=f'Extracting model "{_tag}" tar file'): + with fs.open_fs("temp://") as temp_fs: + for member in tar.getmembers(): + f = tar.extractfile(member) + if f is None: + continue + p = Path(member.name) + if p.parent != Path("."): + temp_fs.makedirs(str(p.parent), recreate=True) + temp_fs.writebytes(member.name, f.read()) + model = Model.from_fs(temp_fs).save(model_store) + self.log_progress.add_task( + f'[bold green]Successfully pulled model "{_tag}"' + ) + return model yatai_client = YataiClient() diff --git a/bentoml/_internal/yatai_rest_api_client/schemas.py b/bentoml/_internal/yatai_rest_api_client/schemas.py index 5db855ded12..a293961a46a 100644 --- a/bentoml/_internal/yatai_rest_api_client/schemas.py +++ b/bentoml/_internal/yatai_rest_api_client/schemas.py @@ -4,6 +4,7 @@ from typing import Dict from typing import List from typing import Type +from typing import Literal from typing import TypeVar from typing import Optional from datetime import datetime @@ -164,6 +165,9 @@ class BentoManifestSchema: runners: Optional[List[BentoRunnerSchema]] = attr.field(factory=list) +TransmissionStrategy = Literal["presigned_url", "proxy"] + + @attr.define class BentoSchema(ResourceSchema): description: str @@ -175,7 +179,9 @@ class BentoSchema(ResourceSchema): presigned_download_url: str manifest: BentoManifestSchema - presigned_urls_deprecated: Optional[bool] = attr.field(default=None) + transmission_strategy: TransmissionStrategy | None = attr.field(default=None) + upload_id: str | None = attr.field(default=None) + upload_started_at: Optional[datetime] = attr.field(default=None) upload_finished_at: Optional[datetime] = attr.field(default=None) build_at: datetime = attr.field(factory=datetime.now) @@ -202,6 +208,24 @@ class UpdateBentoSchema: labels: Optional[List[LabelItemSchema]] = attr.field(default=None) +@attr.define +class PreSignMultipartUploadUrlSchema: + upload_id: str + part_number: int + + +@attr.define +class CompletePartSchema: + part_number: int + etag: str + + +@attr.define +class CompleteMultipartUploadSchema: + parts: List[CompletePartSchema] + upload_id: str + + @attr.define class FinishUploadBentoSchema: status: Optional[BentoUploadStatus] @@ -250,7 +274,9 @@ class ModelSchema(ResourceSchema): presigned_download_url: str manifest: ModelManifestSchema - presigned_urls_deprecated: Optional[bool] = attr.field(default=None) + transmission_strategy: Optional[TransmissionStrategy] = attr.field(default=None) + upload_id: Optional[str] = attr.field(default=None) + upload_started_at: Optional[datetime] = attr.field(default=None) upload_finished_at: Optional[datetime] = attr.field(default=None) build_at: datetime = attr.field(factory=datetime.now) diff --git a/bentoml/_internal/yatai_rest_api_client/yatai.py b/bentoml/_internal/yatai_rest_api_client/yatai.py index 2dd8c32758e..09703b31df3 100644 --- a/bentoml/_internal/yatai_rest_api_client/yatai.py +++ b/bentoml/_internal/yatai_rest_api_client/yatai.py @@ -20,6 +20,8 @@ from .schemas import FinishUploadModelSchema from .schemas import CreateBentoRepositorySchema from .schemas import CreateModelRepositorySchema +from .schemas import CompleteMultipartUploadSchema +from .schemas import PreSignMultipartUploadUrlSchema from ...exceptions import YataiRESTApiClientError from ..configuration import BENTOML_VERSION @@ -140,6 +142,45 @@ def presign_bento_download_url( self._check_resp(resp) return schema_from_json(resp.text, BentoSchema) + def start_bento_multipart_upload( + self, bento_repository_name: str, version: str + ) -> BentoSchema: + url = urljoin( + self.endpoint, + f"/api/v1/bento_repositories/{bento_repository_name}/bentos/{version}/start_multipart_upload", + ) + resp = self.session.patch(url) + self._check_resp(resp) + return schema_from_json(resp.text, BentoSchema) + + def presign_bento_multipart_upload_url( + self, + bento_repository_name: str, + version: str, + req: PreSignMultipartUploadUrlSchema, + ) -> BentoSchema: + url = urljoin( + self.endpoint, + f"/api/v1/bento_repositories/{bento_repository_name}/bentos/{version}/presign_multipart_upload_url", + ) + resp = self.session.patch(url, data=schema_to_json(req)) + self._check_resp(resp) + return schema_from_json(resp.text, BentoSchema) + + def complete_bento_multipart_upload( + self, + bento_repository_name: str, + version: str, + req: CompleteMultipartUploadSchema, + ) -> BentoSchema: + url = urljoin( + self.endpoint, + f"/api/v1/bento_repositories/{bento_repository_name}/bentos/{version}/complete_multipart_upload", + ) + resp = self.session.patch(url, data=schema_to_json(req)) + self._check_resp(resp) + return schema_from_json(resp.text, BentoSchema) + def start_upload_bento( self, bento_repository_name: str, version: str ) -> BentoSchema: @@ -255,6 +296,45 @@ def presign_model_download_url( self._check_resp(resp) return schema_from_json(resp.text, ModelSchema) + def start_model_multipart_upload( + self, model_repository_name: str, version: str + ) -> ModelSchema: + url = urljoin( + self.endpoint, + f"/api/v1/model_repositories/{model_repository_name}/models/{version}/start_multipart_upload", + ) + resp = self.session.patch(url) + self._check_resp(resp) + return schema_from_json(resp.text, ModelSchema) + + def presign_model_multipart_upload_url( + self, + model_repository_name: str, + version: str, + req: PreSignMultipartUploadUrlSchema, + ) -> ModelSchema: + url = urljoin( + self.endpoint, + f"/api/v1/model_repositories/{model_repository_name}/models/{version}/presign_multipart_upload_url", + ) + resp = self.session.patch(url, data=schema_to_json(req)) + self._check_resp(resp) + return schema_from_json(resp.text, ModelSchema) + + def complete_model_multipart_upload( + self, + model_repository_name: str, + version: str, + req: CompleteMultipartUploadSchema, + ) -> ModelSchema: + url = urljoin( + self.endpoint, + f"/api/v1/model_repositories/{model_repository_name}/models/{version}/complete_multipart_upload", + ) + resp = self.session.patch(url, data=schema_to_json(req)) + self._check_resp(resp) + return schema_from_json(resp.text, ModelSchema) + def start_upload_model( self, model_repository_name: str, version: str ) -> ModelSchema: diff --git a/bentoml_cli/bentos.py b/bentoml_cli/bentos.py index 551652b33a6..93bb449937e 100644 --- a/bentoml_cli/bentos.py +++ b/bentoml_cli/bentos.py @@ -254,12 +254,18 @@ def pull(bento_tag: str, force: bool) -> None: # type: ignore (not accessed) default=False, help="Forced push to yatai even if it exists in yatai", ) - def push(bento_tag: str, force: bool) -> None: # type: ignore (not accessed) + @click.option( + "-t", + "--threads", + default=10, + help="Number of threads to use for upload", + ) + def push(bento_tag: str, force: bool, threads: int) -> None: # type: ignore (not accessed) """Push Bento to a yatai server.""" bento_obj = bento_store.get(bento_tag) if not bento_obj: raise click.ClickException(f"Bento {bento_tag} not found in local store") - yatai_client.push_bento(bento_obj, force=force) + yatai_client.push_bento(bento_obj, force=force, threads=threads) @cli.command() @click.argument("build_ctx", type=click.Path(), default=".") diff --git a/bentoml_cli/models.py b/bentoml_cli/models.py index adde121ee49..2609d8ee4f3 100644 --- a/bentoml_cli/models.py +++ b/bentoml_cli/models.py @@ -244,9 +244,15 @@ def pull(model_tag: str, force: bool): # type: ignore (not accessed) default=False, help="Forced push to yatai even if it exists in yatai", ) - def push(model_tag: str, force: bool): # type: ignore (not accessed) + @click.option( + "-t", + "--threads", + default=10, + help="Number of threads to use for upload", + ) + def push(model_tag: str, force: bool, threads: int = 10): # type: ignore (not accessed) """Push Model to a yatai server.""" model_obj = model_store.get(model_tag) if not model_obj: raise click.ClickException(f"Model {model_tag} not found in local store") - yatai_client.push_model(model_obj, force=force) + yatai_client.push_model(model_obj, force=force, threads=threads)