chore: removing deadcode
Signed-off-by: Aaron Pham <29749331+aarnphm@users.noreply.github.com>
aarnphm committed Dec 6, 2022
1 parent: d2eca28 · commit: 606ca6b
Showing 6 changed files with 78 additions and 211 deletions.
3 changes: 1 addition & 2 deletions src/bentoml/_internal/server/grpc/__init__.py
@@ -1,4 +1,3 @@
from .server import Server
from .servicer import Servicer

__all__ = ["Server", "Servicer"]
__all__ = ["Server"]
22 changes: 14 additions & 8 deletions src/bentoml/_internal/server/grpc/server.py
@@ -4,6 +4,7 @@
import sys
import typing as t
import asyncio
import inspect
import logging
from typing import TYPE_CHECKING
from concurrent.futures import ThreadPoolExecutor
@@ -29,7 +30,8 @@

from bentoml.grpc.v1 import service_pb2_grpc as services

from .servicer import Servicer
from ..grpc_app import GrpcServicerFactory

else:
grpc, aio = import_grpc()
_, services = import_generated_stubs()
@@ -61,7 +63,7 @@ class Server(aio._server.Server):
@inject
def __init__(
self,
servicer: Servicer,
servicer: GrpcServicerFactory,
bind_address: str,
max_message_length: int
| None = Provide[BentoMLContainer.grpc.max_message_length],
@@ -88,10 +90,6 @@ def __init__(
self.ssl_keyfile = ssl_keyfile
self.ssl_ca_certs = ssl_ca_certs

if not bool(self.servicer):
self.servicer.load()
assert self.servicer.loaded

super().__init__(
# Note that max_workers is used inside the ThreadPoolExecutor.
# This ThreadPoolExecutor is used by aio.Server() to execute non-AsyncIO RPC handlers.
@@ -189,7 +187,11 @@ async def startup(self) -> None:
from bentoml.exceptions import MissingDependencyException

# Running on_startup callback.
await self.servicer.startup()
for handler in self.servicer.on_startup:
out = handler()
if inspect.isawaitable(out):
await out

# register bento servicer
services.add_BentoServiceServicer_to_server(self.servicer.bento_servicer, self)
services_health.add_HealthServicer_to_server(
@@ -236,7 +238,11 @@ async def startup(self) -> None:

async def shutdown(self):
# Running on_shutdown callback.
await self.servicer.shutdown()
for handler in self.servicer.on_shutdown:
out = handler()
if inspect.isawaitable(out):
await out

await self.stop(grace=self.graceful_shutdown_timeout)
await self.servicer.health_servicer.enter_graceful_shutdown()
self.loop.stop()
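
The startup and shutdown hunks above inline the callback loop that previously lived in Servicer.startup() and Servicer.shutdown() (removed below): each hook is called, and the result is awaited only when it is awaitable, so synchronous and asynchronous hooks can be mixed. A minimal, self-contained sketch of that pattern follows; run_hooks and the hook names are illustrative, not BentoML API.

import asyncio
import inspect
import typing as t

async def run_hooks(hooks: t.Sequence[t.Callable[[], t.Any]]) -> None:
    # Call each lifecycle hook; await the result only when it is awaitable,
    # so plain functions and coroutine functions are both supported.
    for handler in hooks:
        out = handler()
        if inspect.isawaitable(out):
            await out

def warm_cache() -> None:
    print("sync hook ran")

async def open_connections() -> None:
    print("async hook ran")

if __name__ == "__main__":
    asyncio.run(run_hooks([warm_cache, open_connections]))
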
116 changes: 0 additions & 116 deletions src/bentoml/_internal/server/grpc/servicer/__init__.py
@@ -1,116 +0,0 @@
from __future__ import annotations

import typing as t
import logging
import importlib
from typing import TYPE_CHECKING
from inspect import isawaitable

from ....utils import LazyLoader
from .....grpc.utils import import_generated_stubs
from .....grpc.utils import LATEST_PROTOCOL_VERSION

logger = logging.getLogger(__name__)

if TYPE_CHECKING:
from types import ModuleType

from grpc import aio
from grpc_health.v1 import health
from typing_extensions import Self

from bentoml.grpc.v1 import service_pb2_grpc as services
from bentoml.grpc.types import Interceptors
from bentoml.grpc.types import AddServicerFn
from bentoml.grpc.types import ServicerClass

from ....service.service import Service

class ServicerModule(ModuleType):
@staticmethod
def create_bento_servicer(service: Service) -> services.BentoServiceServicer:
...

else:
health = LazyLoader(
"health",
globals(),
"grpc_health.v1.health",
exc_msg="'grpcio-health-checking' is required for using health checking endpoints. Install with 'pip install grpcio-health-checking'.",
)
struct_pb2 = LazyLoader("struct_pb2", globals(), "google.protobuf.struct_pb2")


class Servicer:
"""Create an instance of gRPC Servicer."""

_cached_module = None

def __init__(
self: Self,
service: Service,
on_startup: t.Sequence[t.Callable[[], t.Any]] | None = None,
on_shutdown: t.Sequence[t.Callable[[], t.Any]] | None = None,
mount_servicers: t.Sequence[tuple[ServicerClass, AddServicerFn, list[str]]]
| None = None,
interceptors: Interceptors | None = None,
protocol_version: str = LATEST_PROTOCOL_VERSION,
) -> None:
self.bento_service = service

self.on_startup = [] if not on_startup else list(on_startup)
self.on_shutdown = [] if not on_shutdown else list(on_shutdown)
self.mount_servicers = [] if not mount_servicers else list(mount_servicers)
self.interceptors = [] if not interceptors else list(interceptors)
self.protocol_version = protocol_version

self.loaded = False

@property
def _servicer_module(self) -> ServicerModule:
if self._cached_module is None:
object.__setattr__(
self,
"_cached_module",
importlib.import_module(f".{self.protocol_version}", package=__name__),
)
assert self._cached_module is not None
return self._cached_module

def load(self):
assert not self.loaded

pb, _ = import_generated_stubs(self.protocol_version)

self.interceptors_stack = self.build_interceptors_stack()

self.bento_servicer = self._servicer_module.create_bento_servicer(
self.bento_service
)

# Create a health check servicer. We use the non-blocking implementation
# to avoid thread starvation.
self.health_servicer = health.aio.HealthServicer()

self.service_names = tuple(
service.full_name for service in pb.DESCRIPTOR.services_by_name.values()
) + (health.SERVICE_NAME,)
self.loaded = True

def build_interceptors_stack(self) -> list[aio.ServerInterceptor]:
return list(map(lambda x: x(), self.interceptors))

async def startup(self):
for handler in self.on_startup:
out = handler()
if isawaitable(out):
await out

async def shutdown(self):
for handler in self.on_shutdown:
out = handler()
if isawaitable(out):
await out

def __bool__(self):
return self.loaded
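
For context, the removed Servicer lazily imported its protocol-version submodule with importlib and cached it on first access (the _servicer_module property above). A rough sketch of that load-on-demand pattern follows, under the assumption of a package containing versioned submodules such as ".v1"; VersionedLoader is illustrative, not part of BentoML.

from __future__ import annotations

import importlib
from types import ModuleType

class VersionedLoader:
    # Import the submodule for a given protocol version only when it is
    # first needed, then cache it, mirroring the removed _servicer_module
    # property.
    def __init__(self, protocol_version: str, package: str) -> None:
        self.protocol_version = protocol_version
        self.package = package
        self._cached_module: ModuleType | None = None

    @property
    def module(self) -> ModuleType:
        if self._cached_module is None:
            self._cached_module = importlib.import_module(
                f".{self.protocol_version}", package=self.package
            )
        return self._cached_module
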
64 changes: 0 additions & 64 deletions src/bentoml/_internal/server/grpc/servicer/v1/__init__.py
@@ -1,14 +1,12 @@
from __future__ import annotations

import sys
import typing as t
import asyncio
import logging
from typing import TYPE_CHECKING

import anyio

from .....utils import LazyLoader
from ......exceptions import InvalidArgument
from ......exceptions import BentoMLException
from ......grpc.utils import import_grpc
@@ -22,19 +20,15 @@
from logging import _ExcInfoType as ExcInfoType # type: ignore (private warning)

import grpc
from google.protobuf import struct_pb2

from bentoml.grpc.types import BentoServicerContext
from bentoml.grpc.v1.service_pb2 import ServiceMetadataRequest
from bentoml.grpc.v1.service_pb2 import ServiceMetadataResponse

from ......grpc.v1 import service_pb2 as pb
from ......grpc.v1 import service_pb2_grpc as services
from .....service.service import Service
else:
grpc, _ = import_grpc()
pb, services = import_generated_stubs(version="v1")
struct_pb2 = LazyLoader("struct_pb2", globals(), "google.protobuf.struct_pb2")


def log_exception(request: pb.Request, exc_info: ExcInfoType) -> None:
@@ -104,62 +98,4 @@ async def Call( # type: ignore (no async types) # pylint: disable=invalid-overr
)
return response

async def ServiceMetadata( # type: ignore
self: services.BentoServiceServicer,
request: ServiceMetadataRequest, # pylint: disable=unused-argument
context: BentoServicerContext, # pylint: disable=unused-argument
) -> ServiceMetadataResponse:
return pb.ServiceMetadataResponse(
name=service.name,
docs=service.doc,
apis=[
pb.ServiceMetadataResponse.InferenceAPI(
name=api.name,
docs=api.doc,
input=make_descriptor_spec(
api.input.to_spec(), pb.ServiceMetadataResponse
),
output=make_descriptor_spec(
api.output.to_spec(), pb.ServiceMetadataResponse
),
)
for api in service.apis.values()
],
)

return BentoServiceImpl()


if TYPE_CHECKING:
NestedDictStrAny = dict[str, dict[str, t.Any] | t.Any]
TupleAny = tuple[t.Any, ...]


def _tuple_converter(d: NestedDictStrAny | None) -> NestedDictStrAny | None:
# handles case for struct_pb2.Value where nested items are tuple.
# if that is the case, then convert to list.
# This dict is only one level deep, as we don't allow nested Multipart.
if d is not None:
for key, value in d.items():
if isinstance(value, tuple):
d[key] = list(t.cast("TupleAny", value))
elif isinstance(value, dict):
d[key] = _tuple_converter(t.cast("NestedDictStrAny", value))
return d


def make_descriptor_spec(
spec: dict[str, t.Any], pb: type[ServiceMetadataResponse]
) -> ServiceMetadataResponse.DescriptorMetadata:
from .....io_descriptors.json import parse_dict_to_proto

descriptor_id = spec.pop("id")
return pb.DescriptorMetadata(
descriptor_id=descriptor_id,
attributes=struct_pb2.Struct(
fields={
key: parse_dict_to_proto(_tuple_converter(value), struct_pb2.Value())
for key, value in spec.items()
}
),
)
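
Among the removed helpers, _tuple_converter normalized tuple values to lists (one level deep) because, per its own comment, struct_pb2.Value does not handle tuples. A standalone sketch of that normalization with a sample input; normalize_tuples and the example dict are hypothetical, not BentoML API.

from __future__ import annotations

import typing as t

def normalize_tuples(d: dict[str, t.Any] | None) -> dict[str, t.Any] | None:
    # Convert tuple values (and tuples inside one level of nested dicts)
    # to lists so the spec can be packed into a struct_pb2.Struct.
    if d is not None:
        for key, value in d.items():
            if isinstance(value, tuple):
                d[key] = list(value)
            elif isinstance(value, dict):
                d[key] = normalize_tuples(value)
    return d

if __name__ == "__main__":
    spec = {"shape": (1, 28, 28), "meta": {"tags": ("a", "b")}}
    print(normalize_tuples(spec))
    # {'shape': [1, 28, 28], 'meta': {'tags': ['a', 'b']}}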
