From 606ca6b986d9e129a8815963c4c918b4c4eb08cc Mon Sep 17 00:00:00 2001 From: Aaron Pham <29749331+aarnphm@users.noreply.github.com> Date: Mon, 5 Dec 2022 21:07:43 -0800 Subject: [PATCH] chore: removing deadcode Signed-off-by: Aaron Pham <29749331+aarnphm@users.noreply.github.com> --- src/bentoml/_internal/server/grpc/__init__.py | 3 +- src/bentoml/_internal/server/grpc/server.py | 22 ++-- .../server/grpc/servicer/__init__.py | 116 ------------------ .../server/grpc/servicer/v1/__init__.py | 64 ---------- src/bentoml/_internal/server/grpc_app.py | 76 +++++++++--- src/bentoml/_internal/service/service.py | 8 +- 6 files changed, 78 insertions(+), 211 deletions(-) diff --git a/src/bentoml/_internal/server/grpc/__init__.py b/src/bentoml/_internal/server/grpc/__init__.py index df277aa1d9a..09ed39ca17f 100644 --- a/src/bentoml/_internal/server/grpc/__init__.py +++ b/src/bentoml/_internal/server/grpc/__init__.py @@ -1,4 +1,3 @@ from .server import Server -from .servicer import Servicer -__all__ = ["Server", "Servicer"] +__all__ = ["Server"] diff --git a/src/bentoml/_internal/server/grpc/server.py b/src/bentoml/_internal/server/grpc/server.py index b96d87506ef..f2ca52c8596 100644 --- a/src/bentoml/_internal/server/grpc/server.py +++ b/src/bentoml/_internal/server/grpc/server.py @@ -4,6 +4,7 @@ import sys import typing as t import asyncio +import inspect import logging from typing import TYPE_CHECKING from concurrent.futures import ThreadPoolExecutor @@ -29,7 +30,8 @@ from bentoml.grpc.v1 import service_pb2_grpc as services - from .servicer import Servicer + from ..grpc_app import GrpcServicerFactory + else: grpc, aio = import_grpc() _, services = import_generated_stubs() @@ -61,7 +63,7 @@ class Server(aio._server.Server): @inject def __init__( self, - servicer: Servicer, + servicer: GrpcServicerFactory, bind_address: str, max_message_length: int | None = Provide[BentoMLContainer.grpc.max_message_length], @@ -88,10 +90,6 @@ def __init__( self.ssl_keyfile = ssl_keyfile self.ssl_ca_certs = ssl_ca_certs - if not bool(self.servicer): - self.servicer.load() - assert self.servicer.loaded - super().__init__( # Note that the max_workers are used inside ThreadPoolExecutor. # This ThreadPoolExecutor are used by aio.Server() to execute non-AsyncIO RPC handlers. @@ -189,7 +187,11 @@ async def startup(self) -> None: from bentoml.exceptions import MissingDependencyException # Running on_startup callback. - await self.servicer.startup() + for handler in self.servicer.on_startup: + out = handler() + if inspect.isawaitable(out): + await out + # register bento servicer services.add_BentoServiceServicer_to_server(self.servicer.bento_servicer, self) services_health.add_HealthServicer_to_server( @@ -236,7 +238,11 @@ async def startup(self) -> None: async def shutdown(self): # Running on_startup callback. - await self.servicer.shutdown() + for handler in self.servicer.on_shutdown: + out = handler() + if inspect.isawaitable(out): + await out + await self.stop(grace=self.graceful_shutdown_timeout) await self.servicer.health_servicer.enter_graceful_shutdown() self.loop.stop() diff --git a/src/bentoml/_internal/server/grpc/servicer/__init__.py b/src/bentoml/_internal/server/grpc/servicer/__init__.py index 058d1ca7062..e69de29bb2d 100644 --- a/src/bentoml/_internal/server/grpc/servicer/__init__.py +++ b/src/bentoml/_internal/server/grpc/servicer/__init__.py @@ -1,116 +0,0 @@ -from __future__ import annotations - -import typing as t -import logging -import importlib -from typing import TYPE_CHECKING -from inspect import isawaitable - -from ....utils import LazyLoader -from .....grpc.utils import import_generated_stubs -from .....grpc.utils import LATEST_PROTOCOL_VERSION - -logger = logging.getLogger(__name__) - -if TYPE_CHECKING: - from types import ModuleType - - from grpc import aio - from grpc_health.v1 import health - from typing_extensions import Self - - from bentoml.grpc.v1 import service_pb2_grpc as services - from bentoml.grpc.types import Interceptors - from bentoml.grpc.types import AddServicerFn - from bentoml.grpc.types import ServicerClass - - from ....service.service import Service - - class ServicerModule(ModuleType): - @staticmethod - def create_bento_servicer(service: Service) -> services.BentoServiceServicer: - ... - -else: - health = LazyLoader( - "health", - globals(), - "grpc_health.v1.health", - exc_msg="'grpcio-health-checking' is required for using health checking endpoints. Install with 'pip install grpcio-health-checking'.", - ) - struct_pb2 = LazyLoader("struct_pb2", globals(), "google.protobuf.struct_pb2") - - -class Servicer: - """Create an instance of gRPC Servicer.""" - - _cached_module = None - - def __init__( - self: Self, - service: Service, - on_startup: t.Sequence[t.Callable[[], t.Any]] | None = None, - on_shutdown: t.Sequence[t.Callable[[], t.Any]] | None = None, - mount_servicers: t.Sequence[tuple[ServicerClass, AddServicerFn, list[str]]] - | None = None, - interceptors: Interceptors | None = None, - protocol_version: str = LATEST_PROTOCOL_VERSION, - ) -> None: - self.bento_service = service - - self.on_startup = [] if not on_startup else list(on_startup) - self.on_shutdown = [] if not on_shutdown else list(on_shutdown) - self.mount_servicers = [] if not mount_servicers else list(mount_servicers) - self.interceptors = [] if not interceptors else list(interceptors) - self.protocol_version = protocol_version - - self.loaded = False - - @property - def _servicer_module(self) -> ServicerModule: - if self._cached_module is None: - object.__setattr__( - self, - "_cached_module", - importlib.import_module(f".{self.protocol_version}", package=__name__), - ) - assert self._cached_module is not None - return self._cached_module - - def load(self): - assert not self.loaded - - pb, _ = import_generated_stubs(self.protocol_version) - - self.interceptors_stack = self.build_interceptors_stack() - - self.bento_servicer = self._servicer_module.create_bento_servicer( - self.bento_service - ) - - # Create a health check servicer. We use the non-blocking implementation - # to avoid thread starvation. - self.health_servicer = health.aio.HealthServicer() - - self.service_names = tuple( - service.full_name for service in pb.DESCRIPTOR.services_by_name.values() - ) + (health.SERVICE_NAME,) - self.loaded = True - - def build_interceptors_stack(self) -> list[aio.ServerInterceptor]: - return list(map(lambda x: x(), self.interceptors)) - - async def startup(self): - for handler in self.on_startup: - out = handler() - if isawaitable(out): - await out - - async def shutdown(self): - for handler in self.on_shutdown: - out = handler() - if isawaitable(out): - await out - - def __bool__(self): - return self.loaded diff --git a/src/bentoml/_internal/server/grpc/servicer/v1/__init__.py b/src/bentoml/_internal/server/grpc/servicer/v1/__init__.py index f453d09a62a..d88e7613e7d 100644 --- a/src/bentoml/_internal/server/grpc/servicer/v1/__init__.py +++ b/src/bentoml/_internal/server/grpc/servicer/v1/__init__.py @@ -1,14 +1,12 @@ from __future__ import annotations import sys -import typing as t import asyncio import logging from typing import TYPE_CHECKING import anyio -from .....utils import LazyLoader from ......exceptions import InvalidArgument from ......exceptions import BentoMLException from ......grpc.utils import import_grpc @@ -22,11 +20,8 @@ from logging import _ExcInfoType as ExcInfoType # type: ignore (private warning) import grpc - from google.protobuf import struct_pb2 from bentoml.grpc.types import BentoServicerContext - from bentoml.grpc.v1.service_pb2 import ServiceMetadataRequest - from bentoml.grpc.v1.service_pb2 import ServiceMetadataResponse from ......grpc.v1 import service_pb2 as pb from ......grpc.v1 import service_pb2_grpc as services @@ -34,7 +29,6 @@ else: grpc, _ = import_grpc() pb, services = import_generated_stubs(version="v1") - struct_pb2 = LazyLoader("struct_pb2", globals(), "google.protobuf.struct_pb2") def log_exception(request: pb.Request, exc_info: ExcInfoType) -> None: @@ -104,62 +98,4 @@ async def Call( # type: ignore (no async types) # pylint: disable=invalid-overr ) return response - async def ServiceMetadata( # type: ignore - self: services.BentoServiceServicer, - request: ServiceMetadataRequest, # pylint: disable=unused-argument - context: BentoServicerContext, # pylint: disable=unused-argument - ) -> ServiceMetadataResponse: - return pb.ServiceMetadataResponse( - name=service.name, - docs=service.doc, - apis=[ - pb.ServiceMetadataResponse.InferenceAPI( - name=api.name, - docs=api.doc, - input=make_descriptor_spec( - api.input.to_spec(), pb.ServiceMetadataResponse - ), - output=make_descriptor_spec( - api.output.to_spec(), pb.ServiceMetadataResponse - ), - ) - for api in service.apis.values() - ], - ) - return BentoServiceImpl() - - -if TYPE_CHECKING: - NestedDictStrAny = dict[str, dict[str, t.Any] | t.Any] - TupleAny = tuple[t.Any, ...] - - -def _tuple_converter(d: NestedDictStrAny | None) -> NestedDictStrAny | None: - # handles case for struct_pb2.Value where nested items are tuple. - # if that is the case, then convert to list. - # This dict is only one level deep, as we don't allow nested Multipart. - if d is not None: - for key, value in d.items(): - if isinstance(value, tuple): - d[key] = list(t.cast("TupleAny", value)) - elif isinstance(value, dict): - d[key] = _tuple_converter(t.cast("NestedDictStrAny", value)) - return d - - -def make_descriptor_spec( - spec: dict[str, t.Any], pb: type[ServiceMetadataResponse] -) -> ServiceMetadataResponse.DescriptorMetadata: - from .....io_descriptors.json import parse_dict_to_proto - - descriptor_id = spec.pop("id") - return pb.DescriptorMetadata( - descriptor_id=descriptor_id, - attributes=struct_pb2.Struct( - fields={ - key: parse_dict_to_proto(_tuple_converter(value), struct_pb2.Value()) - for key, value in spec.items() - } - ), - ) diff --git a/src/bentoml/_internal/server/grpc_app.py b/src/bentoml/_internal/server/grpc_app.py index cc6df9e624e..71c537c93e5 100644 --- a/src/bentoml/_internal/server/grpc_app.py +++ b/src/bentoml/_internal/server/grpc_app.py @@ -3,34 +3,55 @@ import typing as t import asyncio import logging +import importlib from typing import TYPE_CHECKING from functools import partial from simple_di import inject from simple_di import Provide +from ..utils import LazyLoader +from ...grpc.utils import import_generated_stubs +from ...grpc.utils import LATEST_PROTOCOL_VERSION from ..configuration.containers import BentoMLContainer logger = logging.getLogger(__name__) if TYPE_CHECKING: + from types import ModuleType + + from grpc_health.v1 import health from bentoml.grpc.types import Interceptors from ..service import Service - from .grpc.servicer import Servicer + from ...grpc.v1 import service_pb2_grpc as services + + class ServicerModule(ModuleType): + @staticmethod + def create_bento_servicer(service: Service) -> services.BentoServiceServicer: + ... OnStartup = list[t.Callable[[], t.Union[None, t.Coroutine[t.Any, t.Any, None]]]] +else: + health = LazyLoader( + "health", + globals(), + "grpc_health.v1.health", + exc_msg="'grpcio-health-checking' is required for using health checking endpoints. Install with 'pip install grpcio-health-checking'.", + ) -class GRPCAppFactory: +class GrpcServicerFactory: """ - GRPCApp creates an async gRPC API server based on APIs defined with a BentoService via BentoService#apis. - This is a light wrapper around GRPCServer with addition to `on_startup` and `on_shutdown` hooks. + GrpcServicerFactory creates an async gRPC API server based on APIs defined with a BentoService via BentoService.apis. + This is a light wrapper around GrpcServer with addition to `on_startup` and `on_shutdown` hooks. Note that even though the code are similar with BaseAppFactory, gRPC protocol is different from ASGI. """ + _cached_module = None + @inject def __init__( self, @@ -39,9 +60,27 @@ def __init__( enable_metrics: bool = Provide[ BentoMLContainer.api_server_config.metrics.enabled ], + protocol_version: str = LATEST_PROTOCOL_VERSION, ) -> None: + pb, _ = import_generated_stubs(protocol_version) + self.bento_service = bento_service self.enable_metrics = enable_metrics + self.protocol_version = protocol_version + self.interceptors_stack = list(map(lambda x: x(), self.interceptors)) + + self.bento_servicer = self._servicer_module.create_bento_servicer( + self.bento_service + ) + self.mount_servicers = self.bento_service.mount_servicers + + # Create a health check servicer. We use the non-blocking implementation + # to avoid thread starvation. + self.health_servicer = health.aio.HealthServicer() + + self.service_names = tuple( + service.full_name for service in pb.DESCRIPTOR.services_by_name.values() + ) + (health.SERVICE_NAME,) @inject async def wait_for_runner_ready( @@ -93,34 +132,37 @@ def on_shutdown(self) -> list[t.Callable[[], None]]: return on_shutdown - def __call__(self) -> Servicer: - from .grpc import Servicer - - return Servicer( - self.bento_service, - on_startup=self.on_startup, - on_shutdown=self.on_shutdown, - mount_servicers=self.bento_service.mount_servicers, - interceptors=self.interceptors, - ) + @property + def _servicer_module(self) -> ServicerModule: + if self._cached_module is None: + object.__setattr__( + self, + "_cached_module", + importlib.import_module( + f".grpc.servicer.{self.protocol_version}", + package="bentoml._internal.server", + ), + ) + assert self._cached_module is not None + return self._cached_module @property def interceptors(self) -> Interceptors: # Note that order of interceptors is important here. - from bentoml.grpc.interceptors.opentelemetry import ( + from ...grpc.interceptors.opentelemetry import ( AsyncOpenTelemetryServerInterceptor, ) interceptors: Interceptors = [AsyncOpenTelemetryServerInterceptor] if self.enable_metrics: - from bentoml.grpc.interceptors.prometheus import PrometheusServerInterceptor + from ...grpc.interceptors.prometheus import PrometheusServerInterceptor interceptors.append(PrometheusServerInterceptor) if BentoMLContainer.api_server_config.logging.access.enabled.get(): - from bentoml.grpc.interceptors.access import AccessLogServerInterceptor + from ...grpc.interceptors.access import AccessLogServerInterceptor access_logger = logging.getLogger("bentoml.access") if access_logger.getEffectiveLevel() <= logging.INFO: diff --git a/src/bentoml/_internal/service/service.py b/src/bentoml/_internal/service/service.py index c905f818011..5b820c3c859 100644 --- a/src/bentoml/_internal/service/service.py +++ b/src/bentoml/_internal/service/service.py @@ -25,7 +25,7 @@ from .. import external_typing as ext from ..bento import Bento - from ..server.grpc.servicer import Servicer + from ..server.grpc_app import GrpcServicerFactory from .openapi.specification import OpenAPISpecification else: grpc, _ = import_grpc() @@ -221,10 +221,10 @@ def on_grpc_server_shutdown(self) -> None: pass @property - def grpc_servicer(self) -> Servicer: - from ..server.grpc_app import GRPCAppFactory + def grpc_servicer(self) -> GrpcServicerFactory: + from ..server.grpc_app import GrpcServicerFactory - return GRPCAppFactory(self)() + return GrpcServicerFactory(self) @property def asgi_app(self) -> "ext.ASGIApp":