Skip to content

Commit

Permalink
feat: adding OpenTelemetry interceptor (#2844)
Browse files Browse the repository at this point in the history
  • Loading branch information
aarnphm committed Aug 3, 2022
1 parent d9d2fac commit c0b5009
Show file tree
Hide file tree
Showing 13 changed files with 592 additions and 166 deletions.
16 changes: 8 additions & 8 deletions bentoml/_internal/bento/build_dev_bentoml_whl.py
Expand Up @@ -26,6 +26,14 @@ def build_bentoml_editable_wheel(target_path: str) -> None:
# skip this entirely if BentoML is installed from PyPI
return

try:
from build import ProjectBuilder
from build.env import IsolatedEnvBuilder
except ModuleNotFoundError:
raise BentoMLException(
f"`{BENTOML_DEV_BUILD}=True`, which requires the `pypa/build` package. Install development dependencies with `pip install -r requirements/dev-requirements.txt` and try again."
)

# Find bentoml module path
module_location = source_locations("bentoml")
if not module_location:
Expand All @@ -40,14 +48,6 @@ def build_bentoml_editable_wheel(target_path: str) -> None:
logger.info(
"BentoML is installed in `editable` mode; building BentoML distribution with the local BentoML code base. The built wheel file will be included in the target bento."
)
try:
from build import ProjectBuilder
from build.env import IsolatedEnvBuilder
except ModuleNotFoundError:
raise BentoMLException(
f"Environment variable {BENTOML_DEV_BUILD}=True detected, which requires the `pypa/build` package. Make sure to install all dev dependencies via `pip install -r requirements/dev-requirements.txt` and try again."
)

with IsolatedEnvBuilder() as env:
builder = ProjectBuilder(os.path.dirname(pyproject))
builder.python_executable = env.executable
Expand Down
44 changes: 25 additions & 19 deletions bentoml/_internal/io_descriptors/numpy.py
Expand Up @@ -218,26 +218,28 @@ def openapi_responses_schema(self) -> t.Dict[str, t.Any]:
def _verify_ndarray(
self,
obj: "ext.NpNDArray",
dtype: np.dtype[t.Any] | None,
shape: tuple[int, ...] | None,
exception_cls: t.Type[Exception] = BadInput,
) -> "ext.NpNDArray":
if self._dtype is not None and self._dtype != obj.dtype:
if dtype is not None and dtype != obj.dtype:
# ‘same_kind’ means only safe casts or casts within a kind, like float64
# to float32, are allowed.
if np.can_cast(obj.dtype, self._dtype, casting="same_kind"):
obj = obj.astype(self._dtype, casting="same_kind") # type: ignore
if np.can_cast(obj.dtype, dtype, casting="same_kind"):
obj = obj.astype(dtype, casting="same_kind") # type: ignore
else:
msg = f'{self.__class__.__name__}: Expecting ndarray of dtype "{self._dtype}", but "{obj.dtype}" was received.'
msg = f'{self.__class__.__name__}: Expecting ndarray of dtype "{dtype}", but "{obj.dtype}" was received.'
if self._enforce_dtype:
raise exception_cls(msg)
else:
logger.debug(msg)

if self._shape is not None and not _is_matched_shape(self._shape, obj.shape):
msg = f'{self.__class__.__name__}: Expecting ndarray of shape "{self._shape}", but "{obj.shape}" was received.'
if shape is not None and not _is_matched_shape(shape, obj.shape):
msg = f'{self.__class__.__name__}: Expecting ndarray of shape "{shape}", but "{obj.shape}" was received.'
if self._enforce_shape:
raise exception_cls(msg)
try:
obj = obj.reshape(self._shape)
obj = obj.reshape(shape)
except ValueError as e:
logger.debug(f"{msg} Failed to reshape: {e}.")

Expand All @@ -260,7 +262,7 @@ async def from_http_request(self, request: Request) -> ext.NpNDArray:
res = np.array(obj, dtype=self._dtype)
except ValueError:
res = np.array(obj)
return self._verify_ndarray(res, BadInput)
return self._verify_ndarray(res, dtype=self._dtype, shape=self._shape)

async def to_http_response(self, obj: ext.NpNDArray, ctx: Context | None = None):
"""
Expand All @@ -273,7 +275,9 @@ async def to_http_response(self, obj: ext.NpNDArray, ctx: Context | None = None)
HTTP Response of type `starlette.responses.Response`. This can
be accessed via cURL or any external web traffic.
"""
obj = self._verify_ndarray(obj, InternalServerError)
obj = self._verify_ndarray(
obj, dtype=self._dtype, shape=self._shape, exception_cls=InternalServerError
)
if ctx is not None:
res = Response(
json.dumps(obj.tolist()),
Expand Down Expand Up @@ -313,6 +317,7 @@ async def from_grpc_request(
)
if not self._shape:
raise UnprocessableEntity("'shape' is required when 'packed' is set.")

metadata = serialized["metadata"]
if not self._dtype:
if "dtype" not in metadata:
Expand Down Expand Up @@ -340,13 +345,11 @@ async def from_grpc_request(
logger.warning(
f"'shape={self._shape},enforce_shape={self._enforce_shape}' is set with {self.__class__.__name__}, while 'shape' field is present in request message. To avoid this warning, set 'enforce_shape=True'. Using 'shape={shape}' from request message."
)
self._shape = shape
else:
logger.debug(
f"'enforce_shape={self._enforce_shape}', ignoring 'shape' field in request message."
)
else:
self._shape = shape
shape = self._shape

array = serialized["array"]
else:
Expand All @@ -360,20 +363,18 @@ async def from_grpc_request(
logger.warning(
f"'dtype={self._dtype},enforce_dtype={self._enforce_dtype}' is set with {self.__class__.__name__}, while 'dtype' field is present in request message. To avoid this warning, set 'enforce_dtype=True'. Using 'dtype={dtype}' from request message."
)
self._dtype = dtype
else:
logger.debug(
f"'enforce_dtype={self._enforce_dtype}', ignoring 'dtype' field in request message."
)
else:
self._dtype = dtype
dtype = self._dtype

try:
res = np.array(content, dtype=self._dtype)
res = np.array(content, dtype=dtype)
except ValueError:
res = np.array(content)

return self._verify_ndarray(res, BadInput)
return self._verify_ndarray(res, dtype=dtype, shape=shape)

async def to_grpc_response(
self, obj: ext.NpNDArray, context: BentoServicerContext
Expand All @@ -395,7 +396,12 @@ async def to_grpc_response(
value_key = _NP_TO_VALUE_MAP[obj.dtype]

try:
obj = self._verify_ndarray(obj, InternalServerError)
obj = self._verify_ndarray(
obj,
dtype=self._dtype,
shape=self._shape,
exception_cls=InternalServerError,
)
except InternalServerError as e:
context.set_code(grpc_status_code(e))
context.set_details(e.message)
Expand All @@ -411,7 +417,7 @@ async def to_grpc_response(
)
value.raw_value.CopyFrom(raw)
else:
if self._bytesorder:
if self._bytesorder and self._bytesorder != "C":
logger.warning(
f"'bytesorder={self._bytesorder}' is ignored when 'packed={self._packed}'."
)
Expand Down
5 changes: 2 additions & 3 deletions bentoml/_internal/server/grpc/__init__.py
@@ -1,5 +1,4 @@
from .server import GRPCServer
from .servicer import register_bento_servicer
from .servicer import register_health_servicer
from .servicer import create_bento_servicer

__all__ = ["GRPCServer", "register_health_servicer", "register_bento_servicer"]
__all__ = ["GRPCServer", "create_bento_servicer"]
130 changes: 97 additions & 33 deletions bentoml/_internal/server/grpc/interceptors/__init__.py
Expand Up @@ -6,78 +6,142 @@
from timeit import default_timer
from typing import TYPE_CHECKING

import grpc
from grpc import aio
from opentelemetry import trace

from ....utils import LazyLoader
from ....utils.grpc import ProtoCodec
from ....utils.grpc import to_http_status
from ....utils.grpc import wrap_rpc_handler
from ....utils.grpc import get_grpc_content_type
from ....utils.grpc.codec import GRPC_CONTENT_TYPE

if TYPE_CHECKING:
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.trace.span import Span
from grpc.aio._typing import MetadataType

from bentoml.grpc.v1 import service_pb2

from ..types import Request
from ..types import Response
from ..types import HandlerMethod
from ..types import RpcMethodHandler
from ..types import AsyncHandlerMethod
from ..types import HandlerCallDetails
from ..types import BentoServicerContext
from ....utils.grpc.codec import Codec
else:
service_pb2 = LazyLoader("service_pb2", globals(), "bentoml.grpc.v1.service_pb2")

logger = logging.getLogger(__name__)


class AccessLogInterceptor(aio.ServerInterceptor):
class GenericHeadersServerInterceptor(aio.ServerInterceptor):
"""
An asyncio interceptors for access log.
.. TODO:
- Add support for streaming RPCs.
A light header interceptor that provides some initial metadata to the client.
TODO: https://chromium.googlesource.com/external/github.com/grpc/grpc/+/HEAD/doc/PROTOCOL-HTTP2.md
"""

def __init__(self, tracer_provider: TracerProvider) -> None:
self.logger = logging.getLogger("bentoml.access")
self.tracer_provider = tracer_provider
def __init__(self, *, codec: Codec | None = None):
if not codec:
# By default, we use ProtoCodec.
codec = ProtoCodec()
self._codec = codec

def set_trailing_metadata(self, context: BentoServicerContext):
# We want to send some initial metadata to the client.
# gRPC doesn't use `:status` pseudo header to indicate success or failure
# of the current request. gRPC instead uses trailers for this purpose, and
# trailers are sent during `send_trailing_metadata` call
# For now we are sending over the content-type header.
headers = [("content-type", get_grpc_content_type(codec=self._codec))]
context.set_trailing_metadata(headers)

async def intercept_service(
self,
continuation: t.Callable[[HandlerCallDetails], t.Awaitable[RpcMethodHandler]],
handler_call_details: HandlerCallDetails,
) -> RpcMethodHandler:
handler = await continuation(handler_call_details)

if handler and (handler.response_streaming or handler.request_streaming):
return handler

def wrapper(behaviour: AsyncHandlerMethod[Response]):
@functools.wraps(behaviour)
async def new_behaviour(
request: Request, context: BentoServicerContext
) -> Response | t.Awaitable[Response]:
# setup metadata
self.set_trailing_metadata(context)

# for the rpc itself.
resp = behaviour(request, context)
if not hasattr(resp, "__aiter__"):
resp = await resp
return resp

return new_behaviour

return wrap_rpc_handler(wrapper, handler)


class AccessLogServerInterceptor(aio.ServerInterceptor):
"""
An asyncio interceptors for access log.
"""

async def intercept_service(
self,
continuation: t.Callable[[HandlerCallDetails], t.Awaitable[RpcMethodHandler]],
handler_call_details: HandlerCallDetails,
) -> RpcMethodHandler:
logger = logging.getLogger("bentoml.access")
handler = await continuation(handler_call_details)
method_name = handler_call_details.method

if handler and (handler.response_streaming or handler.request_streaming):
return handler

def wrapper(
behaviour: HandlerMethod[Response | t.AsyncGenerator[Response, None]]
) -> t.Callable[..., t.Any]:
def wrapper(behaviour: AsyncHandlerMethod[Response]):
@functools.wraps(behaviour)
async def new_behaviour(
request: Request, context: BentoServicerContext
) -> Response:
) -> Response | t.Awaitable[Response]:

content_type = GRPC_CONTENT_TYPE

tracer = self.tracer_provider.get_tracer(
"opentelemetry.instrumentation.grpc"
)
span: Span = tracer.start_span("grpc")
span_context = span.get_span_context()
kind = str(request.input.WhichOneof("kind"))
trailing_metadata: MetadataType | None = context.trailing_metadata()
if trailing_metadata:
trailing = dict(trailing_metadata)
content_type = trailing.get("content-type", GRPC_CONTENT_TYPE)

start = default_timer()
with trace.use_span(span, end_on_exit=True):
try:
response = behaviour(request, context)
if not hasattr(response, "__aiter__"):
response = await response
latency = max(default_timer() - start, 0)

req_info = f"api_name={request.api_name},type={kind},size={request.input.ByteSize()}"
resp_info = f"status={context.code()},type={kind},size={response.output.ByteSize()}"
trace_and_span = f"trace={span_context.trace_id},span={span_context.span_id},sampled={1 if span_context.trace_flags.sampled else 0}"

self.logger.info(
f"{context.peer()} ({req_info}) ({resp_info}) {latency:.3f}ms ({trace_and_span})"
)

except Exception as e:
context.set_code(grpc.StatusCode.INTERNAL)
context.set_details(str(e))
response = service_pb2.Response()
finally:
latency = max(default_timer() - start, 0)
req = [
"scheme=http", # TODO: support https when ssl is added
f"path={method_name}",
f"type={content_type}",
f"size={request.ByteSize()}",
]
resp = [
f"http_status={to_http_status(context.code())}",
f"grpc_status={context.code().value[0]}",
f"type={content_type}",
f"size={response.ByteSize()}",
]

# TODO: fix ports
logger.info(
f"{context.peer()} ({','.join(req)}) ({','.join(resp)}) {latency:.3f}ms"
)
return response

return new_behaviour
Expand Down

0 comments on commit c0b5009

Please sign in to comment.