Skip to content

Commit

Permalink
feat(api): expose functions under bentoml.grpc
Browse files Browse the repository at this point in the history
Signed-off-by: Aaron Pham <29749331+aarnphm@users.noreply.github.com>
  • Loading branch information
aarnphm committed Sep 13, 2022
1 parent 49494b8 commit f01a04c
Show file tree
Hide file tree
Showing 9 changed files with 367 additions and 102 deletions.
65 changes: 0 additions & 65 deletions bentoml/_internal/server/grpc/interceptors/__init__.py

This file was deleted.

4 changes: 2 additions & 2 deletions bentoml/grpc/buf.yaml
Expand Up @@ -12,9 +12,9 @@ lint:
- RPC_RESPONSE_STANDARD_NAME
ignore_only:
DEFAULT:
- bentoml/grpc/v1/service_test.proto
- bentoml/grpc/v1alpha1/service_test.proto
ENUM_VALUE_PREFIX:
- bentoml/grpc/v1/service.proto
- bentoml/grpc/v1alpha1/service.proto
enum_zero_value_suffix: _UNSPECIFIED
rpc_allow_same_request_response: true
rpc_allow_google_protobuf_empty_requests: true
Expand Down
Empty file.
Expand Up @@ -31,7 +31,7 @@

class AccessLogServerInterceptor(aio.ServerInterceptor):
"""
An asyncio interceptor for access log.
An asyncio interceptor for access logging.
"""

