feat: grpc servicer implementation per version
Signed-off-by: Aaron Pham <29749331+aarnphm@users.noreply.github.com>
aarnphm committed Dec 6, 2022
1 parent c91f83f commit d2eca28
Showing 12 changed files with 408 additions and 131 deletions.
7 changes: 4 additions & 3 deletions src/bentoml/_internal/bento/build_dev_bentoml_whl.py
@@ -7,6 +7,7 @@
from ..utils.pkg import source_locations
from ...exceptions import BentoMLException
from ...exceptions import MissingDependencyException
+from ...grpc.utils import LATEST_PROTOCOL_VERSION
from ..configuration import is_pypi_installed_bentoml

logger = logging.getLogger(__name__)
@@ -15,7 +16,7 @@


def build_bentoml_editable_wheel(
-    target_path: str, *, _internal_stubs_version: str = "v1"
+    target_path: str, *, _internal_protocol_version: str = LATEST_PROTOCOL_VERSION
) -> None:
    """
    This is for BentoML developers to create Bentos that contains the local bentoml
@@ -52,10 +53,10 @@ def build_bentoml_editable_wheel(
    bentoml_path = Path(module_location)

    if not Path(
-        module_location, "grpc", _internal_stubs_version, "service_pb2.py"
+        module_location, "grpc", _internal_protocol_version, "service_pb2.py"
    ).exists():
        raise ModuleNotFoundError(
-            f"Generated stubs for version {_internal_stubs_version} are missing. Make sure to run '{bentoml_path.as_posix()}/scripts/generate_grpc_stubs.sh {_internal_stubs_version}' beforehand to generate gRPC stubs."
+            f"Generated stubs for version {_internal_protocol_version} are missing. Make sure to run '{bentoml_path.as_posix()}/scripts/generate_grpc_stubs.sh {_internal_protocol_version}' beforehand to generate gRPC stubs."
        ) from None

    # location to pyproject.toml
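For context, a minimal sketch of the per-version stub check that this hunk generalizes; the helper name stubs_exist and the "v1" default are assumptions for illustration, not part of the diff:

from pathlib import Path

def stubs_exist(module_location: str, protocol_version: str = "v1") -> bool:
    # Mirrors the guard in build_bentoml_editable_wheel: generated stubs are
    # expected at <module_location>/grpc/<protocol_version>/service_pb2.py,
    # produced by scripts/generate_grpc_stubs.sh <protocol_version>.
    return Path(module_location, "grpc", protocol_version, "service_pb2.py").exists()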
2 changes: 1 addition & 1 deletion src/bentoml/_internal/io_descriptors/base.py
@@ -36,7 +36,7 @@
IOType = t.TypeVar("IOType")


-def from_spec(spec: dict[str, str]) -> IODescriptor[t.Any]:
+def from_spec(spec: dict[str, t.Any]) -> IODescriptor[t.Any]:
    if "id" not in spec:
        raise InvalidArgument(f"IO descriptor spec ({spec}) missing ID.")
    return IO_DESCRIPTOR_REGISTRY[spec["id"]].from_spec(spec)
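The widened annotation reflects that descriptor specs carry nested, non-string values. A hypothetical example (the id string and the "args" key are illustrative assumptions, not taken from this diff):

spec = {"id": "bentoml.io.JSON", "args": {"has_pydantic_model": False}}
descriptor = from_spec(spec)  # dispatches to IO_DESCRIPTOR_REGISTRY[spec["id"]].from_spec(spec)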
41 changes: 26 additions & 15 deletions src/bentoml/_internal/io_descriptors/json.py
@@ -30,6 +30,7 @@

import pydantic
import pydantic.schema as schema
+from google.protobuf import message as _message
from google.protobuf import struct_pb2
from typing_extensions import Self

@@ -392,19 +393,29 @@ async def to_proto(self, obj: JSONType) -> struct_pb2.Value:
        if LazyType["pydantic.BaseModel"]("pydantic.BaseModel").isinstance(obj):
            obj = obj.dict()
        msg = struct_pb2.Value()
-        # To handle None cases.
-        if obj is not None:
-            from google.protobuf.json_format import ParseDict
-
-            if isinstance(obj, (dict, str, list, float, int, bool)):
-                # ParseDict handles google.protobuf.Struct type
-                # directly if given object has a supported type
-                ParseDict(obj, msg)
-            else:
-                # If given object doesn't have a supported type, we will
-                # use given JSON encoder to convert it to dictionary
-                # and then parse it to google.protobuf.Struct.
-                # Note that if a custom JSON encoder is used, it mustn't
-                # take any arguments.
-                ParseDict(self._json_encoder().default(obj), msg)
+        return parse_dict_to_proto(obj, msg, json_encoder=self._json_encoder)
+
+
+def parse_dict_to_proto(
+    obj: JSONType,
+    msg: _message.Message,
+    json_encoder: type[json.JSONEncoder] = DefaultJsonEncoder,
+) -> t.Any:
+    if obj is None:
+        # this function is an identity op for the msg if obj is None.
+        return msg
+
+    from google.protobuf.json_format import ParseDict
+
+    if isinstance(obj, (dict, str, list, float, int, bool)):
+        # ParseDict handles google.protobuf.Struct type
+        # directly if given object has a supported type
+        ParseDict(obj, msg)
+    else:
+        # If given object doesn't have a supported type, we will
+        # use given JSON encoder to convert it to dictionary
+        # and then parse it to google.protobuf.Struct.
+        # Note that if a custom JSON encoder is used, it mustn't
+        # take any arguments.
+        ParseDict(json_encoder().default(obj), msg)
+    return msg
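A standalone sketch (not part of the diff) of the ParseDict behavior that the extracted parse_dict_to_proto helper relies on: google.protobuf.json_format.ParseDict maps plain Python dicts, lists, and scalars directly onto a google.protobuf.Value message.

from google.protobuf import struct_pb2
from google.protobuf.json_format import MessageToDict, ParseDict

msg = struct_pb2.Value()
ParseDict({"name": "iris", "scores": [0.1, 0.9]}, msg)  # dict -> Value.struct_value
print(MessageToDict(msg))  # {'name': 'iris', 'scores': [0.1, 0.9]}

# Scalars and lists are also accepted directly; any other Python object would
# go through the JSON-encoder branch shown in the diff above.
ParseDict("hello", struct_pb2.Value())  # str -> Value.string_value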
116 changes: 116 additions & 0 deletions src/bentoml/_internal/server/grpc/servicer/__init__.py
@@ -0,0 +1,116 @@
from __future__ import annotations

import typing as t
import logging
import importlib
from typing import TYPE_CHECKING
from inspect import isawaitable

from ....utils import LazyLoader
from .....grpc.utils import import_generated_stubs
from .....grpc.utils import LATEST_PROTOCOL_VERSION

logger = logging.getLogger(__name__)

if TYPE_CHECKING:
    from types import ModuleType

    from grpc import aio
    from grpc_health.v1 import health
    from typing_extensions import Self

    from bentoml.grpc.v1 import service_pb2_grpc as services
    from bentoml.grpc.types import Interceptors
    from bentoml.grpc.types import AddServicerFn
    from bentoml.grpc.types import ServicerClass

    from ....service.service import Service

    class ServicerModule(ModuleType):
        @staticmethod
        def create_bento_servicer(service: Service) -> services.BentoServiceServicer:
            ...

else:
    health = LazyLoader(
        "health",
        globals(),
        "grpc_health.v1.health",
        exc_msg="'grpcio-health-checking' is required for using health checking endpoints. Install with 'pip install grpcio-health-checking'.",
    )
    struct_pb2 = LazyLoader("struct_pb2", globals(), "google.protobuf.struct_pb2")


class Servicer:
    """Create an instance of gRPC Servicer."""

    _cached_module = None

    def __init__(
        self: Self,
        service: Service,
        on_startup: t.Sequence[t.Callable[[], t.Any]] | None = None,
        on_shutdown: t.Sequence[t.Callable[[], t.Any]] | None = None,
        mount_servicers: t.Sequence[tuple[ServicerClass, AddServicerFn, list[str]]]
        | None = None,
        interceptors: Interceptors | None = None,
        protocol_version: str = LATEST_PROTOCOL_VERSION,
    ) -> None:
        self.bento_service = service

        self.on_startup = [] if not on_startup else list(on_startup)
        self.on_shutdown = [] if not on_shutdown else list(on_shutdown)
        self.mount_servicers = [] if not mount_servicers else list(mount_servicers)
        self.interceptors = [] if not interceptors else list(interceptors)
        self.protocol_version = protocol_version

        self.loaded = False

    @property
    def _servicer_module(self) -> ServicerModule:
        if self._cached_module is None:
            object.__setattr__(
                self,
                "_cached_module",
                importlib.import_module(f".{self.protocol_version}", package=__name__),
            )
        assert self._cached_module is not None
        return self._cached_module

    def load(self):
        assert not self.loaded

        pb, _ = import_generated_stubs(self.protocol_version)

        self.interceptors_stack = self.build_interceptors_stack()

        self.bento_servicer = self._servicer_module.create_bento_servicer(
            self.bento_service
        )

        # Create a health check servicer. We use the non-blocking implementation
        # to avoid thread starvation.
        self.health_servicer = health.aio.HealthServicer()

        self.service_names = tuple(
            service.full_name for service in pb.DESCRIPTOR.services_by_name.values()
        ) + (health.SERVICE_NAME,)
        self.loaded = True

    def build_interceptors_stack(self) -> list[aio.ServerInterceptor]:
        return list(map(lambda x: x(), self.interceptors))

    async def startup(self):
        for handler in self.on_startup:
            out = handler()
            if isawaitable(out):
                await out

    async def shutdown(self):
        for handler in self.on_shutdown:
            out = handler()
            if isawaitable(out):
                await out

    def __bool__(self):
        return self.loaded
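A hypothetical usage sketch of the new Servicer (the bentoml.Service instance and the explicit "v1" argument are assumptions; per-version dispatch happens through the importlib call in _servicer_module above, and load() needs the generated stubs for that version to be present):

import bentoml
from bentoml._internal.server.grpc.servicer import Servicer

svc = bentoml.Service("demo")  # assumption: any Service instance works here
servicer = Servicer(svc, protocol_version="v1")
assert not servicer  # __bool__ is False until load() runs

# load() imports the per-version module (here ...grpc.servicer.v1), builds the
# interceptor stack and the BentoService servicer, and creates the async
# health-checking servicer.
servicer.load()
assert servicer.loaded
print(servicer.service_names)  # per-version service names plus the gRPC health service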
