feat(api): expose functions under bentoml.grpc
Signed-off-by: Aaron Pham <29749331+aarnphm@users.noreply.github.com>
aarnphm committed Sep 16, 2022
1 parent 35b535c commit 0e1b911
Showing 10 changed files with 463 additions and 121 deletions.
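The changes below repeat one pattern across the interceptor modules: gRPC helpers (to_http_status, wrap_rpc_handler, GRPC_CONTENT_TYPE, import_grpc, import_generated_stubs) are imported from the public bentoml.grpc.utils module, type-only imports stay behind TYPE_CHECKING, and grpc itself plus the generated v1alpha1 stubs are loaded lazily at runtime. A condensed sketch of that pattern, assembled from the hunks in this commit:

from __future__ import annotations

from typing import TYPE_CHECKING

if TYPE_CHECKING:
    # type-checking only; no runtime dependency on grpc here
    import grpc
    from grpc import aio
    from bentoml.grpc.v1alpha1 import service_pb2 as pb
else:
    from bentoml.grpc.utils import import_grpc
    from bentoml.grpc.utils import import_generated_stubs

    # defer the heavy imports until the interceptor module is actually used
    pb, _ = import_generated_stubs()
    grpc, aio = import_grpc()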
65 changes: 0 additions & 65 deletions bentoml/_internal/server/grpc/interceptors/__init__.py

This file was deleted.

4 changes: 2 additions & 2 deletions bentoml/grpc/buf.yaml
@@ -12,9 +12,9 @@ lint:
- RPC_RESPONSE_STANDARD_NAME
ignore_only:
DEFAULT:
- bentoml/grpc/v1/service_test.proto
- bentoml/grpc/v1alpha1/service_test.proto
ENUM_VALUE_PREFIX:
- bentoml/grpc/v1/service.proto
- bentoml/grpc/v1alpha1/service.proto
enum_zero_value_suffix: _UNSPECIFIED
rpc_allow_same_request_response: true
rpc_allow_google_protobuf_empty_requests: true
Empty file.
@@ -6,15 +6,14 @@
from timeit import default_timer
from typing import TYPE_CHECKING

import grpc
from grpc import aio

from bentoml.grpc.utils import to_http_status
from bentoml.grpc.utils import wrap_rpc_handler
from bentoml.grpc.utils import GRPC_CONTENT_TYPE

if TYPE_CHECKING:
from grpc.aio._typing import MetadataType
import grpc
from grpc import aio
from grpc.aio._typing import MetadataType # pylint: disable=unused-import

from bentoml.grpc.types import Request
from bentoml.grpc.types import Response
@@ -24,14 +24,16 @@
from bentoml.grpc.types import BentoServicerContext
from bentoml.grpc.v1alpha1 import service_pb2 as pb
else:
from bentoml.grpc.utils import import_grpc
from bentoml.grpc.utils import import_generated_stubs

pb, _ = import_generated_stubs()
grpc, aio = import_grpc()


class AccessLogServerInterceptor(aio.ServerInterceptor):
"""
An asyncio interceptor for access log.
An asyncio interceptor for access logging.
"""

async def intercept_service(
@@ -51,25 +51,20 @@ def wrapper(behaviour: AsyncHandlerMethod[Response]):
async def new_behaviour(
request: Request, context: BentoServicerContext
) -> Response | t.Awaitable[Response]:

content_type = GRPC_CONTENT_TYPE

trailing_metadata: MetadataType | None = context.trailing_metadata()
if trailing_metadata:
trailing = dict(trailing_metadata)
content_type = trailing.get("content-type", GRPC_CONTENT_TYPE)

response = pb.Response()

start = default_timer()
try:
response = await behaviour(request, context)
except Exception as e: # pylint: disable=broad-except
context.set_code(grpc.StatusCode.INTERNAL)
context.set_details(str(e))
finally:
if TYPE_CHECKING:
assert response
latency = max(default_timer() - start, 0) * 1000

req = [
@@ -79,6 +75,9 @@ async def new_behaviour(
f"size={request.ByteSize()}",
]

# Note that for AccessLogServerInterceptor to work, the interceptor
# must be added to the server after AsyncOpenTeleServerInterceptor
# and PrometheusServerInterceptor.
typed_context_code = t.cast(grpc.StatusCode, context.code())
resp = [
f"http_status={to_http_status(typed_context_code)}",
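Per the comment above, AccessLogServerInterceptor has to be added to the server after AsyncOpenTeleServerInterceptor and PrometheusServerInterceptor. A minimal sketch of what that registration order looks like with a grpc.aio server; the import paths and constructor calls are illustrative assumptions, not part of this diff:

from grpc import aio

# hypothetical import paths, for illustration only
from bentoml._internal.server.grpc.interceptors.access import AccessLogServerInterceptor
from bentoml._internal.server.grpc.interceptors.opentelemetry import AsyncOpenTeleServerInterceptor
from bentoml._internal.server.grpc.interceptors.prometheus import PrometheusServerInterceptor

server = aio.server(
    interceptors=[
        AsyncOpenTeleServerInterceptor(),  # opens the server-side span first
        PrometheusServerInterceptor(),     # then records request metrics
        AccessLogServerInterceptor(),      # last, so it logs the final code and metadata
    ]
)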
@@ -6,8 +6,6 @@
from typing import TYPE_CHECKING
from contextlib import asynccontextmanager

import grpc
from grpc import aio
from simple_di import inject
from simple_di import Provide
from opentelemetry import trace
@@ -21,11 +19,12 @@
from bentoml.grpc.utils import wrap_rpc_handler
from bentoml.grpc.utils import GRPC_CONTENT_TYPE
from bentoml.grpc.utils import parse_method_name

from ....utils.pkg import get_pkg_version
from ....configuration.containers import BentoMLContainer
from bentoml._internal.utils.pkg import get_pkg_version
from bentoml._internal.configuration.containers import BentoMLContainer

if TYPE_CHECKING:
import grpc
from grpc import aio
from grpc.aio._typing import MetadataKey
from grpc.aio._typing import MetadataType
from grpc.aio._typing import MetadataValue
@@ -38,6 +37,10 @@
from bentoml.grpc.types import AsyncHandlerMethod
from bentoml.grpc.types import HandlerCallDetails
from bentoml.grpc.types import BentoServicerContext
else:
from bentoml.grpc.utils import import_grpc

grpc, aio = import_grpc()

logger = logging.getLogger(__name__)

@@ -48,6 +51,10 @@ def __init__(self, servicer_context: BentoServicerContext, active_span: Span):
self._active_span = active_span
self._code = grpc.StatusCode.OK
self._details = ""
super().__init__()

def __getattr__(self, attr: str) -> t.Any:
return getattr(self._servicer_context, attr)

async def read(self) -> Request:
return await self._servicer_context.read()
@@ -156,15 +163,16 @@ async def set_remote_context(
self, servicer_context: BentoServicerContext
) -> t.AsyncGenerator[None, None]:
metadata = servicer_context.invocation_metadata()
if not metadata:
yield
md: dict[MetadataKey, MetadataValue] = {m.key: m.value for m in metadata}
ctx = extract(md)
token = attach(ctx)
try:
if metadata:
md: dict[MetadataKey, MetadataValue] = {m.key: m.value for m in metadata}
ctx = extract(md)
token = attach(ctx)
try:
yield
finally:
detach(token)
else:
yield
finally:
detach(token)

def start_span(
self,
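The reworked set_remote_context above only extracts and attaches a remote trace context when invocation metadata is present, and yields exactly once on either path (the previous version fell through and yielded again when metadata was empty). The extraction itself is standard OpenTelemetry propagation; a self-contained sketch of what extract/attach/detach do with a metadata carrier (the traceparent value is a made-up example):

from opentelemetry.context import attach, detach
from opentelemetry.propagate import extract

# gRPC invocation metadata flattened into a dict carrier, as in set_remote_context;
# this traceparent value is illustrative only
md = {"traceparent": "00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01"}

ctx = extract(md)    # build a Context carrying the remote parent span
token = attach(ctx)  # make it the current context for the duration of the RPC
try:
    pass             # handle the request; spans started here become children of the remote span
finally:
    detach(token)    # restore the previous context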
@@ -7,33 +7,34 @@
from timeit import default_timer
from typing import TYPE_CHECKING

import grpc
from grpc import aio
from simple_di import inject
from simple_di import Provide

from bentoml.grpc.utils import to_http_status
from bentoml.grpc.utils import wrap_rpc_handler

from ....configuration.containers import BentoMLContainer
from bentoml._internal.context import component_context
from bentoml._internal.configuration.containers import BentoMLContainer

START_TIME_VAR: contextvars.ContextVar[float] = contextvars.ContextVar("START_TIME_VAR")

if TYPE_CHECKING:
import grpc
from grpc import aio

from bentoml.grpc.types import Request
from bentoml.grpc.types import Response
from bentoml.grpc.types import RpcMethodHandler
from bentoml.grpc.types import AsyncHandlerMethod
from bentoml.grpc.types import HandlerCallDetails
from bentoml.grpc.types import BentoServicerContext
from bentoml.grpc.v1alpha1 import service_pb2 as pb

from ....service import Service
from ...metrics.prometheus import PrometheusClient
from bentoml._internal.server.metrics.prometheus import PrometheusClient
else:
from bentoml.grpc.utils import import_grpc
from bentoml.grpc.utils import import_generated_stubs

pb, _ = import_generated_stubs()
grpc, aio = import_grpc()


logger = logging.getLogger(__name__)
@@ -44,34 +45,46 @@ class PrometheusServerInterceptor(aio.ServerInterceptor):
An async interceptor for Prometheus metrics.
"""

def __init__(self, bento_service: Service):
def __init__(self, *, namespace: str = "bentoml_api_server"):
self._is_setup = False
self.bento_service = bento_service
self.namespace = namespace

@inject
def _setup(
self,
metrics_client: PrometheusClient = Provide[BentoMLContainer.metrics_client],
duration_buckets: tuple[float, ...] = Provide[
BentoMLContainer.duration_buckets
],
): # pylint: disable=attribute-defined-outside-init

# a valid tag name may include invalid characters, so we need to escape them
# ref: https://prometheus.io/docs/concepts/data_model/#metric-names-and-labels
service_name = self.bento_service.name.replace("-", ":").replace(".", "::")

self.metrics_request_duration = metrics_client.Histogram(
name=f"{service_name}_request_duration_seconds",
documentation=f"{service_name} API GRPC request duration in seconds",
labelnames=["api_name", "service_version", "http_response_code"],
namespace=self.namespace,
name="request_duration_seconds",
documentation="API GRPC request duration in seconds",
labelnames=[
"api_name",
"service_name",
"service_version",
"http_response_code",
],
buckets=duration_buckets,
)
self.metrics_request_total = metrics_client.Counter(
name=f"{service_name}_request_total",
namespace=self.namespace,
name="request_total",
documentation="Total number of GRPC requests",
labelnames=["api_name", "service_version", "http_response_code"],
labelnames=[
"api_name",
"service_name",
"service_version",
"http_response_code",
],
)
self.metrics_request_in_progress = metrics_client.Gauge(
name=f"{service_name}_request_in_progress",
namespace=self.namespace,
name="request_in_progress",
documentation="Total number of GRPC requests in progress now",
labelnames=["api_name", "service_version"],
labelnames=["api_name", "service_name", "service_version"],
multiprocess_mode="livesum",
)
self._is_setup = True
@@ -89,10 +102,6 @@ async def intercept_service(
if handler and (handler.response_streaming or handler.request_streaming):
return handler

service_version = (
self.bento_service.tag.version if self.bento_service.tag else ""
)

START_TIME_VAR.set(default_timer())

def wrapper(behaviour: AsyncHandlerMethod[Response]):
@@ -108,7 +117,8 @@ async def new_behaviour(
# instrument request total count
self.metrics_request_total.labels(
api_name=api_name,
service_version=service_version,
service_name=component_context.bento_name,
service_version=component_context.bento_version,
http_response_code=to_http_status(
t.cast(grpc.StatusCode, context.code())
),
@@ -119,19 +129,20 @@
total_time = max(default_timer() - START_TIME_VAR.get(), 0)
self.metrics_request_duration.labels( # type: ignore (unfinished prometheus types)
api_name=api_name,
service_version=service_version,
service_name=component_context.bento_name,
service_version=component_context.bento_version,
http_response_code=to_http_status(
t.cast(grpc.StatusCode, context.code())
),
).observe(
total_time
)

START_TIME_VAR.set(0)

# instrument request in progress
with self.metrics_request_in_progress.labels(
api_name=api_name, service_version=service_version
api_name=api_name,
service_version=component_context.bento_version,
service_name=component_context.bento_name,
).track_inprogress():
response = await behaviour(request, context)
return response
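The metrics changes above drop the per-service metric name prefix derived from self.bento_service.name in favour of a fixed namespace (defaulting to bentoml_api_server) plus service_name / service_version labels taken from component_context. With prometheus_client, the namespace is prepended to the metric name, so every server exports the same metric families and the serving bento is identified by labels instead. A standalone sketch of the resulting naming, using plain prometheus_client with illustrative values:

from prometheus_client import Counter, generate_latest

# fixed namespace + identifying labels, mirroring the new scheme
request_total = Counter(
    namespace="bentoml_api_server",
    name="request_total",
    documentation="Total number of GRPC requests",
    labelnames=["api_name", "service_name", "service_version", "http_response_code"],
)

# illustrative label values only
request_total.labels(
    api_name="classify",
    service_name="iris_classifier",
    service_version="1.0.0",
    http_response_code="200",
).inc()

# the sample is exported as
# bentoml_api_server_request_total{api_name="classify",...} 1.0
print(generate_latest().decode())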