async def intercept_service(
Expand All @@ -51,25 +51,20 @@ def wrapper(behaviour: AsyncHandlerMethod[Response]):
async def new_behaviour(
request: Request, context: BentoServicerContext
) -> Response | t.Awaitable[Response]:

content_type = GRPC_CONTENT_TYPE

trailing_metadata: MetadataType | None = context.trailing_metadata()
if trailing_metadata:
trailing = dict(trailing_metadata)
content_type = trailing.get("content-type", GRPC_CONTENT_TYPE)

response = pb.Response()

start = default_timer()
try:
response = await behaviour(request, context)
except Exception as e: # pylint: disable=broad-except
context.set_code(grpc.StatusCode.INTERNAL)
context.set_details(str(e))
finally:
if TYPE_CHECKING:
assert response
latency = max(default_timer() - start, 0) * 1000

req = [
Expand Down
Expand Up @@ -21,9 +21,8 @@
from bentoml.grpc.utils import wrap_rpc_handler
from bentoml.grpc.utils import GRPC_CONTENT_TYPE
from bentoml.grpc.utils import parse_method_name

from ....utils.pkg import get_pkg_version
from ....configuration.containers import BentoMLContainer
from bentoml._internal.utils.pkg import get_pkg_version
from bentoml._internal.configuration.containers import BentoMLContainer

if TYPE_CHECKING:
from grpc.aio._typing import MetadataKey
Expand All @@ -48,6 +47,10 @@ def __init__(self, servicer_context: BentoServicerContext, active_span: Span):
self._active_span = active_span
self._code = grpc.StatusCode.OK
self._details = ""
super().__init__()

def __getattr__(self, attr: str) -> t.Any:
return getattr(self._servicer_context, attr)

async def read(self) -> Request:
return await self._servicer_context.read()
Expand Down Expand Up @@ -156,15 +159,16 @@ async def set_remote_context(
self, servicer_context: BentoServicerContext
) -> t.AsyncGenerator[None, None]:
metadata = servicer_context.invocation_metadata()
if not metadata:
yield
md: dict[MetadataKey, MetadataValue] = {m.key: m.value for m in metadata}
ctx = extract(md)
token = attach(ctx)
try:
if metadata:
md: dict[MetadataKey, MetadataValue] = {m.key: m.value for m in metadata}
ctx = extract(md)
token = attach(ctx)
try:
yield
finally:
detach(token)
else:
yield
finally:
detach(token)

def start_span(
self,
Expand Down
Expand Up @@ -14,8 +14,7 @@

from bentoml.grpc.utils import to_http_status
from bentoml.grpc.utils import wrap_rpc_handler

from ....configuration.containers import BentoMLContainer
from bentoml._internal.configuration.containers import BentoMLContainer

START_TIME_VAR: contextvars.ContextVar[float] = contextvars.ContextVar("START_TIME_VAR")

Expand All @@ -27,9 +26,8 @@
from bentoml.grpc.types import HandlerCallDetails
from bentoml.grpc.types import BentoServicerContext
from bentoml.grpc.v1alpha1 import service_pb2 as pb

from ....service import Service
from ...metrics.prometheus import PrometheusClient
from bentoml._internal.service import Service
from bentoml._internal.server.metrics.prometheus import PrometheusClient
else:
from bentoml.grpc.utils import import_generated_stubs

Expand All @@ -53,25 +51,20 @@ def _setup(
self,
metrics_client: PrometheusClient = Provide[BentoMLContainer.metrics_client],
): # pylint: disable=attribute-defined-outside-init

# a valid tag name may include invalid characters, so we need to escape them
# ref: https://prometheus.io/docs/concepts/data_model/#metric-names-and-labels
service_name = self.bento_service.name.replace("-", ":").replace(".", "::")

self.metrics_request_duration = metrics_client.Histogram(
name=f"{service_name}_request_duration_seconds",
documentation=f"{service_name} API GRPC request duration in seconds",
labelnames=["api_name", "service_version", "http_response_code"],
name="request_duration_seconds",
documentation="API GRPC request duration in seconds",
labelnames=["api_name", "service_version", "http_response_code", "service"],
)
self.metrics_request_total = metrics_client.Counter(
name=f"{service_name}_request_total",
name="request_total",
documentation="Total number of GRPC requests",
labelnames=["api_name", "service_version", "http_response_code"],
labelnames=["api_name", "service_version", "http_response_code", "service"],
)
self.metrics_request_in_progress = metrics_client.Gauge(
name=f"{service_name}_request_in_progress",
name="request_in_progress",
documentation="Total number of GRPC requests in progress now",
labelnames=["api_name", "service_version"],
labelnames=["api_name", "service_version", "service"],
multiprocess_mode="livesum",
)
self._is_setup = True
Expand All @@ -92,6 +85,9 @@ async def intercept_service(
service_version = (
self.bento_service.tag.version if self.bento_service.tag else ""
)
# a valid tag name may include invalid characters, so we need to escape them
# ref: https://prometheus.io/docs/concepts/data_model/#metric-names-and-labels
service_name = self.bento_service.name.replace("-", ":").replace(".", "::")

START_TIME_VAR.set(default_timer())

Expand All @@ -112,6 +108,7 @@ async def new_behaviour(
http_response_code=to_http_status(
t.cast(grpc.StatusCode, context.code())
),
service=service_name,
).inc()

# instrument request duration
Expand All @@ -123,6 +120,7 @@ async def new_behaviour(
http_response_code=to_http_status(
t.cast(grpc.StatusCode, context.code())
),
service=service_name,
).observe(
total_time
)
Expand All @@ -131,7 +129,9 @@ async def new_behaviour(

# instrument request in progress
with self.metrics_request_in_progress.labels(
api_name=api_name, service_version=service_version
api_name=api_name,
service_version=service_version,
service=service_name,
).track_inprogress():
response = await behaviour(request, context)
return response
Expand Down
101 changes: 101 additions & 0 deletions bentoml/grpc/types.py
@@ -0,0 +1,101 @@
"""
Specific types for BentoService gRPC server.
"""
from __future__ import annotations

from typing import TYPE_CHECKING

if TYPE_CHECKING:
from typing import Any
from typing import Type
from typing import Literal
from typing import TypeVar
from typing import Callable
from typing import Optional
from typing import Annotated
from typing import Awaitable
from typing import NamedTuple
from functools import partial

import grpc
from grpc import aio

from bentoml.grpc.v1alpha1.service_pb2 import Request
from bentoml.grpc.v1alpha1.service_pb2 import Response
from bentoml.grpc.v1alpha1.service_pb2_grpc import BentoServiceServicer

# Generic placeholder for the value a handler method produces
# (see HandlerMethod / AsyncHandlerMethod below).
P = TypeVar("P")

# The asyncio servicer context specialised to this service's Request/Response messages.
BentoServicerContext = aio.ServicerContext[Request, Response]

# Optional (de)serialization hooks: either a callable or None.
# - deserializer: takes a Request (or None) and returns an arbitrary object.
# - serializer: takes raw bytes and returns a Response (or None).
RequestDeserializerFn = Callable[[Request | None], object] | None
ResponseSerializerFn = Callable[[bytes], Response | None] | None

# Signature of an RPC behaviour: (request, context) -> P for the synchronous form,
# and (request, context) -> Awaitable[P] for the asynchronous form.
HandlerMethod = Callable[[Request, BentoServicerContext], P]
AsyncHandlerMethod = Callable[[Request, BentoServicerContext], Awaitable[P]]

class RpcMethodHandler(
    # Use the documented list-of-(name, type) form of the functional NamedTuple
    # syntax. The keyword-argument form (``NamedTuple("X", field=type)``) is
    # undocumented, deprecated since Python 3.13 and not understood by static
    # type checkers, which defeats the purpose of this typing-only module.
    NamedTuple(
        "RpcMethodHandler",
        [
            ("request_streaming", bool),
            ("response_streaming", bool),
            ("request_deserializer", RequestDeserializerFn),
            ("response_serializer", ResponseSerializerFn),
            ("unary_unary", Optional[HandlerMethod[Response]]),
            ("unary_stream", Optional[HandlerMethod[Response]]),
            ("stream_unary", Optional[HandlerMethod[Response]]),
            ("stream_stream", Optional[HandlerMethod[Response]]),
        ],
    ),
    grpc.RpcMethodHandler,
):
    """An implementation of a single RPC method.

    Typed view over ``grpc.RpcMethodHandler``, narrowed to this service's
    ``Request``/``Response`` messages. Field meanings follow the upstream
    ``grpc.RpcMethodHandler`` contract (see the gRPC Python API reference).

    Attributes:
        request_streaming: Whether the RPC takes a stream of request messages.
        response_streaming: Whether the RPC returns a stream of response messages.
        request_deserializer: Optional callable used to deserialize requests,
            or ``None``.
        response_serializer: Optional callable used to serialize responses,
            or ``None``.
        unary_unary: Behaviour for a unary request / unary response RPC, or ``None``.
        unary_stream: Behaviour for a unary request / streaming response RPC,
            or ``None``.
        stream_unary: Behaviour for a streaming request / unary response RPC,
            or ``None``.
        stream_stream: Behaviour for a streaming request / streaming response
            RPC, or ``None``.
    """

    # Re-declared so type checkers see the narrowed field types on the subclass.
    request_streaming: bool
    response_streaming: bool
    request_deserializer: RequestDeserializerFn
    response_serializer: ResponseSerializerFn
    unary_unary: Optional[HandlerMethod[Response]]
    unary_stream: Optional[HandlerMethod[Response]]
    stream_unary: Optional[HandlerMethod[Response]]
    stream_stream: Optional[HandlerMethod[Response]]

class HandlerCallDetails(
    # Documented list-of-(name, type) form; the keyword-argument form of
    # ``NamedTuple(...)`` is deprecated since Python 3.13 and is not understood
    # by static type checkers.
    NamedTuple(
        "HandlerCallDetails",
        [
            ("method", str),
            ("invocation_metadata", aio.Metadata),
        ],
    ),
    grpc.HandlerCallDetails,
):
    """Describes an RPC that has just arrived for service.

    Attributes:
        method: The method name of the RPC.
        invocation_metadata: A sequence of metadata entries, each a key-value
            pair included in the HTTP header.
            An example entry is: ``('binary-metadata-bin', b'\\x00\\xFF')``
    """

    # Re-declared so type checkers see the field types on the subclass.
    method: str
    invocation_metadata: aio.Metadata

# Generic servicer typing helpers.
# ServicerImpl stands for a concrete servicer implementation; Servicer wraps it
# in Annotated[...] so it can be subscripted (e.g. Servicer[Any]).
ServicerImpl = TypeVar("ServicerImpl")
Servicer = Annotated[ServicerImpl, object]
# The class object (not an instance) of any servicer.
ServicerClass = Type[Servicer[Any]]
# Callable that takes a servicer and a sync or asyncio gRPC server and returns None.
# NOTE(review): presumably matches the generated ``add_*Servicer_to_server``
# functions - confirm against callers.
AddServicerFn = Callable[[Servicer[Any], aio.Server | grpc.Server], None]

# Closed set of accepted field-name strings.
# NOTE(review): presumably the content fields of the protobuf Request/Response
# messages - verify against service.proto.
ProtoField = Literal[
"dataframe", "file", "json", "ndarray", "series", "text", "_internal_bytes_contents"
]

# A list of interceptor factories: each entry is either a zero-argument callable
# returning an aio.ServerInterceptor, or a functools.partial producing one.
Interceptors = list[
Callable[[], aio.ServerInterceptor] | partial[aio.ServerInterceptor]
]

# types defined for client interceptors
# An asyncio unary-unary call specialised to this service's Request/Response messages.
BentoUnaryUnaryCall = aio.UnaryUnaryCall[Request, Response]

# Public names exported by this module.
# NOTE(review): the names listed here appear after the ``if TYPE_CHECKING:``
# line above; indentation is not recoverable from this view. If they are bound
# only under that guard while ``__all__`` is not, ``from ... import *`` would
# fail at runtime - confirm against the real file.
__all__ = [
"Request",
"Response",
"BentoServicerContext",
"BentoServiceServicer",
"HandlerCallDetails",
"RpcMethodHandler",
"BentoUnaryUnaryCall",
]

0 comments on commit f01a04c

Please sign in to comment.