async def async_client_call(
    method: str,
    channel: Channel,
    data: dict[str, Message | pb.Part | bytes | str | dict[str, t.Any]],
    assert_data: pb.Response | t.Callable[[pb.Response], bool] | None = None,
    assert_code: grpc.StatusCode | None = None,
    assert_details: str | None = None,
    timeout: int | None = None,
    sanity: bool = True,
) -> pb.Response:
    """
    Invoke the 'Call' RPC of the BentoService over 'channel' and check the result.

    Note that to use this function, 'channel' should not be created with any client
    interceptors, since this function manages the interceptor lifecycle itself: it
    appends an 'AssertClientInterceptor' to the channel's unary-unary interceptor
    list before the call and resets that list to empty afterwards.

    This function mimics the generated stub function 'Call' for the given 'channel'.

    Args:
        method: The API name to call; sent as 'api_name' of the request.
        channel: The given aio.Channel to use for invoking the RPC.
        data: Extra keyword fields used to build the 'pb.Request' message.
        assert_data: Either an expected response (compared with '==') or a callable
                     that receives the response and returns whether it is acceptable.
        assert_code: The status code expected from the call. Defaults to
                     'grpc.StatusCode.OK' when not given.
        assert_details: A substring expected in the call details.
        timeout: The timeout for the RPC.
        sanity: Whether to assert that the response is truthy.

    Returns:
        The response from the server.

    Raises:
        AssertionError: if the channel already carries interceptors, or if any of the
                        requested checks fails.
    """
    from bentoml.testing.grpc.interceptors import AssertClientInterceptor

    if assert_code is None:
        # by default, we want to check if the request is healthy
        assert_code = grpc.StatusCode.OK

    interceptor_stacks = (
        "_unary_unary_interceptors",
        "_unary_stream_interceptors",
        "_stream_unary_interceptors",
        "_stream_stream_interceptors",
    )
    assert not any(
        len(getattr(channel, stack)) != 0 for stack in interceptor_stacks
    ), "'channel' shouldn't have any interceptors."

    try:
        # we will handle adding our testing interceptors here.
        # prefer not to use private attributes, but this will do
        channel._unary_unary_interceptors.append(  # type: ignore (private warning)
            AssertClientInterceptor(
                assert_code=assert_code, assert_details=assert_details
            )
        )
        Call = channel.unary_unary(
            "/bentoml.grpc.v1alpha1.BentoService/Call",
            request_serializer=pb.Request.SerializeToString,
            response_deserializer=pb.Response.FromString,
        )
        output = await t.cast(
            t.Awaitable[pb.Response],
            Call(pb.Request(api_name=method, **data), timeout=timeout),
        )
        if sanity:
            assert output
        # Compare against None explicitly so that the check does not depend on the
        # truthiness of the expected message object.
        if assert_data is not None:
            try:
                if callable(assert_data):
                    assert assert_data(output)
                else:
                    assert output == assert_data
            except AssertionError:
                raise AssertionError(f"Failed while checking data: {output}") from None
        return output
    finally:
        # we will reset interceptors per call
        channel._unary_unary_interceptors = []  # type: ignore (private warning)
@contextmanager
def make_standalone_server(
    bind_address: str, interceptors: t.Sequence[aio.ServerInterceptor] | None = None
) -> t.Generator[aio.Server, None, None]:
    """
    Build an 'aio' gRPC server bound to 'bind_address' and yield it.

    The server is created with a single-worker migration thread pool and with the
    'grpc.so_reuseport' option set to 0. It is neither started nor stopped here;
    both are left to the caller.

    Args:
        bind_address: The address handed to 'add_insecure_port'.
        interceptors: Optional server interceptors passed through to 'aio.server'.

    Yields:
        The configured server instance.
    """
    single_worker_pool = ThreadPoolExecutor(max_workers=1)
    server_options = (("grpc.so_reuseport", 0),)
    standalone = aio.server(
        interceptors=interceptors,
        migration_thread_pool=single_worker_pool,
        options=server_options,
    )
    standalone.add_insecure_port(bind_address)
    yield standalone
class AssertClientInterceptor(aio.UnaryUnaryClientInterceptor):
    """
    Client interceptor that asserts on the outcome of a unary-unary call.

    After the intercepted call completes, its status code, details and trailing
    metadata are read once and compared with the expectations given at
    construction time. Expectations that are falsy (for example 'None') are skipped.
    """

    def __init__(
        self,
        assert_code: grpc.StatusCode | None = None,
        assert_details: str | None = None,
        assert_trailing_metadata: aio.Metadata | None = None,
    ):
        """
        Args:
            assert_code: Status code the call must return.
            assert_details: Substring that must appear in the call details.
            assert_trailing_metadata: Trailing metadata the call must equal.
        """
        self._assert_code = assert_code
        self._assert_details = assert_details
        self._assert_trailing_metadata = assert_trailing_metadata

    async def intercept_unary_unary(  # type: ignore (unable to infer types from parameters)
        self,
        continuation: t.Callable[[aio.ClientCallDetails, Request], BentoUnaryUnaryCall],
        client_call_details: aio.ClientCallDetails,
        request: Request,
    ) -> BentoUnaryUnaryCall:
        """
        Run the call through 'continuation' and check it against the expectations.

        Returns:
            The call object produced by 'continuation'.

        Raises:
            AssertionError: if the code, details or trailing metadata do not match.
        """
        # Note that we cast twice here since grpc.aio._call.UnaryUnaryCall
        # implements __await__, which returns ResponseType. However, pyright
        # are unable to determine types from given mixin.
        #
        # continuation(client_call_details, request) -> call: UnaryUnaryCall
        # await call -> ResponseType
        call = await t.cast(
            "t.Awaitable[BentoUnaryUnaryCall]",
            continuation(client_call_details, request),
        )
        # Read each value once and reuse it in both the check and the message.
        code = await call.code()
        details = await call.details()
        trailing_metadata = await call.trailing_metadata()
        if self._assert_code:
            assert (
                code == self._assert_code
            ), f"{repr(call)} returns {code} while expecting {self._assert_code}."
        if self._assert_details:
            assert (
                self._assert_details in details
            ), f"'{self._assert_details}' is not in {details}."
        if self._assert_trailing_metadata:
            assert (
                self._assert_trailing_metadata == trailing_metadata
            ), f"Trailing metadata '{trailing_metadata}' while expecting '{self._assert_trailing_metadata}'."
        return call
grpc_health.v1 import health_pb2 as pb_health from aiohttp.typedefs import LooseHeaders from starlette.datastructures import Headers from starlette.datastructures import FormData + from bentoml._internal.bento.bento import Bento + + DeploymentMode = t.Annotated[str, t.Literal["standalone", "distributed", "docker"]] +else: + pb_health = LazyLoader("pb_health", globals(), "grpc_health.v1.health_pb2") + aio = LazyLoader("aio", globals(), "grpc.aio") + -async def parse_multipart_form(headers: "Headers", body: bytes) -> "FormData": +async def parse_multipart_form(headers: Headers, body: bytes) -> FormData: """ parse starlette forms from headers and body """ @@ -51,10 +58,10 @@ async def async_bytesio(bytes_: bytes) -> t.AsyncGenerator[bytes, None]: async def async_request( method: str, url: str, - headers: t.Optional["LooseHeaders"] = None, + headers: LooseHeaders | None = None, data: t.Any = None, - timeout: t.Optional[int] = None, -) -> t.Tuple[int, "Headers", bytes]: + timeout: int | None = None, +) -> tuple[int, Headers, bytes]: """ A HTTP client with async API. """ @@ -75,91 +82,126 @@ async def async_request( return r.status, Headers(headers), r_body -def _wait_until_api_server_ready( +def kill_subprocess_tree(p: subprocess.Popen[t.Any]) -> None: + """ + Tell the process to terminate and kill all of its children. Availabe both on Windows and Linux. + Note: It will return immediately rather than wait for the process to terminate. 
async def server_warmup(
    host_url: str,
    timeout: float,
    grpc: bool = False,
    check_interval: float = 1,
    popen: subprocess.Popen[t.Any] | None = None,
    service_name: str | None = None,
) -> bool:
    """
    Poll a server until it reports ready, the process dies, or 'timeout' elapses.

    For gRPC, the standard health-checking 'Check' RPC is called for 'service_name'
    and the server counts as ready when it answers SERVING. Otherwise an HTTP GET on
    '/readyz' is issued (bypassing any configured proxies) and a 200 status counts
    as ready.

    Args:
        host_url: 'host:port' of the server to probe.
        timeout: Total number of seconds to keep polling.
        grpc: Probe via the gRPC health service instead of HTTP.
        check_interval: Seconds to wait between two attempts.
        popen: Optional server process; polling stops as soon as it has exited.
        service_name: gRPC service to check. Defaults to
                      'bentoml.grpc.v1alpha1.BentoService'.

    Returns:
        True when the server became ready, False when the process exited or the
        timeout was reached.
    """
    retryable: tuple[type[BaseException], ...] = (
        ConnectionError,
        urllib.error.URLError,
        socket.timeout,
    )
    opener = None
    if grpc:
        # Only touch the gRPC helpers when they are actually needed, so plain HTTP
        # checks do not require grpc to be importable.
        from bentoml.testing.grpc import create_channel

        retryable += (aio.AioRpcError,)
        if service_name is None:
            service_name = "bentoml.grpc.v1alpha1.BentoService"
    else:
        # The opener does not change between attempts, so build it once.
        opener = urllib.request.build_opener(urllib.request.ProxyHandler({}))

    # A monotonic clock is not affected by system clock adjustments.
    deadline = time.monotonic() + timeout
    print("Waiting for host %s to be ready.." % host_url)
    while time.monotonic() < deadline:
        try:
            if popen and popen.poll() is not None:
                return False
            if grpc:
                async with create_channel(host_url) as channel:
                    Check = channel.unary_unary(
                        "/grpc.health.v1.Health/Check",
                        request_serializer=pb_health.HealthCheckRequest.SerializeToString,  # type: ignore (no grpc_health type)
                        response_deserializer=pb_health.HealthCheckResponse.FromString,  # type: ignore (no grpc_health type)
                    )
                    resp = await t.cast(
                        "t.Awaitable[pb_health.HealthCheckResponse]",
                        Check(
                            pb_health.HealthCheckRequest(service=service_name),
                            timeout=timeout,
                        ),
                    )
                    if resp.status == pb_health.HealthCheckResponse.SERVING:  # type: ignore (no generated enum types)
                        return True
            elif opener.open(f"http://{host_url}/readyz", timeout=1).status == 200:
                return True
        except retryable as e:
            print(f"[{e}] Retrying to connect to the host {host_url}...")
        # Yield to the event loop while waiting instead of blocking it.
        await asyncio.sleep(check_interval)
    print(f"Timed out waiting {timeout} seconds for Server {host_url} to be ready.")
    return False
str(bento_tag), docker_image_tag=[image_tag], progress="plain" + ) + yield image_tag + finally: + print(f"Removing bento server docker image: {image_tag}") + subprocess.call(["docker", "rmi", image_tag]) -@cached_contextmanager("{image_tag}, {config_file}") -def run_bento_server_in_docker( +@cached_contextmanager("{image_tag}, {config_file}, {use_grpc}") +def run_bento_server_docker( image_tag: str, - config_file: t.Optional[str] = None, - timeout: float = 40, + config_file: str | None = None, + use_grpc: bool = False, + timeout: float = 90, + host: str = "0.0.0.0", ): """ Launch a bentoml service container from a docker image, yield the host URL """ + from bentoml._internal.configuration.containers import BentoMLContainer + container_name = f"bentoml-test-{image_tag}-{hash(config_file)}" - with reserve_free_port() as port: + with reserve_free_port(enable_so_reuseport=use_grpc) as port: pass - + bind_port = "3000" cmd = [ "docker", "run", @@ -167,133 +209,146 @@ def run_bento_server_in_docker( "--name", container_name, "--publish", - f"{port}:3000", - "--env", - "BENTOML_LOG_STDOUT=true", - "--env", - "BENTOML_LOG_STDERR=true", + f"{port}:{bind_port}", + "-v", + f"{os.path.abspath(BentoMLContainer.prometheus_multiproc_dir.get())}:/home/bentoml/prometheus_multiproc_dir", ] - + if os.environ.get("GITHUB_ACTIONS"): + # running this on actions, we need to access as root to mount the volume + cmd.extend(["--user", "root"]) if config_file is not None: cmd.extend(["--env", "BENTOML_CONFIG=/home/bentoml/bentoml_config.yml"]) cmd.extend( ["-v", f"{os.path.abspath(config_file)}:/home/bentoml/bentoml_config.yml"] ) + if use_grpc: + bind_prom_port = BentoMLContainer.grpc.metrics.port.get() + cmd.extend(["--publish", f"{bind_prom_port}:{bind_prom_port}"]) cmd.append(image_tag) - - logger.info(f"Running API server docker image: {cmd}") + if use_grpc: + cmd.extend(["serve-grpc", "--production", "--enable-reflection"]) + print(f"Running API server docker image: '{' '.join(cmd)}'") 
@contextmanager
def run_bento_server_standalone(
    bento: str,
    use_grpc: bool = False,
    config_file: str | None = None,
    timeout: float = 90,
    host: str = "0.0.0.0",
) -> t.Generator[str, None, None]:
    """
    Launch a bentoml service directly by the bentoml CLI, yields the host URL.

    The service is started as a subprocess running 'python -m bentoml serve' (or
    'serve-grpc' when 'use_grpc' is set) in production mode, and is terminated
    when the context exits.

    Args:
        bento: The bento target appended as the last CLI argument.
        use_grpc: Start the gRPC server (with reflection enabled) instead of HTTP.
        config_file: Optional configuration file; its absolute path is exported to
                     the subprocess as BENTOML_CONFIG.
        timeout: Seconds to wait for the server to become ready.
        host: Host used for port reservation, the gRPC '--host' flag and the
              yielded URL.

    Yields:
        The '<host>:<port>' string of the running server.

    Raises:
        AssertionError: if the server does not become ready in time.
    """
    from bentoml._internal.configuration.containers import BentoMLContainer

    # The subprocess gets a copy of the current environment plus BentoML settings.
    copied = os.environ.copy()
    if config_file is not None:
        copied["BENTOML_CONFIG"] = os.path.abspath(config_file)
    copied["BENTOML_HOME"] = BentoMLContainer.bentoml_home.get()
    # NOTE(review): block nesting is reconstructed from a whitespace-collapsed diff;
    # it is assumed that the process is launched after the port reservation ends.
    with reserve_free_port(host=host, enable_so_reuseport=use_grpc) as server_port:
        cmd = [
            sys.executable,
            "-m",
            "bentoml",
            "serve-grpc" if use_grpc else "serve",
            "--production",
            "--port",
            f"{server_port}",
        ]
        if use_grpc:
            cmd += ["--host", f"{host}", "--enable-reflection"]
    cmd += [bento]
    print(f"Running command: '{' '.join(cmd)}'")
    p = subprocess.Popen(
        cmd,
        stderr=subprocess.STDOUT,
        env=copied,
        encoding="utf-8",
    )
    try:
        host_url = f"{host}:{server_port}"
        # Block until the server answers its readiness check (or fail the test).
        assert asyncio.run(
            server_warmup(host_url, timeout=timeout, popen=p, grpc=use_grpc)
        )
        yield host_url
    finally:
        print(f"Stopping process [{p.pid}]...")
        kill_subprocess_tree(p)
        # Wait for the process to finish after asking it to terminate.
        p.communicate()
""" - with reserve_free_port() as proxy_port: - pass + import yaml - logger.warning(f"Starting proxy on port {proxy_port}") + import bentoml + from bentoml._internal.configuration.containers import BentoMLContainer + + proxy_process = None + with reserve_free_port(enable_so_reuseport=use_grpc) as proxy_port: + pass + print(f"Starting proxy on port {proxy_port}") proxy_process = multiprocessing.Process( - target=_start_mitm_proxy, + target=start_mitm_proxy, args=(proxy_port,), ) proxy_process.start() - - my_env = os.environ.copy() - + copied = os.environ.copy() # to ensure yatai specified headers BP100 - my_env["YATAI_BENTO_DEPLOYMENT_NAME"] = "sdfasdf" - my_env["YATAI_BENTO_DEPLOYMENT_NAMESPACE"] = "yatai" - my_env["HTTP_PROXY"] = f"http://127.0.0.1:{proxy_port}" - + copied["YATAI_BENTO_DEPLOYMENT_NAME"] = "test-deployment" + copied["YATAI_BENTO_DEPLOYMENT_NAMESPACE"] = "yatai" + copied["HTTP_PROXY"] = f"http://127.0.0.1:{proxy_port}" + copied["BENTOML_HOME"] = BentoMLContainer.bentoml_home.get() if config_file is not None: - my_env["BENTOML_CONFIG"] = os.path.abspath(config_file) - - import yaml - - import bentoml + copied["BENTOML_CONFIG"] = os.path.abspath(config_file) + runner_map = {} + processes: list[subprocess.Popen[str]] = [] bento_service = bentoml.bentos.get(bento_tag) - path = bento_service.path - with open(os.path.join(path, "bento.yaml"), "r", encoding="utf-8") as f: bentofile = yaml.safe_load(f) - - runner_map = {} - processes: t.List[subprocess.Popen[str]] = [] - for runner in bentofile["runners"]: - with reserve_free_port() as port: + with reserve_free_port(enable_so_reuseport=use_grpc) as port: runner_map[runner["name"]] = f"tcp://127.0.0.1:{port}" cmd = [ sys.executable, @@ -304,128 +359,147 @@ def run_bento_server_distributed( "--runner-name", runner["name"], "--host", - "127.0.0.1", + host, "--port", f"{port}", "--working-dir", path, ] - logger.info(f"Running command: `{cmd}`") - + print(f"Running command: '{' '.join(cmd)}'") processes.append( 
@cached_contextmanager(
    "{bento_name}, {project_path}, {config_file}, {deployment_mode}, {bentoml_home}, {use_grpc}"
)
def host_bento(
    bento_name: str | Tag | None = None,
    project_path: str = ".",
    config_file: str | None = None,
    deployment_mode: DeploymentMode = "standalone",
    bentoml_home: str | None = None,
    use_grpc: bool = False,
    clean_context: contextlib.ExitStack | None = None,
    host: str = "0.0.0.0",
) -> t.Generator[str, None, None]:
    """
    Host a bentoml service, yields the host URL.

    If 'bento_name' is not given, or no bento with that name is listed, the project
    at 'project_path' is built first. The bento is then served in the requested
    deployment mode.

    Args:
        bento_name: a bento tag or :code:`module_path:service`
        project_path: the path to the project directory
        config_file: the path to the config file
        deployment_mode: the deployment mode, one of :code:`standalone`, :code:`docker` or :code:`distributed`
        bentoml_home: if set, we will change the given BentoML home folder to :code:`bentoml_home`. Default
                      to :code:`$HOME/bentoml`
        use_grpc: if True, running gRPC tests.
        clean_context: a contextlib.ExitStack to clean up the intermediate files,
                       like docker image and bentos. If None, it will be created. Used for reusing
                       those files in the same test session.
        host: set a given host for the bento, default to :code:`0.0.0.0`

    Returns:
        :obj:`str`: a generated host URL where we run the test bento.

    Raises:
        ValueError: if 'deployment_mode' is not one of the supported modes.
    """
    import bentoml

    # host changed to 127.0.0.1 for running on Windows
    if psutil.WINDOWS:
        host = "127.0.0.1"
    # Only close the exit stack on exit when it was created here; a stack passed in
    # by the caller is left open.
    if clean_context is None:
        clean_context = contextlib.ExitStack()
        clean_on_exit = True
    else:
        clean_on_exit = False
    if bentoml_home:
        from bentoml._internal.configuration.containers import BentoMLContainer

        BentoMLContainer.bentoml_home.set(bentoml_home)
    try:
        print(
            f"Starting bento server {bento_name} at '{project_path}' {'with config file '+config_file+' ' if config_file else ' '}in {deployment_mode} mode..."
        )
        # Build the project unless an existing bento matches the given name.
        if bento_name is None or not bentoml.list(bento_name):
            bento = clean_context.enter_context(bentoml_build(project_path))
        else:
            bento = bentoml.get(bento_name)
        if deployment_mode == "standalone":
            with run_bento_server_standalone(
                bento.path,
                config_file=config_file,
                use_grpc=use_grpc,
                host=host,
            ) as host_url:
                yield host_url
        elif deployment_mode == "docker":
            # The image is registered on clean_context so it can be reused/cleaned up.
            container = clean_context.enter_context(bentoml_containerize(bento.tag))
            with run_bento_server_docker(
                container,
                config_file=config_file,
                use_grpc=use_grpc,
                host=host,
            ) as host_url:
                yield host_url
        elif deployment_mode == "distributed":
            with run_bento_server_distributed(
                bento.tag,
                config_file=config_file,
                use_grpc=use_grpc,
                host=host,
            ) as host_url:
                yield host_url
        else:
            raise ValueError(f"Unknown deployment mode: {deployment_mode}") from None
    finally:
        print("Shutting down bento server...")
        if clean_on_exit:
            print("Cleaning on exit...")
            clean_context.close()
def handle_assert_exception(assert_fn: t.Any, obj: t.Any, msg: str):
    """
    Check 'obj' against 'assert_fn' and raise a ValueError when the check fails.

    Args:
        assert_fn: Either a callable taking 'obj' and returning a truthy value on
                   success, or a plain value that 'obj' must be equal to.
        obj: The object under test.
        msg: Error message used when the check does not hold.

    Raises:
        ValueError: with 'msg' when the check fails (or the callable itself raises
                    an AssertionError), or with a description of the underlying
                    error when evaluating the check raises any other exception.
    """
    try:
        # Evaluate explicitly rather than via 'assert', which is removed when
        # Python runs with -O and would let every check pass silently.
        if callable(assert_fn):
            passed = bool(assert_fn(obj))
        else:
            passed = bool(obj == assert_fn)
    except AssertionError:
        raise ValueError(msg) from None
    except Exception as e:  # pylint: disable=broad-except
        # if callable has some errors, then we raise it here.
        # Not every callable has a __name__ (e.g. functools.partial objects), so
        # fall back to its repr instead of failing with an AttributeError.
        fn_name = getattr(assert_fn, "__name__", repr(assert_fn))
        raise ValueError(f"Exception while executing '{fn_name}': {e}") from None
    if not passed:
        raise ValueError(msg)
| LooseHeaders = None, data: t.Any = None, - timeout: t.Optional[int] = None, - assert_status: t.Union[int, t.Callable[[int], bool], None] = None, - assert_data: t.Union[bytes, t.Callable[[bytes], bool], None] = None, - assert_headers: t.Optional[t.Callable[[t.Any], bool]] = None, -) -> t.Tuple[int, "Headers", bytes]: - """ - raw async request client - """ + timeout: int | None = None, + assert_status: int | t.Callable[[int], bool] | None = None, + assert_data: bytes | t.Callable[[bytes], bool] | None = None, + assert_headers: t.Callable[[t.Any], bool] | None = None, +) -> tuple[int, Headers, bytes]: import aiohttp from starlette.datastructures import Headers @@ -55,41 +63,45 @@ async def async_request( try: async with sess.request( method, url, data=data, headers=headers, timeout=timeout - ) as r: - r_body = await r.read() + ) as resp: + body = await resp.read() except Exception: - raise RuntimeError( - "Unable to reach host." - ) from None # suppress exception trace + raise RuntimeError("Unable to reach host.") from None if assert_status is not None: - if callable(assert_status): - assert assert_status(r.status), f"{r.status} {repr(r_body)}" - else: - assert r.status == assert_status, f"{r.status} {repr(r_body)}" - + handle_assert_exception( + assert_status, + resp.status, + f"Return [{resp.status}] with status {resp.status}: {repr(body)}", + ) if assert_data is not None: if callable(assert_data): - assert assert_data(r_body), r_body + msg = f"'{assert_data.__name__}' returns {assert_data(body)}" else: - assert r_body == assert_data, r_body - + msg = f"Expects data '{assert_data}'" + handle_assert_exception( + assert_data, + body, + f"{msg}\nReceived response: {body}.", + ) if assert_headers is not None: - assert assert_headers(r.headers), repr(r.headers) - - headers = t.cast(t.Mapping[str, str], r.headers) - return r.status, Headers(headers), r_body + handle_assert_exception( + assert_headers, + resp.headers, + f"Headers assertion failed: {repr(resp.headers)}", 
def assert_distributed_header(headers: multidict.CIMultiDict[str]) -> None:
    """
    Assert that the Yatai deployment headers carry the expected test values.

    The deployment name header must equal "test-deployment" and the deployment
    namespace header must equal "yatai"; otherwise an AssertionError is raised.
    """
    expected_pairs = (
        ("Yatai-Bento-Deployment-Name", "test-deployment"),
        ("Yatai-Bento-Deployment-Namespace", "yatai"),
    )
    assert all(headers.get(key) == wanted for key, wanted in expected_pairs)
b/tests/conftest.py index ccaf6e966ef..fd8b69d05db 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,3 +1,4 @@ +# pylint: disable=unused-argument from __future__ import annotations import os @@ -79,7 +80,7 @@ def noop_sync(data: str) -> str: # type: ignore return svc -@pytest.fixture(scope="function", autouse=True, name="propagate_logs") +@pytest.fixture(scope="function", name="propagate_logs") def fixture_propagate_logs() -> t.Generator[None, None, None]: logger = logging.getLogger("bentoml") # bentoml sets propagate to False by default, so we need to set it to True diff --git a/tests/e2e/README.md b/tests/e2e/README.md new file mode 100644 index 00000000000..c694077828f --- /dev/null +++ b/tests/e2e/README.md @@ -0,0 +1,110 @@ +# End-to-end test suites + +This folder contains the end-to-end test suites. + +## Instructions + +To create a new test suite (for simplicity let's call our test suite `qa`), do the following: + +1. Navigate to [`config.yml`](../../scripts/ci/config.yml) and add the E2E definition: + +```yaml +qa: + <<: *tmpl + root_test_dir: "tests/e2e/qa" + is_dir: true + type_tests: "e2e" + dependencies: # add required Python dependencies here. + - Pillow + - pydantic + - grpcio-status +``` + +2. Create the folder `qa` with the following project structure: + +```bash +. +├── bentofile.yaml +├── configure.py # REQUIRED: See below +... +├── service.py +└── tests + ├── conftest.py + ├── test_io.py + ... + └── test_meta.py +``` + +> Note that files under `tests` are merely examples; feel free to add any type of +> additional tests. + +3.
Contents of `configure.py` must include a `create_model()` function: + +```python +import python_model + +import bentoml + + +def create_model(): + bentoml.picklable_model.save_model( + "py_model.case-1.grpc.e2e", + python_model.PythonFunction(), + signatures={ + "echo_json": {"batchable": True}, + "echo_object": {"batchable": False}, + "echo_ndarray": {"batchable": True}, + "double_ndarray": {"batchable": True}, + }, + external_modules=[python_model], + ) + +... +``` + +4. Inside `tests/conftest.py`, create a `host` fixture like so: + +```python +# pylint: disable=unused-argument +from __future__ import annotations + +import typing as t +from typing import TYPE_CHECKING + +import pytest + +if TYPE_CHECKING: + from contextlib import ExitStack + + +@pytest.fixture(scope="module") +def host( + bentoml_home: str, + deployment_mode: str, + clean_context: ExitStack, +) -> t.Generator[str, None, None]: + from bentoml.testing.server import host_bento + + with host_bento( + "service:svc", + deployment_mode=deployment_mode, + bentoml_home=bentoml_home, + clean_context=clean_context, + use_grpc=True, + ) as _host: + yield _host +``` + +5. To run the tests, navigate to `GIT_ROOT` (root directory of bentoml), and call: + +```bash +./scripts/ci/run_tests.sh qa +``` + +By default, the E2E suite is set up so that the models and bentos will be created and +saved under pytest's temporary directory.
To clean up after the tests, pass `--cleanup` +to `run_tests.sh`: + +```bash +./scripts/ci/run_tests.sh qa --cleanup +``` diff --git a/tests/e2e/__init__.py b/tests/e2e/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/tests/e2e/bento_server_general_features/server_config_cors_enabled.yml b/tests/e2e/bento_server_general_features/server_config_cors_enabled.yml deleted file mode 100644 index cc510661398..00000000000 --- a/tests/e2e/bento_server_general_features/server_config_cors_enabled.yml +++ /dev/null @@ -1,9 +0,0 @@ -api_server: - cors: # standard: https://fetch.spec.whatwg.org/#http-cors-protocol - enabled: True - access_control_allow_origin: "*" - access_control_allow_methods: ["GET", "OPTIONS", "POST", "HEAD", "PUT"] - access_control_allow_credentials: True - access_control_allow_headers: Null - access_control_max_age: Null - access_control_expose_headers: ["Content-Length"] diff --git a/tests/e2e/bento_server_general_features/server_config_default.yml b/tests/e2e/bento_server_general_features/server_config_default.yml deleted file mode 100644 index 096565f0b1c..00000000000 --- a/tests/e2e/bento_server_general_features/server_config_default.yml +++ /dev/null @@ -1,3 +0,0 @@ -api_server: - cors: # standard: https://fetch.spec.whatwg.org/#http-cors-protocol - enabled: False diff --git a/tests/e2e/bento_server_general_features/tests/conftest.py b/tests/e2e/bento_server_general_features/tests/conftest.py deleted file mode 100644 index 17bb94ff19f..00000000000 --- a/tests/e2e/bento_server_general_features/tests/conftest.py +++ /dev/null @@ -1,100 +0,0 @@ -# type: ignore[no-untyped-def] - -import os -import typing as t -import contextlib - -import numpy as np -import psutil -import pytest - - -@pytest.fixture() -def img_file(tmpdir) -> str: - import PIL.Image - - img_file_ = tmpdir.join("test_img.bmp") - img = PIL.Image.fromarray(np.random.randint(255, size=(10, 10, 3)).astype("uint8")) - img.save(str(img_file_)) - return str(img_file_)
- - -@pytest.fixture() -def bin_file(tmpdir) -> str: - bin_file_ = tmpdir.join("bin_file.bin") - with open(bin_file_, "wb") as of: - of.write("â".encode("gb18030")) - return str(bin_file_) - - -def pytest_configure(config): # pylint: disable=unused-argument - import sys - import subprocess - - cmd = f"{sys.executable} {os.path.join(os.getcwd(), 'train.py')}" - subprocess.run(cmd, shell=True, check=True) - - # use the local bentoml package in development - os.environ["BENTOML_BUNDLE_LOCAL_BUILD"] = "True" - os.environ["SETUPTOOLS_USE_DISTUTILS"] = "stdlib" - - -@pytest.fixture(scope="session", autouse=True) -def clean_context(): - stack = contextlib.ExitStack() - yield stack - stack.close() - - -@pytest.fixture( - params=[ - "server_config_default.yml", - "server_config_cors_enabled.yml", - ], - scope="session", -) -def server_config_file(request): - return request.param - - -@pytest.fixture( - params=[ - # "dev", - "standalone", - "docker", - "distributed", - ], - scope="session", -) -def deployment_mode(request) -> str: - return request.param - - -@pytest.fixture(scope="session") -def host( - deployment_mode: str, - server_config_file: str, - clean_context: contextlib.ExitStack, -) -> t.Generator[str, None, None]: - if ( - (psutil.WINDOWS or psutil.MACOS) - and os.environ.get("GITHUB_ACTION") - and deployment_mode == "docker" - ): - pytest.skip( - "due to GitHub Action's limitation, docker deployment is not supported on " - "windows/macos. But you can still run this test on macos/windows locally." 
- ) - - if not psutil.LINUX and deployment_mode == "distributed": - pytest.skip("distributed deployment is only supported on Linux") - - from bentoml.testing.server import host_bento - - with host_bento( - "service:svc", - config_file=server_config_file, - deployment_mode=deployment_mode, - clean_context=clean_context, - ) as host: - yield host diff --git a/tests/e2e/bento_server_grpc/_interceptor.py b/tests/e2e/bento_server_grpc/_interceptor.py new file mode 100644 index 00000000000..dd565cfae7a --- /dev/null +++ b/tests/e2e/bento_server_grpc/_interceptor.py @@ -0,0 +1,63 @@ +from __future__ import annotations + +import typing as t +import functools +import dataclasses +from typing import TYPE_CHECKING + +from grpc import aio + +if TYPE_CHECKING: + from bentoml.grpc.types import Request + from bentoml.grpc.types import Response + from bentoml.grpc.types import RpcMethodHandler + from bentoml.grpc.types import AsyncHandlerMethod + from bentoml.grpc.types import HandlerCallDetails + from bentoml.grpc.types import BentoServicerContext + + +@dataclasses.dataclass +class Context: + usage: str + accuracy_score: float + + +class AsyncContextInterceptor(aio.ServerInterceptor): + def __init__(self, *, usage: str, accuracy_score: float) -> None: + self.context = Context(usage=usage, accuracy_score=accuracy_score) + self._record: set[str] = set() + + async def intercept_service( + self, + continuation: t.Callable[[HandlerCallDetails], t.Awaitable[RpcMethodHandler]], + handler_call_details: HandlerCallDetails, + ) -> RpcMethodHandler: + from bentoml.grpc.utils import wrap_rpc_handler + + handler = await continuation(handler_call_details) + + if handler and (handler.response_streaming or handler.request_streaming): + return handler + + def wrapper(behaviour: AsyncHandlerMethod[Response]): + @functools.wraps(behaviour) + async def new_behaviour( + request: Request, context: BentoServicerContext + ) -> Response | t.Awaitable[Response]: + self._record.update( + 
{f"{self.context.usage}:{self.context.accuracy_score}"} + ) + resp = await behaviour(request, context) + context.set_trailing_metadata( + tuple( + [ + (k, str(v).encode("utf-8")) + for k, v in dataclasses.asdict(self.context).items() + ] + ) + ) + return resp + + return new_behaviour + + return wrap_rpc_handler(wrapper, handler) diff --git a/tests/e2e/bento_server_grpc/_servicer.py b/tests/e2e/bento_server_grpc/_servicer.py new file mode 100644 index 00000000000..206f4e357d8 --- /dev/null +++ b/tests/e2e/bento_server_grpc/_servicer.py @@ -0,0 +1,19 @@ +# pylint: disable=unused-argument +from __future__ import annotations + +from typing import TYPE_CHECKING + +from bentoml.grpc.v1alpha1 import service_test_pb2 as pb +from bentoml.grpc.v1alpha1 import service_test_pb2_grpc as services + +if TYPE_CHECKING: + from grpc import aio + + +class TestServiceServicer(services.TestServiceServicer): + async def Execute( # type: ignore (no async types) + self, + request: pb.ExecuteRequest, + context: aio.ServicerContext[pb.ExecuteRequest, pb.ExecuteResponse], + ) -> pb.ExecuteResponse: + return pb.ExecuteResponse(output="Hello, {}!".format(request.input)) diff --git a/tests/e2e/bento_server_grpc/bentofile.yaml b/tests/e2e/bento_server_grpc/bentofile.yaml new file mode 100644 index 00000000000..1a764fd6104 --- /dev/null +++ b/tests/e2e/bento_server_grpc/bentofile.yaml @@ -0,0 +1,13 @@ +service: service:svc +exclude: + - python_model.py + - "*.xml" +python: + packages: + - pandas + - pydantic + - Pillow + - scikit-learn + - pyarrow + components: + grpc: true diff --git a/tests/e2e/bento_server_grpc/configure.py b/tests/e2e/bento_server_grpc/configure.py new file mode 100644 index 00000000000..ce7ddce4125 --- /dev/null +++ b/tests/e2e/bento_server_grpc/configure.py @@ -0,0 +1,20 @@ +import python_model + +import bentoml + + +def create_model(): + bentoml.picklable_model.save_model( + "py_model.case-1.grpc.e2e", + python_model.PythonFunction(), + signatures={ + "predict_file": 
{"batchable": True}, + "echo_json": {"batchable": True}, + "echo_object": {"batchable": False}, + "echo_ndarray": {"batchable": True}, + "double_ndarray": {"batchable": True}, + "multiply_float_ndarray": {"batchable": True}, + "double_dataframe_column": {"batchable": True}, + }, + external_modules=[python_model], + ) diff --git a/tests/e2e/bento_server_grpc/python_model.py b/tests/e2e/bento_server_grpc/python_model.py new file mode 100644 index 00000000000..2acf9715e17 --- /dev/null +++ b/tests/e2e/bento_server_grpc/python_model.py @@ -0,0 +1,44 @@ +from __future__ import annotations + +from typing import Any +from typing import TYPE_CHECKING + +import numpy as np +import pandas as pd + +if TYPE_CHECKING: + from numpy.typing import NDArray + + from bentoml._internal.types import FileLike + from bentoml._internal.types import JSONSerializable + + +class PythonFunction: + def predict_file(self, files: list[FileLike[bytes]]) -> list[bytes]: + return [f.read() for f in files] + + @classmethod + def echo_json(cls, datas: JSONSerializable) -> JSONSerializable: + return datas + + @classmethod + def echo_ndarray(cls, datas: NDArray[Any]) -> NDArray[Any]: + return datas + + def double_ndarray(self, data: NDArray[Any]) -> NDArray[Any]: + assert isinstance(data, np.ndarray) + return data * 2 + + def multiply_float_ndarray( + self, arr1: NDArray[np.float32], arr2: NDArray[np.float32] + ) -> NDArray[np.float32]: + assert isinstance(arr1, np.ndarray) + assert isinstance(arr2, np.ndarray) + return arr1 * arr2 + + def double_dataframe_column(self, df: pd.DataFrame) -> pd.DataFrame: + assert isinstance(df, pd.DataFrame) + return df[["col1"]] * 2 # type: ignore (no pandas types) + + def echo_dataframe(self, df: pd.DataFrame) -> pd.DataFrame: + return df diff --git a/tests/e2e/bento_server_grpc/service.py b/tests/e2e/bento_server_grpc/service.py new file mode 100644 index 00000000000..bf2ccd4c87d --- /dev/null +++ b/tests/e2e/bento_server_grpc/service.py @@ -0,0 +1,187 @@ +from 
__future__ import annotations + +import typing as t +from typing import TYPE_CHECKING + +from pydantic import BaseModel +from _servicer import TestServiceServicer +from _interceptor import AsyncContextInterceptor + +import bentoml +from bentoml.io import File +from bentoml.io import JSON +from bentoml.io import Text +from bentoml.io import Image +from bentoml.io import Multipart +from bentoml.io import NumpyNdarray +from bentoml.io import PandasDataFrame +from bentoml._internal.utils import LazyLoader + +if TYPE_CHECKING: + import numpy as np + import pandas as pd + import PIL.Image + from numpy.typing import NDArray + + from bentoml.grpc.v1alpha1 import service_test_pb2 as pb_test + from bentoml.grpc.v1alpha1 import service_test_pb2_grpc as services_test + from bentoml._internal.types import FileLike + from bentoml._internal.types import JSONSerializable + from bentoml.picklable_model import get_runnable + from bentoml._internal.runner.runner import RunnerMethod + + RunnableImpl = get_runnable(bentoml.picklable_model.get("py_model.case-1.grpc.e2e")) + + class PythonModelRunner(bentoml.Runner): + predict_file: RunnerMethod[RunnableImpl, [list[FileLike[bytes]]], list[bytes]] + echo_json: RunnerMethod[ + RunnableImpl, [list[JSONSerializable]], list[JSONSerializable] + ] + echo_ndarray: RunnerMethod[RunnableImpl, [NDArray[t.Any]], NDArray[t.Any]] + double_ndarray: RunnerMethod[RunnableImpl, [NDArray[t.Any]], NDArray[t.Any]] + multiply_float_ndarray: RunnerMethod[ + RunnableImpl, + [NDArray[np.float32], NDArray[np.float32]], + NDArray[np.float32], + ] + double_dataframe_column: RunnerMethod[ + RunnableImpl, [pd.DataFrame], pd.DataFrame + ] + echo_dataframe: RunnerMethod[RunnableImpl, [pd.DataFrame], pd.DataFrame] + +else: + from bentoml.grpc.utils import import_generated_stubs + + pb_test, services_test = import_generated_stubs(file="service_test.proto") + np = LazyLoader("np", globals(), "numpy") + pd = LazyLoader("pd", globals(), "pandas") + PIL = LazyLoader("PIL", 
globals(), "PIL") + PIL.Image = LazyLoader("PIL.Image", globals(), "PIL.Image") + + +py_model = t.cast( + "PythonModelRunner", + bentoml.picklable_model.get("py_model.case-1.grpc.e2e").to_runner(), +) + +svc = bentoml.Service(name="general_grpc_service.case-1.e2e", runners=[py_model]) + +svc.mount_grpc_servicer( + TestServiceServicer, + add_servicer_fn=services_test.add_TestServiceServicer_to_server, + service_names=[v.full_name for v in pb_test.DESCRIPTOR.services_by_name.values()], +) +svc.add_grpc_interceptor(AsyncContextInterceptor, usage="NLP", accuracy_score=0.8247) + + +class IrisFeatures(BaseModel): + sepal_len: float + sepal_width: float + petal_len: float + petal_width: float + + +class IrisClassificationRequest(BaseModel): + request_id: str + iris_features: IrisFeatures + + +@svc.api(input=Text(), output=Text()) +async def bonjour(inp: str) -> str: + return f"Hello, {inp}!" + + +@svc.api(input=JSON(), output=JSON()) +async def echo_json(json_obj: JSONSerializable) -> JSONSerializable: + batched = await py_model.echo_json.async_run([json_obj]) + return batched[0] + + +@svc.api( + input=JSON(pydantic_model=IrisClassificationRequest), + output=JSON(), +) +def echo_json_validate(input_data: IrisClassificationRequest) -> dict[str, float]: + print("request_id: ", input_data.request_id) + return input_data.iris_features.dict() + + +@svc.api(input=NumpyNdarray(), output=NumpyNdarray()) +async def double_ndarray(arr: NDArray[t.Any]) -> NDArray[t.Any]: + return await py_model.double_ndarray.async_run(arr) + + +@svc.api(input=NumpyNdarray.from_sample(np.random.rand(2, 2)), output=NumpyNdarray()) +async def echo_ndarray_from_sample(arr: NDArray[t.Any]) -> NDArray[t.Any]: + assert arr.shape == (2, 2) + return await py_model.echo_ndarray.async_run(arr) + + +@svc.api(input=NumpyNdarray(shape=(2, 2), enforce_shape=True), output=NumpyNdarray()) +async def echo_ndarray_enforce_shape(arr: NDArray[t.Any]) -> NDArray[t.Any]: + assert arr.shape == (2, 2) + return await 
py_model.echo_ndarray.async_run(arr) + + +@svc.api( + input=NumpyNdarray(dtype=np.float32, enforce_dtype=True), output=NumpyNdarray() +) +async def echo_ndarray_enforce_dtype(arr: NDArray[t.Any]) -> NDArray[t.Any]: + assert arr.dtype == np.float32 + return await py_model.echo_ndarray.async_run(arr) + + +@svc.api(input=PandasDataFrame(orient="columns"), output=PandasDataFrame()) +async def echo_dataframe(df: pd.DataFrame) -> pd.DataFrame: + assert isinstance(df, pd.DataFrame) + return df + + +@svc.api( + input=PandasDataFrame.from_sample( + pd.DataFrame({"age": [3, 29], "height": [94, 170], "weight": [31, 115]}), + orient="columns", + ), + output=PandasDataFrame(), +) +async def echo_dataframe_from_sample(df: pd.DataFrame) -> pd.DataFrame: + assert isinstance(df, pd.DataFrame) + return df + + +@svc.api( + input=PandasDataFrame(dtype={"col1": "int64"}, orient="columns"), + output=PandasDataFrame(), +) +async def double_dataframe(df: pd.DataFrame) -> pd.DataFrame: + assert df["col1"].dtype == "int64" + output = await py_model.double_dataframe_column.async_run(df) + dfo = pd.DataFrame() + dfo["col1"] = output + return dfo + + +@svc.api(input=File(), output=File()) +async def predict_file(f: FileLike[bytes]) -> bytes: + batch_ret = await py_model.predict_file.async_run([f]) + return batch_ret[0] + + +@svc.api(input=Image(mime_type="image/bmp"), output=Image(mime_type="image/bmp")) +async def echo_image(f: PIL.Image.Image) -> NDArray[t.Any]: + assert isinstance(f, PIL.Image.Image) + return np.array(f) + + +@svc.api( + input=Multipart( + original=Image(mime_type="image/bmp"), compared=Image(mime_type="image/bmp") + ), + output=Multipart(meta=Text(), result=Image(mime_type="image/bmp")), +) +async def predict_multi_images(original: Image, compared: Image): + output_array = await py_model.multiply_float_ndarray.async_run( + np.array(original), np.array(compared) + ) + img = PIL.Image.fromarray(output_array) + return {"meta": "success", "result": img} diff --git 
a/tests/e2e/bento_server_grpc/tests/conftest.py b/tests/e2e/bento_server_grpc/tests/conftest.py new file mode 100644 index 00000000000..f555b2ba664 --- /dev/null +++ b/tests/e2e/bento_server_grpc/tests/conftest.py @@ -0,0 +1,108 @@ +# pylint: disable=unused-argument +from __future__ import annotations + +import typing as t +from typing import TYPE_CHECKING + +import psutil +import pytest +from opentelemetry import trace as trace_api +from opentelemetry.sdk.trace import export +from opentelemetry.sdk.trace import TracerProvider +from opentelemetry.test.globals_test import reset_trace_globals +from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter + +from bentoml._internal.configuration.containers import BentoMLContainer + +if TYPE_CHECKING: + from contextlib import ExitStack + + from _pytest.nodes import Item as _PytestItem + from _pytest.config import Config + + from bentoml._internal.server.metrics.prometheus import PrometheusClient + + # fixturenames and funcargs will be added dynamically + # inside tests generation lifecycle + class FunctionItem(_PytestItem): + fixturenames: list[str] + funcargs: dict[str, t.Any] + + +def create_tracer_provider( + **kwargs: t.Any, +) -> tuple[TracerProvider, InMemorySpanExporter]: + tracer_provider = TracerProvider(**kwargs) + memory_exporter = InMemorySpanExporter() + span_processor = export.SimpleSpanProcessor(memory_exporter) + tracer_provider.add_span_processor(span_processor) + return tracer_provider, memory_exporter + + +OTEL_MARKER = "otel" +SKIP_DEPLOYMENT = "skip_deployment_mode" + + +def pytest_configure(config: Config) -> None: + config.addinivalue_line( + "markers", + f"{OTEL_MARKER}: mark the test to use OpenTelemetry fixtures.", + ) + + +def pytest_runtest_setup(item: FunctionItem): + marker = item.get_closest_marker(OTEL_MARKER) + if marker: + tracer_provider, memory_exporter = create_tracer_provider() + BentoMLContainer.tracer_provider.set(tracer_provider) + # This is done because 
set_tracer_provider cannot override the + # current tracer provider. + reset_trace_globals() + trace_api.set_tracer_provider(tracer_provider) + memory_exporter.clear() + # handling fixtures + fixturenames: list[str] = item.fixturenames + funcargs = item.funcargs + if "tracer_provider" in fixturenames: + fixturenames.remove("tracer_provider") + fixturenames.insert(0, "tracer_provider") + funcargs["tracer_provider"] = tracer_provider + if "memory_exporter" in fixturenames: + fixturenames.remove("memory_exporter") + fixturenames.insert(0, "memory_exporter") + funcargs["memory_exporter"] = memory_exporter + + +def pytest_runtest_teardown(item: FunctionItem, nextitem: FunctionItem | None): + if item.get_closest_marker(OTEL_MARKER): + reset_trace_globals() + BentoMLContainer.tracer_provider.reset() + + +@pytest.fixture(scope="module", name="metrics_client") +def fixture_metrics_client() -> PrometheusClient: + return BentoMLContainer.metrics_client.get() + + +@pytest.fixture(scope="module") +def host( + bentoml_home: str, + deployment_mode: str, + clean_context: ExitStack, +) -> t.Generator[str, None, None]: + from bentoml.testing.server import host_bento + + # import os + # PROJECT_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) + # config_file = os.path.join(PROJECT_DIR, "tracing.yml") + if psutil.WINDOWS: + pytest.skip("gRPC is not supported on Windows.") + with host_bento( + "service:svc", + deployment_mode=deployment_mode, + bentoml_home=bentoml_home, + clean_context=clean_context, + # config_file=config_file, + use_grpc=True, + ) as _host: + yield _host diff --git a/tests/e2e/bento_server_grpc/tests/test_interceptor.py b/tests/e2e/bento_server_grpc/tests/test_interceptor.py new file mode 100644 index 00000000000..ca2ba90f6ff --- /dev/null +++ b/tests/e2e/bento_server_grpc/tests/test_interceptor.py @@ -0,0 +1,95 @@ +from __future__ import annotations + +import typing as t +from typing import TYPE_CHECKING + +import grpc +import pytest +from 
opentelemetry import trace as trace_api +from google.protobuf import wrappers_pb2 +from opentelemetry.semconv.trace import SpanAttributes + +from bentoml.testing.grpc import create_channel +from bentoml.grpc.v1alpha1 import service_pb2 as pb +from bentoml.grpc.v1alpha1 import service_pb2_grpc as services +from bentoml.grpc.v1alpha1 import service_test_pb2 as pb_test +from bentoml.grpc.v1alpha1 import service_test_pb2_grpc as services_test +from bentoml.testing.utils import async_request +from bentoml._internal.utils import LazyLoader + +if TYPE_CHECKING: + import opentelemetry.instrumentation.grpc as otel_grpc + from opentelemetry.sdk.trace import Span + from opentelemetry.sdk.trace.export.in_memory_span_exporter import ( + InMemorySpanExporter, + ) + + from bentoml._internal.server.metrics.prometheus import PrometheusClient +else: + otel_grpc = LazyLoader("otel_grpc", globals(), "opentelemetry.instrumentation.grpc") + + +@pytest.mark.asyncio +async def test_prometheus_interceptor( + host: str, metrics_client: PrometheusClient +) -> None: + # Currently, tests doesn't change default metrics ports, which is 3001 + async with create_channel(host) as channel: + client = services.BentoServiceStub(channel) # type: ignore (no async types) + await client.Call( + pb.Request( + api_name="bonjour", text=wrappers_pb2.StringValue(value="BentoML") + ) + ) + _, _, body = await async_request( + "GET", + "http://0.0.0.0:3001", + assert_status=200, + ) + assert b"bonjour" in body + assert len(body) == len(metrics_client.generate_latest()) + + +def assert_span_has_attributes(span: Span, attributes: dict[str, t.Any]): + assert span.attributes + for key, value in attributes.items(): + assert key in span.attributes + assert span.attributes[key] == value + + +def sanity_check(span: Span, name: str): + # check if span name is the same as rpc_call + assert span.name == name + assert span.kind == trace_api.SpanKind.SERVER + + # sanity check + assert span.instrumentation_info.name == 
otel_grpc.__name__ + assert span.instrumentation_info.version == otel_grpc.__version__ + + +@pytest.mark.skip("Currently broken test, will revisit after experimental release.") +@pytest.mark.otel +@pytest.mark.asyncio +async def test_otel_interceptor(memory_exporter: InMemorySpanExporter, host: str): + async with create_channel(host) as channel: + stub = services_test.TestServiceStub(channel) # type: ignore (no async types) + await stub.Execute(pb_test.ExecuteRequest(input="BentoML")) + + spans_list = t.cast("list[Span]", memory_exporter.get_finished_spans()) + assert len(spans_list) == 1 + # We only care about the second span, which is the rpc_call + span = spans_list[0] + service_name = "bentoml.testing.v1alpha1.TestService" + + sanity_check(span, f"/{service_name}/Execute") + assert_span_has_attributes( + span, + { + SpanAttributes.NET_PEER_IP: "[::1]", + SpanAttributes.NET_PEER_NAME: "localhost", + SpanAttributes.RPC_METHOD: "Execute", + SpanAttributes.RPC_SERVICE: service_name, + SpanAttributes.RPC_SYSTEM: "grpc", + SpanAttributes.RPC_GRPC_STATUS_CODE: grpc.StatusCode.OK.value[0], + }, + ) diff --git a/tests/e2e/bento_server_grpc/tests/test_io.py b/tests/e2e/bento_server_grpc/tests/test_io.py new file mode 100644 index 00000000000..5d86597ec08 --- /dev/null +++ b/tests/e2e/bento_server_grpc/tests/test_io.py @@ -0,0 +1,406 @@ +from __future__ import annotations + +import io +import random +import traceback +from typing import TYPE_CHECKING +from functools import partial + +import pytest + +from bentoml.testing.grpc import create_channel +from bentoml.testing.grpc import async_client_call +from bentoml.testing.grpc import randomize_pb_ndarray +from bentoml._internal.utils import LazyType +from bentoml._internal.utils import LazyLoader + +if TYPE_CHECKING: + import grpc + import numpy as np + import pandas as pd + import PIL.Image as PILImage + from grpc import aio + from google.protobuf import struct_pb2 + from google.protobuf import wrappers_pb2 + + from 
bentoml._internal import external_typing as ext + from bentoml.grpc.v1alpha1 import service_pb2 as pb +else: + from bentoml.grpc.utils import import_generated_stubs + + exception_msg = ( + "'grpcio' is not installed. Please install it with 'pip install -U grpcio'" + ) + pb, _ = import_generated_stubs() + grpc = LazyLoader("grpc", globals(), "grpc", exc_msg=exception_msg) + aio = LazyLoader("aio", globals(), "grpc.aio", exc_msg=exception_msg) + wrappers_pb2 = LazyLoader("wrappers_pb2", globals(), "google.protobuf.wrappers_pb2") + struct_pb2 = LazyLoader("struct_pb2", globals(), "google.protobuf.struct_pb2") + np = LazyLoader("np", globals(), "numpy") + pd = LazyLoader("pd", globals(), "pandas") + PILImage = LazyLoader("PILImage", globals(), "PIL.Image") + + +def assert_ndarray( + resp: pb.Response, + assert_shape: list[int], + assert_dtype: pb.NDArray.DType.ValueType, +) -> bool: + __tracebackhide__ = True # Hide traceback for py.test + + dtype = resp.ndarray.dtype + try: + assert resp.ndarray.shape == assert_shape + assert dtype == assert_dtype + return True + except AssertionError: + traceback.print_exc() + return False + + +def make_iris_proto(**fields: struct_pb2.Value) -> struct_pb2.Value: + return struct_pb2.Value( + struct_value=struct_pb2.Struct( + fields={ + "request_id": struct_pb2.Value(string_value="123"), + "iris_features": struct_pb2.Value( + struct_value=struct_pb2.Struct(fields=fields) + ), + } + ) + ) + + +@pytest.mark.asyncio +async def test_numpy(host: str): + async with create_channel(host) as channel: + await async_client_call( + "double_ndarray", + channel=channel, + data={"ndarray": randomize_pb_ndarray((1000,))}, + assert_data=partial( + assert_ndarray, assert_shape=[1000], assert_dtype=pb.NDArray.DTYPE_FLOAT + ), + ) + await async_client_call( + "double_ndarray", + channel=channel, + data={"ndarray": pb.NDArray(shape=[2, 2], int32_values=[1, 2, 3, 4])}, + assert_data=lambda resp: resp.ndarray.int32_values == [2, 4, 6, 8], + ) + with 
pytest.raises(aio.AioRpcError): + await async_client_call( + "double_ndarray", + channel=channel, + data={"ndarray": pb.NDArray(string_values=np.array(["2", "2f"]))}, + assert_code=grpc.StatusCode.INTERNAL, + ) + await async_client_call( + "double_ndarray", + channel=channel, + data={ + "ndarray": pb.NDArray( + dtype=123, string_values=np.array(["2", "2f"]) # type: ignore (test exception) + ) + }, + assert_code=grpc.StatusCode.INVALID_ARGUMENT, + ) + await async_client_call( + "double_ndarray", + channel=channel, + data={ + "_internal_bytes_contents": np.array([1, 2, 3, 4]).ravel().tobytes() + }, + assert_code=grpc.StatusCode.FAILED_PRECONDITION, + ) + await async_client_call( + "double_ndarray", + channel=channel, + data={"text": wrappers_pb2.StringValue(value="asdf")}, + assert_code=grpc.StatusCode.FAILED_PRECONDITION, + ) + await async_client_call( + "echo_ndarray_enforce_shape", + channel=channel, + data={"ndarray": randomize_pb_ndarray((1000,))}, + assert_code=grpc.StatusCode.INVALID_ARGUMENT, + ) + await async_client_call( + "echo_ndarray_enforce_dtype", + channel=channel, + data={"ndarray": pb.NDArray(string_values=np.array(["2", "2f"]))}, + assert_code=grpc.StatusCode.INVALID_ARGUMENT, + ) + + +@pytest.mark.asyncio +async def test_json(host: str): + async with create_channel(host) as channel: + await async_client_call( + "echo_json", + channel=channel, + data={"json": struct_pb2.Value(string_value='"hi"')}, + assert_data=lambda resp: resp.json.string_value == '"hi"', + ) + await async_client_call( + "echo_json", + channel=channel, + data={ + "_internal_bytes_contents": b'{"request_id": "123", "iris_features": {"sepal_len":2.34,"sepal_width":1.58, "petal_len":6.52, "petal_width":3.23}}' + }, + assert_data=lambda resp: resp.json # type: ignore (bad lambda types) + == make_iris_proto( + sepal_len=struct_pb2.Value(number_value=2.34), + sepal_width=struct_pb2.Value(number_value=1.58), + petal_len=struct_pb2.Value(number_value=6.52), + 
petal_width=struct_pb2.Value(number_value=3.23), + ), + ) + await async_client_call( + "echo_json_validate", + channel=channel, + data={ + "json": make_iris_proto( + **{ + k: struct_pb2.Value(number_value=random.uniform(1.0, 6.0)) + for k in [ + "sepal_len", + "sepal_width", + "petal_len", + "petal_width", + ] + } + ) + }, + ) + with pytest.raises(aio.AioRpcError): + await async_client_call( + "echo_json", + channel=channel, + data={"_internal_bytes_contents": b"\n?xfa"}, + assert_code=grpc.StatusCode.INVALID_ARGUMENT, + ) + await async_client_call( + "echo_json", + channel=channel, + data={"text": wrappers_pb2.StringValue(value="asdf")}, + assert_code=grpc.StatusCode.INVALID_ARGUMENT, + ) + await async_client_call( + "echo_json_validate", + channel=channel, + data={ + "json": make_iris_proto( + sepal_len=struct_pb2.Value(number_value=2.34), + sepal_width=struct_pb2.Value(number_value=1.58), + petal_len=struct_pb2.Value(number_value=6.52), + ), + }, + assert_code=grpc.StatusCode.FAILED_PRECONDITION, + ) + + +@pytest.mark.asyncio +async def test_file(host: str, bin_file: str): + # Test File as binary + with open(str(bin_file), "rb") as f: + fb = f.read() + + async with create_channel(host) as channel: + await async_client_call( + "predict_file", + channel=channel, + data={"_internal_bytes_contents": fb}, + assert_data=lambda resp: resp.file.content == fb, + ) + await async_client_call( + "predict_file", + channel=channel, + data={"file": pb.File(kind=pb.File.FILE_TYPE_BYTES, content=fb)}, + assert_data=lambda resp: resp.file.content == b"\x810\x899" + and resp.file.kind == pb.File.FILE_TYPE_BYTES, + ) + with pytest.raises(aio.AioRpcError): + await async_client_call( + "predict_file", + channel=channel, + data={"file": pb.File(kind=123, content=fb)}, # type: ignore (testing exception) + assert_code=grpc.StatusCode.INVALID_ARGUMENT, + ) + await async_client_call( + "predict_file", + channel=channel, + data={"file": pb.File(kind=pb.File.FILE_TYPE_PDF, content=fb)}, + 
assert_code=grpc.StatusCode.INVALID_ARGUMENT, + ) + await async_client_call( + "predict_file", + channel=channel, + data={"text": wrappers_pb2.StringValue(value="asdf")}, + assert_code=grpc.StatusCode.FAILED_PRECONDITION, + ) + + +def assert_image( + resp: pb.Response | pb.Part, + assert_kind: pb.File.FileType.ValueType, + im_file: str | ext.NpNDArray, +) -> bool: + fio = io.BytesIO(resp.file.content) + fio.name = "test.bmp" + img = PILImage.open(fio) + a1 = np.array(img) + if LazyType["ext.NpNDArray"]("numpy.ndarray").isinstance(im_file): + a2 = PILImage.fromarray(im_file) + else: + assert isinstance(im_file, str) + a2 = PILImage.open(im_file) + try: + assert resp.file.kind == assert_kind + np.testing.assert_array_almost_equal(a1, np.array(a2)) + return True + except AssertionError: + traceback.print_exc() + return False + + +@pytest.mark.asyncio +async def test_image(host: str, img_file: str): + with open(str(img_file), "rb") as f: + fb = f.read() + + async with create_channel(host) as channel: + await async_client_call( + "echo_image", + channel=channel, + data={"_internal_bytes_contents": fb}, + assert_data=partial( + assert_image, im_file=img_file, assert_kind=pb.File.FILE_TYPE_BMP + ), + ) + await async_client_call( + "echo_image", + channel=channel, + data={"file": pb.File(kind=pb.File.FILE_TYPE_BMP, content=fb)}, + assert_data=partial( + assert_image, im_file=img_file, assert_kind=pb.File.FILE_TYPE_BMP + ), + ) + with pytest.raises(aio.AioRpcError): + await async_client_call( + "echo_image", + channel=channel, + data={"file": pb.File(kind=123, content=fb)}, # type: ignore (testing exception) + assert_code=grpc.StatusCode.INVALID_ARGUMENT, + ) + await async_client_call( + "echo_image", + channel=channel, + data={"file": pb.File(kind=pb.File.FILE_TYPE_PDF, content=fb)}, + assert_code=grpc.StatusCode.INVALID_ARGUMENT, + ) + await async_client_call( + "echo_image", + channel=channel, + data={"text": wrappers_pb2.StringValue(value="asdf")}, + 
assert_code=grpc.StatusCode.FAILED_PRECONDITION, + ) + + +@pytest.mark.asyncio +async def test_pandas(host: str): + async with create_channel(host) as channel: + await async_client_call( + "echo_dataframe", + channel=channel, + data={ + "dataframe": pb.DataFrame( + column_names=[ + str(i) for i in pd.RangeIndex(0, 3, 1, dtype=np.int64).tolist() + ], + columns=[ + pb.Series(int32_values=[1]), + pb.Series(int32_values=[2]), + pb.Series(int32_values=[3]), + ], + ), + }, + ) + await async_client_call( + "echo_dataframe_from_sample", + channel=channel, + data={ + "dataframe": pb.DataFrame( + column_names=["age", "height", "weight"], + columns=[ + pb.Series(int64_values=[12, 23]), + pb.Series(int64_values=[40, 83]), + pb.Series(int64_values=[32, 89]), + ], + ), + }, + ) + await async_client_call( + "double_dataframe", + channel=channel, + data={ + "dataframe": pb.DataFrame( + column_names=["col1"], + columns=[pb.Series(int64_values=[23])], + ), + }, + assert_data=lambda resp: resp.dataframe # type: ignore (bad lambda types) + == pb.DataFrame( + column_names=["col1"], + columns=[pb.Series(int64_values=[46])], + ), + ) + with pytest.raises(aio.AioRpcError): + await async_client_call( + "echo_dataframe", + channel=channel, + data={ + "dataframe": pb.DataFrame( + column_names=["col1"], + columns=[pb.Series(int64_values=[23], int32_values=[23])], + ), + }, + assert_code=grpc.StatusCode.INVALID_ARGUMENT, + ) + + +def assert_multi_images(resp: pb.Response, method: str, im_file: str) -> bool: + assert method == "pred_multi_images" + img = PILImage.open(im_file) + arr = np.array(img) + expected = arr * arr + return assert_image( + resp.multipart["result"], + assert_kind=pb.File.FILE_TYPE_BMP, + im_file=expected, + ) + + +@pytest.mark.asyncio +async def test_multipart(host: str, img_file: str): + with open(str(img_file), "rb") as f: + fb = f.read() + + async with create_channel(host) as channel: + await async_client_call( + "predict_multi_images", + channel=channel, + data={ + 
"multipart": { + "original": pb.Part( + file=pb.File(kind=pb.File.FILE_TYPE_BMP, content=fb) + ), + "compared": pb.Part( + file=pb.File(kind=pb.File.FILE_TYPE_BMP, content=fb) + ), + } + }, + assert_data=partial( + assert_multi_images, method="pred_multi_images", im_file=img_file + ), + ) diff --git a/tests/e2e/bento_server_grpc/tests/test_meta.py b/tests/e2e/bento_server_grpc/tests/test_meta.py new file mode 100644 index 00000000000..cb6efd23f2d --- /dev/null +++ b/tests/e2e/bento_server_grpc/tests/test_meta.py @@ -0,0 +1,64 @@ +from __future__ import annotations + +import typing as t + +import pytest +from grpc import aio +from grpc_health.v1 import health_pb2 as pb_health +from google.protobuf import wrappers_pb2 + +from bentoml.testing.grpc import create_channel +from bentoml.grpc.v1alpha1 import service_pb2 as pb +from bentoml.grpc.v1alpha1 import service_test_pb2 as pb_test +from bentoml.grpc.v1alpha1 import service_test_pb2_grpc as services_test +from bentoml.testing.grpc.interceptors import AssertClientInterceptor + + +@pytest.mark.asyncio +async def test_success_invocation_custom_servicer(host: str) -> None: + async with create_channel(host) as channel: + Check = channel.unary_unary( + "/grpc.health.v1.Health/Check", + request_serializer=pb_health.HealthCheckRequest.SerializeToString, # type: ignore (no grpc_health type) + response_deserializer=pb_health.HealthCheckResponse.FromString, # type: ignore (no grpc_health type) + ) + hc_resp = await t.cast( + t.Awaitable[pb_health.HealthCheckResponse], + Check( + pb_health.HealthCheckRequest( + service="bentoml.testing.v1alpha1.TestService" + ) + ), + ) + assert hc_resp.status == pb_health.HealthCheckResponse.SERVING # type: ignore ( no generated enum types) + stub = services_test.TestServiceStub(channel) # type: ignore (no async types) + request = pb_test.ExecuteRequest(input="BentoML") + resp: pb_test.ExecuteResponse = await stub.Execute(request) + assert resp.output == "Hello, BentoML!" 
+ + +@pytest.mark.asyncio +async def test_trailing_metadata_interceptors(host: str) -> None: + async with create_channel( + host, + interceptors=[ + AssertClientInterceptor( + assert_trailing_metadata=aio.Metadata.from_tuple( + (("usage", "NLP"), ("accuracy_score", "0.8247")) + ) + ) + ], + ) as channel: + Call = channel.unary_unary( + "/bentoml.grpc.v1alpha1.BentoService/Call", + request_serializer=pb.Request.SerializeToString, + response_deserializer=pb.Response.FromString, + ) + await t.cast( + t.Awaitable[pb.Request], + Call( + pb.Request( + api_name="bonjour", text=wrappers_pb2.StringValue(value="BentoML") + ) + ), + ) diff --git a/tests/e2e/bento_server_grpc/tracing.yml b/tests/e2e/bento_server_grpc/tracing.yml new file mode 100644 index 00000000000..e4981f2207d --- /dev/null +++ b/tests/e2e/bento_server_grpc/tracing.yml @@ -0,0 +1,3 @@ +# tracing: +# type: in_memory # used for testing +# sample_rate: 1.0 diff --git a/tests/e2e/bento_server_general_features/bentofile.yaml b/tests/e2e/bento_server_http/bentofile.yaml similarity index 100% rename from tests/e2e/bento_server_general_features/bentofile.yaml rename to tests/e2e/bento_server_http/bentofile.yaml diff --git a/tests/e2e/bento_server_http/configs/cors_enabled.yml b/tests/e2e/bento_server_http/configs/cors_enabled.yml new file mode 100644 index 00000000000..bc2ca106a3b --- /dev/null +++ b/tests/e2e/bento_server_http/configs/cors_enabled.yml @@ -0,0 +1,10 @@ +api_server: + http: + cors: # standard: https://fetch.spec.whatwg.org/#http-cors-protocol + enabled: True + access_control_allow_origin: "*" + access_control_allow_methods: ["GET", "OPTIONS", "POST", "HEAD", "PUT"] + access_control_allow_credentials: True + access_control_allow_headers: Null + access_control_max_age: Null + access_control_expose_headers: ["Content-Length"] diff --git a/tests/e2e/bento_server_http/configs/default.yml b/tests/e2e/bento_server_http/configs/default.yml new file mode 100644 index 00000000000..bbfaa3aa799 --- /dev/null 
+++ b/tests/e2e/bento_server_http/configs/default.yml @@ -0,0 +1,4 @@ +api_server: + http: + cors: # standard: https://fetch.spec.whatwg.org/#http-cors-protocol + enabled: False diff --git a/tests/e2e/bento_server_general_features/train.py b/tests/e2e/bento_server_http/configure.py similarity index 82% rename from tests/e2e/bento_server_general_features/train.py rename to tests/e2e/bento_server_http/configure.py index 8fd5a683394..ccb5569ebb0 100644 --- a/tests/e2e/bento_server_general_features/train.py +++ b/tests/e2e/bento_server_http/configure.py @@ -1,11 +1,11 @@ import pickle_model -import bentoml.picklable_model +import bentoml -def train(): +def create_model(): bentoml.picklable_model.save_model( - "py_model.case-1.e2e", + "py_model.case-1.http.e2e", pickle_model.PickleModel(), signatures={ "predict_file": {"batchable": True}, @@ -18,7 +18,3 @@ def train(): }, external_modules=[pickle_model], ) - - -if __name__ == "__main__": - train() diff --git a/tests/e2e/bento_server_general_features/pickle_model.py b/tests/e2e/bento_server_http/pickle_model.py similarity index 57% rename from tests/e2e/bento_server_general_features/pickle_model.py rename to tests/e2e/bento_server_http/pickle_model.py index 4d6627b0ca2..b91ea9bac1e 100644 --- a/tests/e2e/bento_server_general_features/pickle_model.py +++ b/tests/e2e/bento_server_http/pickle_model.py @@ -1,10 +1,16 @@ +from __future__ import annotations + import typing as t +from typing import TYPE_CHECKING import numpy as np import pandas as pd -from bentoml._internal.types import FileLike -from bentoml._internal.types import JSONSerializable +if TYPE_CHECKING: + from numpy.typing import NDArray + + from bentoml._internal.types import FileLike + from bentoml._internal.types import JSONSerializable class PickleModel: @@ -19,29 +25,21 @@ def echo_json(cls, input_datas: JSONSerializable) -> JSONSerializable: def echo_obj(cls, input_datas: t.Any) -> t.Any: return input_datas - def echo_multi_ndarray( - self, - *input_arr: 
"np.ndarray[t.Any, np.dtype[t.Any]]", - ) -> t.Tuple["np.ndarray[t.Any, np.dtype[t.Any]]", ...]: + def echo_multi_ndarray(self, *input_arr: NDArray[t.Any]) -> tuple[NDArray[t.Any]]: return input_arr - def predict_ndarray( - self, - arr: "np.ndarray[t.Any, np.dtype[t.Any]]", - ) -> "np.ndarray[t.Any, np.dtype[t.Any]]": + def predict_ndarray(self, arr: NDArray[t.Any]) -> NDArray[t.Any]: assert isinstance(arr, np.ndarray) return arr * 2 def predict_multi_ndarray( - self, - arr1: "np.ndarray[t.Any, np.dtype[t.Any]]", - arr2: "np.ndarray[t.Any, np.dtype[t.Any]]", - ) -> "np.ndarray[t.Any, np.dtype[t.Any]]": + self, arr1: NDArray[t.Any], arr2: NDArray[t.Any] + ) -> NDArray[t.Any]: assert isinstance(arr1, np.ndarray) assert isinstance(arr2, np.ndarray) return (arr1 + arr2) // 2 - def predict_dataframe(self, df: "pd.DataFrame") -> "pd.DataFrame": + def predict_dataframe(self, df: pd.DataFrame) -> pd.DataFrame: assert isinstance(df, pd.DataFrame) output = df[["col1"]] * 2 # type: ignore assert isinstance(output, pd.DataFrame) diff --git a/tests/e2e/bento_server_general_features/requirements.txt b/tests/e2e/bento_server_http/requirements.txt similarity index 86% rename from tests/e2e/bento_server_general_features/requirements.txt rename to tests/e2e/bento_server_http/requirements.txt index 108615ac405..102c584db9b 100644 --- a/tests/e2e/bento_server_general_features/requirements.txt +++ b/tests/e2e/bento_server_http/requirements.txt @@ -1,6 +1,6 @@ pandas pydantic -pillow +Pillow scikit-learn pyarrow fastapi diff --git a/tests/e2e/bento_server_general_features/service.py b/tests/e2e/bento_server_http/service.py similarity index 74% rename from tests/e2e/bento_server_general_features/service.py rename to tests/e2e/bento_server_http/service.py index f50b31fe5db..b6acaf3e9a7 100644 --- a/tests/e2e/bento_server_general_features/service.py +++ b/tests/e2e/bento_server_http/service.py @@ -1,29 +1,38 @@ +from __future__ import annotations + import typing as t +from typing import 
TYPE_CHECKING import numpy as np import pandas as pd import pydantic from PIL.Image import Image as PILImage from PIL.Image import fromarray +from starlette.requests import Request import bentoml -import bentoml.picklable_model from bentoml.io import File from bentoml.io import JSON from bentoml.io import Image from bentoml.io import Multipart from bentoml.io import NumpyNdarray from bentoml.io import PandasDataFrame -from bentoml._internal.types import FileLike -from bentoml._internal.types import JSONSerializable -py_model = bentoml.picklable_model.get("py_model.case-1.e2e").to_runner() +if TYPE_CHECKING: + from numpy.typing import NDArray + from starlette.types import Send + from starlette.types import Scope + from starlette.types import ASGIApp + from starlette.types import Receive + from bentoml._internal.types import FileLike + from bentoml._internal.types import JSONSerializable -svc = bentoml.Service( - name="general_workflow_service.case-1.e2e", - runners=[py_model], -) + +py_model = bentoml.picklable_model.get("py_model.case-1.http.e2e").to_runner() + + +svc = bentoml.Service(name="general_http_service.case-1.e2e", runners=[py_model]) @svc.api(input=JSON(), output=JSON()) @@ -38,13 +47,13 @@ def echo_json_sync(json_obj: JSONSerializable) -> JSONSerializable: return batch_ret[0] -class _Schema(pydantic.BaseModel): +class ValidateSchema(pydantic.BaseModel): name: str endpoints: t.List[str] @svc.api( - input=JSON(pydantic_model=_Schema), + input=JSON(pydantic_model=ValidateSchema), output=JSON(), ) async def echo_json_enforce_structure(json_obj: JSONSerializable) -> JSONSerializable: @@ -61,9 +70,7 @@ async def echo_obj(obj: JSONSerializable) -> JSONSerializable: input=NumpyNdarray(shape=(2, 2), enforce_shape=True), output=NumpyNdarray(shape=(2, 2)), ) -async def predict_ndarray_enforce_shape( - inp: "np.ndarray[t.Any, np.dtype[t.Any]]", -) -> "np.ndarray[t.Any, np.dtype[t.Any]]": +async def predict_ndarray_enforce_shape(inp: NDArray[t.Any]) -> 
NDArray[t.Any]: assert inp.shape == (2, 2) return await py_model.predict_ndarray.async_run(inp) @@ -72,9 +79,7 @@ async def predict_ndarray_enforce_shape( input=NumpyNdarray(dtype="uint8", enforce_dtype=True), output=NumpyNdarray(dtype="str"), ) -async def predict_ndarray_enforce_dtype( - inp: "np.ndarray[t.Any, np.dtype[t.Any]]", -) -> "np.ndarray[t.Any, np.dtype[t.Any]]": +async def predict_ndarray_enforce_dtype(inp: NDArray[t.Any]) -> NDArray[t.Any]: assert inp.dtype == np.dtype("uint8") return await py_model.predict_ndarray.async_run(inp) @@ -94,7 +99,7 @@ async def predict_ndarray_multi_output( input=PandasDataFrame(dtype={"col1": "int64"}, orient="records"), output=PandasDataFrame(), ) -async def predict_dataframe(df: "pd.DataFrame") -> "pd.DataFrame": +async def predict_dataframe(df: pd.DataFrame) -> pd.DataFrame: assert df["col1"].dtype == "int64" output = await py_model.predict_dataframe.async_run(df) dfo = pd.DataFrame() @@ -110,18 +115,16 @@ async def predict_file(f: FileLike[bytes]) -> bytes: @svc.api(input=Image(), output=Image(mime_type="image/bmp")) -async def echo_image(f: PILImage) -> "np.ndarray[t.Any, np.dtype[t.Any]]": +async def echo_image(f: PILImage) -> NDArray[t.Any]: assert isinstance(f, PILImage) - return np.array(f) # type: ignore[arg-type] + return np.array(f) @svc.api( input=Multipart(original=Image(), compared=Image()), output=Multipart(img1=Image(), img2=Image()), ) -async def predict_multi_images( - original: t.Dict[str, Image], compared: t.Dict[str, Image] -): +async def predict_multi_images(original: dict[str, Image], compared: dict[str, Image]): output_array = await py_model.predict_multi_ndarray.async_run( np.array(original), np.array(compared) ) @@ -130,13 +133,6 @@ async def predict_multi_images( # customise the service -from starlette.types import Send -from starlette.types import Scope -from starlette.types import ASGIApp -from starlette.types import Receive -from starlette.requests import Request - - class 
AllowPingMiddleware: def __init__( self, @@ -154,7 +150,7 @@ async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None: return -svc.add_asgi_middleware(AllowPingMiddleware) # type: ignore[arg-type] +svc.add_asgi_middleware(AllowPingMiddleware) # type: ignore (hint not yet supported for hooks) from fastapi import FastAPI diff --git a/tests/e2e/bento_server_http/tests/conftest.py b/tests/e2e/bento_server_http/tests/conftest.py new file mode 100644 index 00000000000..7807a7f21e9 --- /dev/null +++ b/tests/e2e/bento_server_http/tests/conftest.py @@ -0,0 +1,46 @@ +from __future__ import annotations + +import os +import typing as t +from typing import TYPE_CHECKING + +import pytest + +if TYPE_CHECKING: + from contextlib import ExitStack + + from _pytest.fixtures import FixtureRequest as _PytestFixtureRequest + + class FixtureRequest(_PytestFixtureRequest): + param: str + + +PROJECT_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) + + +@pytest.fixture( + name="server_config_file", + params=["default.yml", "cors_enabled.yml"], + scope="session", +) +def fixture_server_config_file(request: FixtureRequest) -> str: + return os.path.join(PROJECT_DIR, "configs", request.param) + + +@pytest.fixture(scope="module") +def host( + bentoml_home: str, + deployment_mode: str, + server_config_file: str, + clean_context: ExitStack, +) -> t.Generator[str, None, None]: + from bentoml.testing.server import host_bento + + with host_bento( + "service:svc", + config_file=server_config_file, + deployment_mode=deployment_mode, + bentoml_home=bentoml_home, + clean_context=clean_context, + ) as _host: + yield _host diff --git a/tests/e2e/bento_server_general_features/tests/test_io.py b/tests/e2e/bento_server_http/tests/test_io.py similarity index 89% rename from tests/e2e/bento_server_general_features/tests/test_io.py rename to tests/e2e/bento_server_http/tests/test_io.py index de85ae2149f..ff5b9cd71d9 100644 --- 
a/tests/e2e/bento_server_general_features/tests/test_io.py +++ b/tests/e2e/bento_server_http/tests/test_io.py @@ -1,8 +1,9 @@ -# type: ignore[no-untyped-def] +from __future__ import annotations import io import sys import json +from typing import TYPE_CHECKING import numpy as np import pytest @@ -11,9 +12,16 @@ from bentoml.testing.utils import async_request from bentoml.testing.utils import parse_multipart_form +if TYPE_CHECKING: + import PIL.Image as PILImage +else: + from bentoml._internal.utils import LazyLoader + + PILImage = LazyLoader("PILImage", globals(), "PIL.Image") + @pytest.mark.asyncio -async def test_numpy(host): +async def test_numpy(host: str): await async_request( "POST", f"http://{host}/predict_ndarray_enforce_shape", @@ -55,7 +63,7 @@ async def test_numpy(host): @pytest.mark.asyncio -async def test_json(host): +async def test_json(host: str): ORIGIN = "http://bentoml.ai" await async_request( @@ -87,7 +95,7 @@ async def test_json(host): @pytest.mark.asyncio -async def test_obj(host): +async def test_obj(host: str): for obj in [1, 2.2, "str", [1, 2, 3], {"a": 1, "b": 2}]: obj_str = json.dumps(obj, separators=(",", ":")) await async_request( @@ -101,7 +109,7 @@ async def test_obj(host): @pytest.mark.asyncio -async def test_pandas(host): +async def test_pandas(host: str): import pandas as pd ORIGIN = "http://bentoml.ai" @@ -139,7 +147,7 @@ async def test_pandas(host): @pytest.mark.asyncio -async def test_file(host, bin_file): +async def test_file(host: str, bin_file: str): # Test File as binary with open(str(bin_file), "rb") as f: b = f.read() @@ -174,9 +182,7 @@ async def test_file(host, bin_file): @pytest.mark.asyncio -async def test_image(host, img_file): - import PIL.Image - +async def test_image(host: str, img_file: str): with open(str(img_file), "rb") as f1: img_bytes = f1.read() @@ -191,11 +197,11 @@ async def test_image(host, img_file): bio = io.BytesIO(body) bio.name = "test.bmp" - img = PIL.Image.open(bio) + img = PILImage.open(bio) array1 
= np.array(img) - array2 = PIL.Image.open(img_file) + array2 = PILImage.open(img_file) - np.testing.assert_array_almost_equal(array1, array2) + np.testing.assert_array_almost_equal(array1, np.array(array2)) await async_request( "POST", @@ -219,10 +225,8 @@ async def test_image(host, img_file): # SklearnRunner is not suppose to take multiple arguments # TODO: move e2e tests to use a new bentoml.PickleModel module -@pytest.mark.skip @pytest.mark.asyncio -async def test_multipart_image_io(host, img_file): - import PIL.Image +async def test_multipart_image_io(host: str, img_file: str): from starlette.datastructures import UploadFile with open(img_file, "rb") as f1: @@ -242,5 +246,5 @@ async def test_multipart_image_io(host, img_file): form = await parse_multipart_form(headers=headers, body=body) for _, v in form.items(): assert isinstance(v, UploadFile) - img = PIL.Image.open(v.file) + img = PILImage.open(v.file) assert np.array(img).shape == (10, 10, 3) diff --git a/tests/e2e/bento_server_general_features/tests/test_meta.py b/tests/e2e/bento_server_http/tests/test_meta.py similarity index 90% rename from tests/e2e/bento_server_general_features/tests/test_meta.py rename to tests/e2e/bento_server_http/tests/test_meta.py index 48c2fd35c6c..9b3aab75e26 100644 --- a/tests/e2e/bento_server_general_features/tests/test_meta.py +++ b/tests/e2e/bento_server_http/tests/test_meta.py @@ -1,5 +1,8 @@ # pylint: disable=redefined-outer-name -# type: ignore[no-untyped-def] + +from __future__ import annotations + +from pathlib import Path import pytest @@ -45,7 +48,11 @@ async def test_cors(host: str, server_config_file: str) -> None: "Access-Control-Request-Headers": "Content-Type", }, ) - if server_config_file == "server_config_cors_enabled.yml": + + # all test configs lives under ../configs, but we are only interested in name. 
+ fname = Path(server_config_file).name + + if fname == "cors_enabled.yml": assert status == 200 else: assert status != 200 @@ -56,7 +63,7 @@ async def test_cors(host: str, server_config_file: str) -> None: headers={"Content-Type": "application/json", "Origin": ORIGIN}, data='"hi"', ) - if server_config_file == "server_config_cors_enabled.yml": + if fname == "cors_enabled.yml": assert status == 200 assert body == b'"hi"' assert headers["Access-Control-Allow-Origin"] in ("*", ORIGIN) @@ -69,10 +76,10 @@ async def test_cors(host: str, server_config_file: str) -> None: def test_service_init_checks(): - py_model1 = bentoml.picklable_model.get("py_model.case-1.e2e").to_runner( + py_model1 = bentoml.picklable_model.get("py_model.case-1.http.e2e").to_runner( name="invalid" ) - py_model2 = bentoml.picklable_model.get("py_model.case-1.e2e").to_runner( + py_model2 = bentoml.picklable_model.get("py_model.case-1.http.e2e").to_runner( name="invalid" ) with pytest.raises(ValueError) as excinfo: @@ -85,13 +92,13 @@ def test_service_init_checks(): def test_dunder_string(): - runner = bentoml.picklable_model.get("py_model.case-1.e2e").to_runner() + runner = bentoml.picklable_model.get("py_model.case-1.http.e2e").to_runner() svc = bentoml.Service(name="dunder_string", runners=[runner]) assert ( str(svc) - == 'bentoml.Service(name="dunder_string", runners=[py_model.case-1.e2e])' + == 'bentoml.Service(name="dunder_string", runners=[py_model.case-1.http.e2e])' ) diff --git a/tests/e2e/bento_server_general_features/tests/test_microbatch.py b/tests/e2e/bento_server_http/tests/test_microbatch.py similarity index 100% rename from tests/e2e/bento_server_general_features/tests/test_microbatch.py rename to tests/e2e/bento_server_http/tests/test_microbatch.py diff --git a/tests/e2e/conftest.py b/tests/e2e/conftest.py new file mode 100644 index 00000000000..71dcf2c12f9 --- /dev/null +++ b/tests/e2e/conftest.py @@ -0,0 +1,173 @@ +# pylint: disable=unused-argument +from __future__ import 
annotations + +import os +import shutil +import typing as t +import tempfile +import contextlib +from typing import TYPE_CHECKING +from importlib import import_module + +import psutil +import pytest +from _pytest.monkeypatch import MonkeyPatch + +from bentoml.exceptions import InvalidArgument +from bentoml._internal.utils import LazyLoader +from bentoml._internal.utils import validate_or_create_dir + +if TYPE_CHECKING: + + import numpy as np + from _pytest.main import Session + from _pytest.config import ExitCode + from _pytest.python import Metafunc + from _pytest.fixtures import FixtureRequest + + class FilledFixtureRequest(FixtureRequest): + param: str + +else: + np = LazyLoader("np", globals(), "numpy") + + +def pytest_sessionstart(session: Session) -> None: + """Create a temporary directory for the BentoML home directory, then monkey patch to config.""" + from bentoml._internal.configuration.containers import BentoMLContainer + + mp = MonkeyPatch() + config = session.config + config.add_cleanup(mp.undo) + # setup test environment + _LOCAL_BUNDLE_BUILD = os.environ.get("BENTOML_BUNDLE_LOCAL_BUILD") + if _LOCAL_BUNDLE_BUILD: + # mp this previous value to session to restore to default after test session + # to avoid affecting local development. + mp.setattr( + session, + "_original_bundle_build", + _LOCAL_BUNDLE_BUILD, + raising=False, + ) + os.environ["BENTOML_BUNDLE_LOCAL_BUILD"] = "True" + os.environ["SETUPTOOLS_USE_DISTUTILS"] = "stdlib" + + _PYTEST_BENTOML_HOME = tempfile.mkdtemp("bentoml-pytest-e2e") + bentos = os.path.join(_PYTEST_BENTOML_HOME, "bentos") + models = os.path.join(_PYTEST_BENTOML_HOME, "models") + prom_dir = os.path.join(_PYTEST_BENTOML_HOME, "prometheus_multiproc_dir") + validate_or_create_dir(bentos, models, prom_dir) + # ensure we setup correct home and prometheus_multiproc_dir folders. 
+ BentoMLContainer.bentoml_home.set(_PYTEST_BENTOML_HOME) + BentoMLContainer.prometheus_multiproc_dir.set(prom_dir) + # setup prometheus multiproc directory for tests. + _PROMETHEUS_MULTIPROC_DIR = os.environ.get("PROMETHEUS_MULTIPROC_DIR") + if _PROMETHEUS_MULTIPROC_DIR: + mp.setattr( + session, + "_original_multiproc_env", + _PROMETHEUS_MULTIPROC_DIR, + raising=False, + ) + # use the local bentoml package in development + os.environ["PROMETHEUS_MULTIPROC_DIR"] = prom_dir + + mp.setattr(config, "_bentoml_home", _PYTEST_BENTOML_HOME, raising=False) + project_dir = config.getoption("project_dir") + assert project_dir, "--project-dir is required" + try: + imported = import_module( + ".configure", + f"tests.e2e.{t.cast(str, project_dir).rstrip('/').split('/')[-1]}", + ) + if not hasattr(imported, "create_model"): + raise InvalidArgument( + "'create_model()' is required to create a test model." + ) from None + except ModuleNotFoundError: + raise ModuleNotFoundError( + f"Failed to find 'configure.py' in E2E project '{project_dir}'." + ) from None + else: + imported.create_model() + + +def pytest_sessionfinish(session: Session, exitstatus: int | ExitCode) -> None: + config = session.config + if hasattr(session, "_original_bundle_build"): + os.environ["BENTOML_BUNDLE_LOCAL_BUILD"] = session._original_bundle_build # type: ignore (dynamic patch) + else: + os.environ.pop("BENTOML_BUNDLE_LOCAL_BUILD") + if hasattr(session, "_original_multiproc_env"): + os.environ["PROMETHEUS_MULTIPROC_DIR"] = session._original_multiproc_env # type: ignore (dynamic patch) + else: + os.environ.pop("PROMETHEUS_MULTIPROC_DIR") + if config.getoption("cleanup"): + from bentoml._internal.configuration.containers import BentoMLContainer + + # reset BentoMLContainer.bentoml_home + BentoMLContainer.bentoml_home.reset() + # Set dynamically by pytest_configure() above. 
+ shutil.rmtree(config._bentoml_home) # type: ignore (dynamic patch) + + +def pytest_addoption(parser: pytest.Parser): + parser.addoption("--project-dir", action="store", default=None) + parser.addoption("--cleanup", action="store_true") + + +def pytest_generate_tests(metafunc: Metafunc): + if "deployment_mode" in metafunc.fixturenames: + if os.getenv("VSCODE_IPC_HOOK_CLI") and not os.getenv("GITHUB_CODESPACE_TOKEN"): + # When running inside VSCode remote container locally, we don't have access to + # exposed reserved ports, so we can't run docker-based tests. However on GitHub + # Codespaces, we can run docker-based tests. (Investigate why this is the case) + # Note that inside the remote container, it is already running as a Linux container. + deployment_mode = ["distributed", "standalone"] + else: + if os.environ.get("GITHUB_ACTIONS") and (psutil.WINDOWS or psutil.MACOS): + # Due to GitHub Actions' limitation, we can't run docker-based tests + # on Windows and macOS. However, we can still running those tests on + # local development. + if psutil.MACOS: + deployment_mode = ["distributed", "standalone"] + else: + deployment_mode = ["standalone"] + else: + if psutil.WINDOWS: + deployment_mode = ["standalone", "docker"] + else: + deployment_mode = ["distributed", "standalone", "docker"] + metafunc.parametrize("deployment_mode", deployment_mode, scope="session") + + +@pytest.fixture(scope="session") +def bentoml_home(request: FixtureRequest) -> str: + # Set dynamically by pytest_configure() above. 
+ return request.config._bentoml_home # type: ignore (dynamic patch) + + +@pytest.fixture(scope="session", autouse=True) +def clean_context() -> t.Generator[contextlib.ExitStack, None, None]: + stack = contextlib.ExitStack() + yield stack + stack.close() + + +@pytest.fixture() +def img_file(tmpdir: str) -> str: + from PIL.Image import fromarray + + img_file_ = tmpdir.join("test_img.bmp") + img = fromarray(np.random.randint(255, size=(10, 10, 3)).astype("uint8")) + img.save(str(img_file_)) + return str(img_file_) + + +@pytest.fixture() +def bin_file(tmpdir: str) -> str: + bin_file_ = tmpdir.join("bin_file.bin") + with open(bin_file_, "wb") as of: + of.write("â".encode("gb18030")) + return str(bin_file_) diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py index d426aafe6d1..ba54fc20abe 100644 --- a/tests/integration/conftest.py +++ b/tests/integration/conftest.py @@ -1,4 +1,5 @@ -import typing as t +from __future__ import annotations + import tempfile from typing import TYPE_CHECKING @@ -7,15 +8,13 @@ from bentoml._internal.models import ModelStore if TYPE_CHECKING: + from _pytest.main import Session from _pytest.nodes import Item from _pytest.config import Config from _pytest.config.argparsing import Parser -def pytest_addoption(parser: "Parser") -> None: - parser.addoption( - "--runslow", action="store_true", default=False, help="run slow tests" - ) +def pytest_addoption(parser: Parser) -> None: parser.addoption( "--gpus", action="store_true", default=False, help="run gpus related tests" ) @@ -27,7 +26,7 @@ def pytest_addoption(parser: "Parser") -> None: ) -def pytest_collection_modifyitems(config: "Config", items: t.List["Item"]) -> None: +def pytest_collection_modifyitems(config: Config, items: list[Item]) -> None: if config.getoption("--disable-tf-eager-execution"): try: from tensorflow.python.framework.ops import disable_eager_execution @@ -47,8 +46,8 @@ def pytest_collection_modifyitems(config: "Config", items: t.List["Item"]) -> No 
item.add_marker(requires_eager_execution) -def pytest_sessionstart(session): - path = tempfile.mkdtemp("bentoml-pytest") +def pytest_sessionstart(session: Session): # pylint: disable=unused-argument + path = tempfile.mkdtemp("bentoml-pytest-unit") from bentoml._internal.configuration.containers import BentoMLContainer BentoMLContainer.model_store.set(ModelStore(path)) diff --git a/tests/unit/_internal/bento/test_bento.py b/tests/unit/_internal/bento/test_bento.py index 99086f32b7a..38ad36969db 100644 --- a/tests/unit/_internal/bento/test_bento.py +++ b/tests/unit/_internal/bento/test_bento.py @@ -1,3 +1,4 @@ +# pylint: disable=unused-argument from __future__ import annotations import os @@ -120,7 +121,12 @@ def test_bento_info(tmpdir: Path): extra_index_url: null pip_args: null wheels: null - extras_require: null + components: + tracing: false + grpc: false + zipkin: false + jaeger: false + otlp: false conda: environment_yml: null channels: null @@ -146,10 +152,7 @@ def build_test_bento(model_store: ModelStore) -> Bento: bento_cfg = BentoBuildConfig( "simplebento.py:svc", include=["*.py", "config.json", "somefile", "*dir*", ".bentoignore"], - exclude=[ - "*.storage", - "/somefile", - ], + exclude=["*.storage", "/somefile", "/subdir2"], conda={ "environment_yml": "./environment.yaml", }, @@ -335,6 +338,7 @@ def test_bento(dummy_model_store: ModelStore): "src", "env", } + print(bento_fs.listdir("src")) assert set(bento_fs.listdir("src")) == { "simplebento.py", "subdir", diff --git a/tests/unit/_internal/io/conftest.py b/tests/unit/_internal/io/conftest.py new file mode 100644 index 00000000000..cbc398551d3 --- /dev/null +++ b/tests/unit/_internal/io/conftest.py @@ -0,0 +1,32 @@ +from __future__ import annotations + +from typing import TYPE_CHECKING + +import pytest + +if TYPE_CHECKING: + from _pytest.python import Metafunc + + +def pytest_generate_tests(metafunc: Metafunc) -> None: + if "use_internal_bytes_contents" in metafunc.fixturenames: + 
metafunc.parametrize("use_internal_bytes_contents", [True, False]) + + +@pytest.fixture() +def img_file(tmpdir: str) -> str: + import numpy as np + from PIL.Image import fromarray + + img_file_ = tmpdir.join("test_img.bmp") + img = fromarray(np.random.randint(255, size=(10, 10, 3)).astype("uint8")) + img.save(str(img_file_)) + return str(img_file_) + + +@pytest.fixture() +def bin_file(tmpdir: str) -> str: + bin_file_ = tmpdir.join("bin_file.bin") + with open(bin_file_, "wb") as of: + of.write("â".encode("gb18030")) + return str(bin_file_) diff --git a/tests/unit/_internal/io/test_file.py b/tests/unit/_internal/io/test_file.py index eebb7d6cc65..a4e9c122c0b 100644 --- a/tests/unit/_internal/io/test_file.py +++ b/tests/unit/_internal/io/test_file.py @@ -1,8 +1,19 @@ from __future__ import annotations +import io +from typing import TYPE_CHECKING + import pytest from bentoml.io import File +from bentoml.exceptions import BadInput + +if TYPE_CHECKING: + from bentoml.grpc.v1alpha1 import service_pb2 as pb +else: + from bentoml.grpc.utils import import_generated_stubs + + pb, _ = import_generated_stubs() def test_file_openapi_schema(): @@ -27,3 +38,71 @@ def test_file_openapi_request_responses(mime_type: str): assert responses.content assert mime_type in responses.content + + +@pytest.mark.asyncio +async def test_from_proto(use_internal_bytes_contents: bool, bin_file: str): + with open(bin_file, "rb") as f: + content = f.read() + if use_internal_bytes_contents: + res = await File().from_proto( + content, _use_internal_bytes_contents=use_internal_bytes_contents + ) + assert res.read() == b"\x810\x899" + else: + res = await File().from_proto( + pb.Part(file=pb.File(content=content)), + _use_internal_bytes_contents=use_internal_bytes_contents, + ) + assert res.read() == b"\x810\x899" + + +@pytest.mark.asyncio +async def test_exception_from_proto(use_internal_bytes_contents: bool): + with pytest.raises(AssertionError): + await File().from_proto( + 
pb.NDArray(string_values="asdf"), + _use_internal_bytes_contents=use_internal_bytes_contents, + ) + await File().from_proto( + "", _use_internal_bytes_contents=use_internal_bytes_contents + ) + with pytest.raises(BadInput) as exc_info: + await File(mime_type="multipart/form-data").from_proto(pb.File()) + assert "which is not allowed to use with 'from_proto'" in str(exc_info.value) + if not use_internal_bytes_contents: + with pytest.raises(BadInput) as exc_info: + await File(mime_type="image/jpeg").from_proto( + pb.File(kind=pb.File.FILE_TYPE_BYTES, content=b"asdf"), + _use_internal_bytes_contents=use_internal_bytes_contents, + ) + assert "Inferred mime_type from 'kind' is" in str(exc_info.value) + with pytest.raises(BadInput) as exc_info: + await File(mime_type="image/jpeg").from_proto( + pb.File(kind=123, content=b"asdf"), + _use_internal_bytes_contents=use_internal_bytes_contents, + ) + assert "is not a valid File kind." in str(exc_info.value) + with pytest.raises(BadInput) as exc_info: + await File(mime_type="image/jpeg").from_proto( + pb.File(kind=pb.File.FILE_TYPE_JPEG), + _use_internal_bytes_contents=use_internal_bytes_contents, + ) + assert "Content is empty!" 
== str(exc_info.value) + + +@pytest.mark.asyncio +async def test_exception_to_proto(): + with pytest.raises(BadInput) as exc_info: + await File(mime_type="multipart/form-data").to_proto(io.BytesIO(b"asdf")) + assert "which is not allowed to use with 'to_proto'" in str(exc_info.value) + with pytest.raises(BadInput) as exc_info: + await File(mime_type="application/bentoml.vnd").to_proto(io.BytesIO(b"asdf")) + assert "doesn't have a corresponding File 'kind'" in str(exc_info.value) + + +@pytest.mark.asyncio +async def test_to_proto() -> None: + assert await File(mime_type="image/bmp").to_proto(io.BytesIO(b"asdf")) == pb.File( + kind=pb.File.FILE_TYPE_BMP, content=b"asdf" + ) diff --git a/tests/unit/_internal/io/test_image.py b/tests/unit/_internal/io/test_image.py index f16806b0d84..f49bea00083 100644 --- a/tests/unit/_internal/io/test_image.py +++ b/tests/unit/_internal/io/test_image.py @@ -1,9 +1,36 @@ from __future__ import annotations +import io +from typing import TYPE_CHECKING + import pytest from bentoml.io import Image +from bentoml.exceptions import BadInput from bentoml.exceptions import InvalidArgument +from bentoml.exceptions import InternalServerError + +if TYPE_CHECKING: + import numpy as np + import PIL.Image as PILImage + + from bentoml.grpc.v1alpha1 import service_pb2 as pb +else: + from bentoml.grpc.utils import import_generated_stubs + from bentoml._internal.utils import LazyLoader + + pb, _ = import_generated_stubs() + np = LazyLoader("np", globals(), "numpy") + PILImage = LazyLoader("PILImage", globals(), "PIL.Image") + + +def test_invalid_init(): + with pytest.raises(InvalidArgument) as exc_info: + Image(mime_type="application/vnd.bentoml+json") + assert "Invalid Image mime_type" in str(exc_info.value) + with pytest.raises(InvalidArgument) as exc_info: + Image(pilmode="asdf") + assert "Invalid Image pilmode" in str(exc_info.value) def test_image_openapi_schema(): @@ -31,3 +58,73 @@ def test_image_openapi_request_responses(mime_type: str): assert 
responses.content assert mime_type in responses.content + + +@pytest.mark.asyncio +async def test_from_proto(use_internal_bytes_contents: bool, img_file: str): + with open(img_file, "rb") as f: + content = f.read() + if use_internal_bytes_contents: + res = await Image(mime_type="image/bmp").from_proto( + content, _use_internal_bytes_contents=use_internal_bytes_contents + ) + assert_file = PILImage.open(img_file) + np.testing.assert_array_almost_equal(np.array(res), np.array(assert_file)) + else: + res = await Image(mime_type="image/bmp").from_proto( + pb.Part(file=pb.File(kind=pb.File.FILE_TYPE_BMP, content=content)), + _use_internal_bytes_contents=use_internal_bytes_contents, + ) + assert_file = PILImage.open(img_file) + np.testing.assert_array_almost_equal(np.array(res), np.array(assert_file)) + + +@pytest.mark.asyncio +async def test_exception_from_proto(use_internal_bytes_contents: bool): + with pytest.raises(AssertionError): + await Image().from_proto( + pb.NDArray(string_values="asdf"), + _use_internal_bytes_contents=use_internal_bytes_contents, + ) + await Image().from_proto( + "", _use_internal_bytes_contents=use_internal_bytes_contents + ) + if not use_internal_bytes_contents: + with pytest.raises(BadInput) as exc_info: + await Image(mime_type="image/jpeg").from_proto( + pb.File(kind=pb.File.FILE_TYPE_BYTES, content=b"asdf"), + _use_internal_bytes_contents=use_internal_bytes_contents, + ) + assert "Inferred mime_type from 'kind' is" in str(exc_info.value) + with pytest.raises(BadInput) as exc_info: + await Image(mime_type="image/jpeg").from_proto( + pb.File(kind=123, content=b"asdf"), + _use_internal_bytes_contents=use_internal_bytes_contents, + ) + assert "is not a valid File kind." in str(exc_info.value) + with pytest.raises(BadInput) as exc_info: + await Image(mime_type="image/jpeg").from_proto( + pb.File(kind=pb.File.FILE_TYPE_JPEG), + _use_internal_bytes_contents=use_internal_bytes_contents, + ) + assert "Content is empty!" 
== str(exc_info.value) + + +@pytest.mark.asyncio +async def test_exception_to_proto(): + with pytest.raises(InternalServerError) as exc_info: + await Image().to_proto(io.BytesIO(b"asdf")) + assert "Unsupported Image type received:" in str(exc_info.value) + with pytest.raises(BadInput) as exc_info: + example = np.random.rand(255, 255, 3) + await Image(mime_type="image/sgi").to_proto(example) + assert "doesn't have a corresponding File 'kind'" in str(exc_info.value) + + +@pytest.mark.asyncio +async def test_to_proto(img_file: str) -> None: + with open(img_file, "rb") as f: + content = f.read() + img = PILImage.open(io.BytesIO(content)) + res = await Image(mime_type="image/bmp").to_proto(img) + assert res.kind == pb.File.FILE_TYPE_BMP diff --git a/tests/unit/_internal/io/test_json.py b/tests/unit/_internal/io/test_json.py index fbd40b53810..aa293b9b7ea 100644 --- a/tests/unit/_internal/io/test_json.py +++ b/tests/unit/_internal/io/test_json.py @@ -15,12 +15,23 @@ import pydantic from bentoml.io import JSON +from bentoml.exceptions import BadInput +from bentoml.exceptions import UnprocessableEntity +from bentoml._internal.utils.pkg import pkg_version_info from bentoml._internal.io_descriptors.json import DefaultJsonEncoder if TYPE_CHECKING: from _pytest.logging import LogCaptureFixture + from google.protobuf import struct_pb2 + from bentoml.grpc.v1alpha1 import service_pb2 as pb from bentoml._internal.service.openapi.specification import Schema +else: + from bentoml.grpc.utils import import_generated_stubs + from bentoml._internal.utils import LazyLoader + + pb, _ = import_generated_stubs() + struct_pb2 = LazyLoader("struct_pb2", globals(), "google.protobuf.struct_pb2") @dataclass @@ -70,6 +81,24 @@ class Config: ) +@pytest.mark.skipif( + pkg_version_info("pydantic")[0] < 2 and pkg_version_info("bentoml")[:2] <= (1, 1), + reason="Pydantic 2.x is not yet supported until official releases of Pydantic.", +) +def test_not_yet_supported_pydantic(): + with 
pytest.raises(UnprocessableEntity) as exc_info: + JSON(pydantic_model=Nested) + assert "pydantic 2.x is not yet supported" in str(exc_info.value) + + +def test_invalid_init(): + with pytest.raises(AssertionError) as exc_info: + JSON(pydantic_model=ExampleAttrsClass) + assert "'pydantic_model' must be a subclass of 'pydantic.BaseModel'." == str( + exc_info.value + ) + + def test_json_encoder_dataclass_like(): expected = '{"name":"test","endpoints":["predict","health"]}' assert ( @@ -172,3 +201,117 @@ def test_json_openapi_request_responses(): assert responses.content assert "application/json" in responses.content + + +@pytest.mark.asyncio +async def test_from_proto(use_internal_bytes_contents: bool): + if use_internal_bytes_contents: + res = await JSON().from_proto( + b'{"request_id": "123", "iris_features": {"sepal_len":2.34,"sepal_width":1.58, "petal_len":6.52, "petal_width":3.23}}', + _use_internal_bytes_contents=use_internal_bytes_contents, + ) + assert res == { + "request_id": "123", + "iris_features": { + "sepal_len": 2.34, + "sepal_width": 1.58, + "petal_len": 6.52, + "petal_width": 3.23, + }, + } + res = await JSON(pydantic_model=BaseSchema).from_proto( + b'{"name":"test","endpoints":["predict","health"]}', + _use_internal_bytes_contents=use_internal_bytes_contents, + ) + assert isinstance(res, pydantic.BaseModel) and res == BaseSchema( + name="test", endpoints=["predict", "health"] + ) + else: + res = await JSON(pydantic_model=Nested).from_proto( + struct_pb2.Value( + struct_value=struct_pb2.Struct( + fields={ + "toplevel": struct_pb2.Value(string_value="test"), + "nested": struct_pb2.Value( + struct_value=struct_pb2.Struct( + fields={ + "name": struct_pb2.Value(string_value="test"), + "endpoints": struct_pb2.Value( + list_value=struct_pb2.ListValue( + values=[ + struct_pb2.Value( + string_value="predict" + ), + struct_pb2.Value(string_value="health"), + ] + ) + ), + } + ), + ), + } + ) + ), + _use_internal_bytes_contents=use_internal_bytes_contents, + ) + 
assert isinstance(res, pydantic.BaseModel) and res == Nested( + toplevel="test", + nested=BaseSchema(name="test", endpoints=["predict", "health"]), + ) + res = await JSON().from_proto( + pb.Part(json=struct_pb2.Value(string_value='{"request_id": "123"}')), + _use_internal_bytes_contents=use_internal_bytes_contents, + ) + assert res == '{"request_id": "123"}' + + +@pytest.mark.asyncio +async def test_exception_from_proto(use_internal_bytes_contents: bool): + with pytest.raises(AssertionError): + await JSON().from_proto( + pb.NDArray(string_values="asdf"), + _use_internal_bytes_contents=use_internal_bytes_contents, + ) + await JSON().from_proto( + "", _use_internal_bytes_contents=use_internal_bytes_contents + ) + if not use_internal_bytes_contents: + with pytest.raises(BadInput, match="Invalid JSON input received*"): + await JSON(pydantic_model=Nested).from_proto( + struct_pb2.Value(string_value="asdf"), + _use_internal_bytes_contents=use_internal_bytes_contents, + ) + else: + with pytest.raises(BadInput, match="Invalid JSON input received*"): + await JSON(pydantic_model=Nested).from_proto( + b"", _use_internal_bytes_contents=use_internal_bytes_contents + ) + await JSON().from_proto( + b"\n?xfa", _use_internal_bytes_contents=use_internal_bytes_contents + ) + + +@pytest.mark.asyncio +async def test_exception_to_proto(): + with pytest.raises(TypeError): + await JSON().to_proto(b"asdf") + + +@pytest.mark.asyncio +@pytest.mark.parametrize( + "o", + [ + {"asdf": 1}, + ["asdf", "1"], + "asdf", + 1.0, + 1, + True, + BaseSchema(name="test", endpoints=["predict", "health"]), + np.random.rand(6, 6), + None, + ], +) +async def test_to_proto(o: t.Any) -> None: + res = await JSON().to_proto(o) + assert res and isinstance(res, struct_pb2.Value) diff --git a/tests/unit/_internal/io/test_multipart.py b/tests/unit/_internal/io/test_multipart.py index da51c22708f..8f6b7ee0b3d 100644 --- a/tests/unit/_internal/io/test_multipart.py +++ b/tests/unit/_internal/io/test_multipart.py @@ -1,5 
+1,7 @@ from __future__ import annotations +from typing import TYPE_CHECKING + import pytest from bentoml.io import JSON @@ -9,9 +11,23 @@ multipart = Multipart(arg1=JSON(), arg2=Image(pilmode="RGB")) +if TYPE_CHECKING: + from google.protobuf import wrappers_pb2 + + from bentoml.grpc.v1alpha1 import service_pb2 as pb +else: + from bentoml.grpc.utils import import_generated_stubs + from bentoml._internal.utils import LazyLoader + + pb, _ = import_generated_stubs() + wrappers_pb2 = LazyLoader("wrappers_pb2", globals(), "google.protobuf.wrappers_pb2") + def test_invalid_multipart(): - with pytest.raises(InvalidArgument): + with pytest.raises( + InvalidArgument, + match="Multipart IO can not contain nested Multipart IO descriptor", + ): _ = Multipart(arg1=Multipart(arg1=JSON())) @@ -30,3 +46,19 @@ def test_multipart_openapi_request_responses(): responses = multipart.openapi_responses() assert responses.content + + +@pytest.mark.asyncio +async def test_exception_from_to_proto(): + with pytest.raises(InvalidArgument): + await multipart.from_proto(b"", _use_internal_bytes_contents=True) + with pytest.raises(InvalidArgument) as e: + await multipart.from_proto( + {"asdf": pb.Part(text=wrappers_pb2.StringValue(value="asdf"))} + ) + assert "as input fields. Invalid fields are: " in str(e.value) + with pytest.raises(InvalidArgument) as e: + await multipart.to_proto( + {"asdf": pb.Part(text=wrappers_pb2.StringValue(value="asdf"))} + ) + assert "as output fields. 
Invalid fields are: " in str(e.value) diff --git a/tests/unit/_internal/io/test_numpy.py b/tests/unit/_internal/io/test_numpy.py index 2ea623e59ae..ff642e88fbd 100644 --- a/tests/unit/_internal/io/test_numpy.py +++ b/tests/unit/_internal/io/test_numpy.py @@ -1,3 +1,4 @@ +# pylint: disable=unused-argument from __future__ import annotations import logging @@ -10,11 +11,19 @@ from bentoml.io import NumpyNdarray from bentoml.exceptions import BadInput from bentoml.exceptions import BentoMLException +from bentoml.exceptions import InternalServerError +from bentoml.exceptions import UnprocessableEntity from bentoml._internal.service.openapi.specification import Schema if TYPE_CHECKING: from _pytest.logging import LogCaptureFixture + from bentoml.grpc.v1alpha1 import service_pb2 as pb +else: + from bentoml.grpc.utils import import_generated_stubs + + pb, _ = import_generated_stubs() + class ExampleGeneric(str, np.generic): pass @@ -35,6 +44,15 @@ def test_invalid_dtype(): assert "expects a 'numpy.array'" in str(e.value) +def test_invalid_init(): + with pytest.raises(UnprocessableEntity) as exc_info: + NumpyNdarray(enforce_dtype=True) + assert "'dtype' must be specified" in str(exc_info.value) + with pytest.raises(UnprocessableEntity) as exc_info: + NumpyNdarray(enforce_shape=True) + assert "'shape' must be specified" in str(exc_info.value) + + @pytest.mark.parametrize("dtype, expected", [("float", "number"), (">U8", "integer")]) def test_numpy_to_openapi_types(dtype: str, expected: str): assert NumpyNdarray(dtype=dtype)._openapi_types() == expected # type: ignore (private functions warning) @@ -99,3 +117,126 @@ def test_verify_numpy_ndarray(caplog: LogCaptureFixture): with caplog.at_level(logging.DEBUG): example.validate_array(np.array("asdf")) assert "Failed to reshape" in caplog.text + + +def generate_1d_array(dtype: pb.NDArray.DType.ValueType, length: int = 3): + if dtype == pb.NDArray.DTYPE_BOOL: + return [True] * length + elif dtype == pb.NDArray.DTYPE_STRING: + 
return ["a"] * length + else: + return [1] * length + + +@pytest.mark.asyncio +@pytest.mark.parametrize( + "dtype", + filter(lambda x: x > 0, [v.number for v in pb.NDArray.DType.DESCRIPTOR.values]), +) +async def test_from_proto( + use_internal_bytes_contents: bool, dtype: pb.NDArray.DType.ValueType +) -> None: + from bentoml._internal.io_descriptors.numpy import dtypepb_to_fieldpb_map + from bentoml._internal.io_descriptors.numpy import dtypepb_to_npdtype_map + + if use_internal_bytes_contents: + np.testing.assert_array_equal( + await NumpyNdarray(dtype=example.dtype, shape=example.shape).from_proto( + example.ravel().tobytes(), + _use_internal_bytes_contents=use_internal_bytes_contents, + ), + example, + ) + else: + # DTYPE_UNSPECIFIED + np.testing.assert_array_equal( + await NumpyNdarray().from_proto( + pb.NDArray(dtype=pb.NDArray.DType.DTYPE_UNSPECIFIED), + _use_internal_bytes_contents=use_internal_bytes_contents, + ), + np.empty(0), + ) + np.testing.assert_array_equal( + await NumpyNdarray().from_proto( + pb.NDArray(shape=tuple(example.shape)), + _use_internal_bytes_contents=use_internal_bytes_contents, + ), + np.empty(tuple(example.shape)), + ) + # different DTYPE + np.testing.assert_array_equal( + await NumpyNdarray().from_proto( + pb.NDArray( + dtype=dtype, + **{dtypepb_to_fieldpb_map()[dtype]: generate_1d_array(dtype)}, + ), + _use_internal_bytes_contents=use_internal_bytes_contents, + ), + np.array(generate_1d_array(dtype), dtype=dtypepb_to_npdtype_map()[dtype]), + ) + # given shape from message. 
+ np.testing.assert_array_equal( + await NumpyNdarray().from_proto( + pb.NDArray(shape=[3, 3], float_values=[1.0] * 9), + _use_internal_bytes_contents=use_internal_bytes_contents, + ), + np.array([[1.0] * 3] * 3), + ) + # using pb.Part + np.testing.assert_array_equal( + await NumpyNdarray().from_proto( + pb.Part(ndarray=pb.NDArray(shape=[3, 3], float_values=[1.0] * 9)), + _use_internal_bytes_contents=use_internal_bytes_contents, + ), + np.array([[1.0] * 3] * 3), + ) + + +@pytest.mark.asyncio +async def test_exception_from_proto(use_internal_bytes_contents: bool): + with pytest.raises(AssertionError): + await NumpyNdarray().from_proto( + pb.NDArray(string_values="asdf"), + _use_internal_bytes_contents=use_internal_bytes_contents, + ) + await NumpyNdarray().from_proto( + pb.File(content=b"asdf"), + _use_internal_bytes_contents=use_internal_bytes_contents, + ) + if use_internal_bytes_contents: + with pytest.raises(UnprocessableEntity): + await NumpyNdarray().from_proto( + b"asdf", _use_internal_bytes_contents=use_internal_bytes_contents + ) + else: + with pytest.raises(BadInput) as exc_info: + await NumpyNdarray().from_proto( + pb.NDArray(dtype=123, string_values="asdf"), + _use_internal_bytes_contents=use_internal_bytes_contents, + ) + assert "123 is invalid." 
== str(exc_info.value) + with pytest.raises(BadInput) as exc_info: + await NumpyNdarray().from_proto( + pb.NDArray(string_values="asdf", float_values=[1.0, 2.0]), + _use_internal_bytes_contents=use_internal_bytes_contents, + ) + assert "Array contents can only be one of" in str(exc_info.value) + + +@pytest.mark.asyncio +async def test_exception_to_proto(): + with pytest.raises(InternalServerError): + await NumpyNdarray(dtype=np.float32, enforce_dtype=True).to_proto( + np.array("asdf") + ) + with pytest.raises(BadInput): + await NumpyNdarray(dtype=np.generic).to_proto(np.array("asdf")) + + +@pytest.mark.asyncio +async def test_to_proto() -> None: + assert await NumpyNdarray().to_proto(example) == pb.NDArray( + shape=example.shape, + dtype=pb.NDArray.DType.DTYPE_DOUBLE, + double_values=example.ravel().tolist(), + ) diff --git a/tests/unit/_internal/io/test_text.py b/tests/unit/_internal/io/test_text.py index d2c3be2bfa4..302bfdd8b19 100644 --- a/tests/unit/_internal/io/test_text.py +++ b/tests/unit/_internal/io/test_text.py @@ -1,10 +1,23 @@ from __future__ import annotations +from typing import TYPE_CHECKING + import pytest from bentoml.io import Text from bentoml.exceptions import BentoMLException +if TYPE_CHECKING: + from google.protobuf import wrappers_pb2 + + from bentoml.grpc.v1alpha1 import service_pb2 as pb +else: + from bentoml.grpc.utils import import_generated_stubs + from bentoml._internal.utils import LazyLoader + + pb, _ = import_generated_stubs() + wrappers_pb2 = LazyLoader("wrappers_pb2", globals(), "google.protobuf.wrappers_pb2") + def test_text_openapi_schema(): assert Text().openapi_schema().type == "string" @@ -28,3 +41,39 @@ def test_text_openapi_request_responses(): assert responses.content assert mime_type in responses.content + + +@pytest.mark.asyncio +async def test_from_proto(use_internal_bytes_contents: bool): + if not use_internal_bytes_contents: + res = await Text().from_proto(wrappers_pb2.StringValue(value="asdf")) + assert res == "asdf" 
+ res = await Text().from_proto( + pb.Part(text=wrappers_pb2.StringValue(value="asdf")), + _use_internal_bytes_contents=use_internal_bytes_contents, + ) + assert res == "asdf" + else: + res = await Text().from_proto( + b"asdf", + _use_internal_bytes_contents=use_internal_bytes_contents, + ) + assert res == "asdf" + + +@pytest.mark.asyncio +async def test_exception_from_proto(use_internal_bytes_contents: bool): + with pytest.raises(AssertionError): + await Text().from_proto( + pb.NDArray(string_values="asdf"), + _use_internal_bytes_contents=use_internal_bytes_contents, + ) + await Text().from_proto( + "", _use_internal_bytes_contents=use_internal_bytes_contents + ) + + +@pytest.mark.asyncio +async def test_to_proto() -> None: + res = await Text().to_proto("asdf") + assert res.value == "asdf" diff --git a/tests/unit/_internal/server/__init__.py b/tests/unit/_internal/server/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/tests/unit/_internal/server/test_config.py b/tests/unit/_internal/server/test_config.py new file mode 100644 index 00000000000..b47e7ac4916 --- /dev/null +++ b/tests/unit/_internal/server/test_config.py @@ -0,0 +1,65 @@ +from __future__ import annotations + +import typing as t +from typing import TYPE_CHECKING + +import psutil +import pytest + +from bentoml._internal.server.grpc import Config +from bentoml._internal.server.grpc import Servicer + +if TYPE_CHECKING: + from bentoml import Service + + +@pytest.fixture() +def servicer(noop_service: Service) -> Servicer: + return Servicer(noop_service) + + +@pytest.mark.skipif(not psutil.WINDOWS, reason="Windows test.") +def test_windows_config_options(servicer: Servicer) -> None: + config = Config( + servicer, + bind_address="0.0.0.0", + max_message_length=None, + max_concurrent_streams=None, + maximum_concurrent_rpcs=None, + ) + assert not config.options + + +@pytest.mark.skipif(psutil.WINDOWS, reason="Unix test.") +@pytest.mark.parametrize( + "options,expected", + [ + ( + 
{"max_concurrent_streams": 128}, + ( + ("grpc.so_reuseport", 1), + ("grpc.max_concurrent_streams", 128), + ("grpc.max_message_length", -1), + ("grpc.max_receive_message_length", -1), + ("grpc.max_send_message_length", -1), + ), + ), + ( + {"max_message_length": 2048}, + ( + ("grpc.so_reuseport", 1), + ("grpc.max_message_length", 2048), + ("grpc.max_receive_message_length", 2048), + ("grpc.max_send_message_length", 2048), + ), + ), + ], +) +def test_unix_options( + servicer: Servicer, + options: dict[str, t.Any], + expected: tuple[tuple[str, t.Any], ...], +) -> None: + config = Config(servicer, bind_address="0.0.0.0", **options) + assert config.options + assert config.options == expected diff --git a/tests/unit/_internal/test_configuration.py b/tests/unit/_internal/test_configuration.py index 18d303c1fa1..9b52a2ca64a 100644 --- a/tests/unit/_internal/test_configuration.py +++ b/tests/unit/_internal/test_configuration.py @@ -1,19 +1,89 @@ -from tempfile import NamedTemporaryFile +from __future__ import annotations + +import typing as t +import logging +from typing import TYPE_CHECKING + +import pytest from bentoml._internal.configuration.containers import BentoMLConfiguration +if TYPE_CHECKING: + from pathlib import Path + + from _pytest.logging import LogCaptureFixture + from simple_di.providers import ConfigDictType + + +@pytest.fixture(scope="function", name="config_cls") +def fixture_config_cls(tmp_path: Path) -> t.Callable[[str], ConfigDictType]: + def inner(config: str) -> ConfigDictType: + path = tmp_path / "configuration.yaml" + path.write_text(config) + return BentoMLConfiguration(override_config_file=path.__fspath__()).as_dict() + + return inner + + +@pytest.mark.usefixtures("config_cls") +def test_backward_configuration(config_cls: t.Callable[[str], ConfigDictType]): + OLD_CONFIG = """\ +api_server: + max_request_size: 8624612341 +""" + bentoml_cfg = config_cls(OLD_CONFIG) + assert "max_request_size" not in bentoml_cfg["api_server"] + assert "cors" not in 
bentoml_cfg["api_server"] + assert bentoml_cfg["api_server"]["http"]["max_request_size"] == 8624612341 + -def get_bentomlconfiguration_from_str(config_str: str): - tmpfile = NamedTemporaryFile(mode="w+", delete=False) - tmpfile.write(config_str) - tmpfile.flush() - tmpfile.close() +@pytest.mark.usefixtures("config_cls") +def test_backward_warning( + config_cls: t.Callable[[str], ConfigDictType], caplog: LogCaptureFixture +): - bentoml_cfg = BentoMLConfiguration(override_config_file=tmpfile.name).as_dict() - return bentoml_cfg + OLD_HOST = """\ +api_server: + host: 0.0.0.0 +""" + with caplog.at_level(logging.WARNING): + config_cls(OLD_HOST) + assert "field 'api_server.host' is deprecated" in caplog.text + caplog.clear() + + OLD_PORT = """\ +api_server: + port: 4096 +""" + with caplog.at_level(logging.WARNING): + config_cls(OLD_PORT) + assert "field 'api_server.port' is deprecated" in caplog.text + caplog.clear() + + OLD_MAX_REQUEST_SIZE = """\ +api_server: + max_request_size: 8624612341 +""" + with caplog.at_level(logging.WARNING): + config_cls(OLD_MAX_REQUEST_SIZE) + assert "field 'api_server.max_request_size' is deprecated" in caplog.text + caplog.clear() + + OLD_CORS = """\ +api_server: + cors: + enabled: false +""" + with caplog.at_level(logging.WARNING): + config_cls(OLD_CORS) + assert "field 'api_server.cors' is deprecated" in caplog.text + caplog.clear() -def test_bentoml_configuration_runner_override(): +@pytest.mark.usefixtures("config_cls") +def test_bentoml_configuration_runner_override( + config_cls: t.Callable[[str], ConfigDictType] +): OVERRIDE_RUNNERS = """\ runners: batching: @@ -40,7 +110,7 @@ def test_bentoml_configuration_runner_override(): enabled: True """ - bentoml_cfg = get_bentomlconfiguration_from_str(OVERRIDE_RUNNERS) + bentoml_cfg = config_cls(OVERRIDE_RUNNERS) runner_cfg = bentoml_cfg["runners"] # test_runner_1 @@ -73,13 +143,14 @@ def test_bentoml_configuration_runner_override(): assert test_runner_batching["resources"]["cpu"] == 4 # 
should use global -def test_runner_gpu_configuration(): +@pytest.mark.usefixtures("config_cls") +def test_runner_gpu_configuration(config_cls: t.Callable[[str], ConfigDictType]): GPU_INDEX = """\ runners: resources: nvidia.com/gpu: [1, 2, 4] """ - bentoml_cfg = get_bentomlconfiguration_from_str(GPU_INDEX) + bentoml_cfg = config_cls(GPU_INDEX) assert bentoml_cfg["runners"]["resources"] == {"nvidia.com/gpu": [1, 2, 4]} GPU_INDEX_WITH_STRING = """\ @@ -87,12 +158,14 @@ def test_runner_gpu_configuration(): resources: nvidia.com/gpu: "[1, 2, 4]" """ - bentoml_cfg = get_bentomlconfiguration_from_str(GPU_INDEX_WITH_STRING) + bentoml_cfg = config_cls(GPU_INDEX_WITH_STRING) # this behaviour can be confusing assert bentoml_cfg["runners"]["resources"] == {"nvidia.com/gpu": "[1, 2, 4]"} -RUNNER_TIMEOUTS = """\ +@pytest.mark.usefixtures("config_cls") +def test_runner_timeouts(config_cls: t.Callable[[str], ConfigDictType]): + RUNNER_TIMEOUTS = """\ runners: timeout: 50 test_runner_1: @@ -100,10 +173,7 @@ def test_runner_gpu_configuration(): test_runner_2: resources: system """ - - -def test_runner_timeouts(): - bentoml_cfg = get_bentomlconfiguration_from_str(RUNNER_TIMEOUTS) + bentoml_cfg = config_cls(RUNNER_TIMEOUTS) runner_cfg = bentoml_cfg["runners"] assert runner_cfg["timeout"] == 50 assert runner_cfg["test_runner_1"]["timeout"] == 100