-
Notifications
You must be signed in to change notification settings - Fork 756
/
prometheus.py
144 lines (118 loc) · 5.22 KB
/
prometheus.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
from __future__ import annotations
import typing as t
import logging
import functools
import contextvars
from timeit import default_timer
from typing import TYPE_CHECKING
from simple_di import inject
from simple_di import Provide
from bentoml.grpc.utils import to_http_status
from bentoml.grpc.utils import wrap_rpc_handler
from bentoml._internal.configuration.containers import BentoMLContainer
# Context-local start timestamp of the request being intercepted. It is set to
# ``default_timer()`` in ``PrometheusServerInterceptor.intercept_service`` and
# reset to ``0`` once the request duration has been recorded.
START_TIME_VAR: contextvars.ContextVar[float] = contextvars.ContextVar("START_TIME_VAR")
if TYPE_CHECKING:
import grpc
from grpc import aio
from bentoml.grpc.types import Request
from bentoml.grpc.types import Response
from bentoml.grpc.types import RpcMethodHandler
from bentoml.grpc.types import AsyncHandlerMethod
from bentoml.grpc.types import HandlerCallDetails
from bentoml.grpc.types import BentoServicerContext
from bentoml.grpc.v1alpha1 import service_pb2 as pb
from bentoml._internal.service import Service
from bentoml._internal.server.metrics.prometheus import PrometheusClient
else:
from bentoml.grpc.utils import import_grpc
from bentoml.grpc.utils import import_generated_stubs
pb, _ = import_generated_stubs()
grpc, aio = import_grpc()
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
class PrometheusServerInterceptor(aio.ServerInterceptor):
    """
    An async interceptor that records Prometheus metrics for unary gRPC calls.

    Three metrics are maintained, all labelled with the API name, the service
    version and the service name:

    * ``request_total`` -- counter of finished requests, additionally labelled
      with the HTTP status derived from the gRPC status code.
    * ``request_duration_seconds`` -- histogram of request durations, with the
      same labels as ``request_total``.
    * ``request_in_progress`` -- gauge of requests currently being handled.

    Handlers that stream requests or responses are returned untouched and are
    not instrumented.
    """

    def __init__(self, bento_service: Service):
        """
        Args:
            bento_service: Service whose ``name`` and ``tag`` are used as
                metric label values.
        """
        # Metric objects are created lazily by ``_setup`` on the first
        # intercepted call.
        self._is_setup = False
        self.bento_service = bento_service

    @inject
    def _setup(
        self,
        metrics_client: PrometheusClient = Provide[BentoMLContainer.metrics_client],
    ):  # pylint: disable=attribute-defined-outside-init
        """
        Create the histogram, counter and gauge on ``metrics_client`` and mark
        the interceptor as set up.

        Args:
            metrics_client: Client used to create the metrics; by default it is
                injected from ``BentoMLContainer.metrics_client``.
        """
        self.metrics_request_duration = metrics_client.Histogram(
            name="request_duration_seconds",
            documentation="API GRPC request duration in seconds",
            labelnames=["api_name", "service_version", "http_response_code", "service"],
        )
        self.metrics_request_total = metrics_client.Counter(
            name="request_total",
            documentation="Total number of GRPC requests",
            labelnames=["api_name", "service_version", "http_response_code", "service"],
        )
        self.metrics_request_in_progress = metrics_client.Gauge(
            name="request_in_progress",
            documentation="Total number of GRPC requests in progress now",
            labelnames=["api_name", "service_version", "service"],
            multiprocess_mode="livesum",
        )
        self._is_setup = True

    async def intercept_service(
        self,
        continuation: t.Callable[[HandlerCallDetails], t.Awaitable[RpcMethodHandler]],
        handler_call_details: HandlerCallDetails,
    ) -> RpcMethodHandler:
        """
        Wrap the resolved RPC handler so that every call updates the metrics.

        Args:
            continuation: Awaitable callable that resolves the next handler in
                the interceptor chain.
            handler_call_details: Details of the incoming call, forwarded to
                ``continuation`` unchanged.

        Returns:
            The handler unchanged when it is a streaming handler; otherwise the
            result of ``wrap_rpc_handler`` applied to the instrumenting wrapper.
        """
        if not self._is_setup:
            self._setup()

        handler = await continuation(handler_call_details)

        # Streaming RPCs are not instrumented.
        if handler and (handler.response_streaming or handler.request_streaming):
            return handler

        service_version = (
            self.bento_service.tag.version if self.bento_service.tag else ""
        )

        # A valid tag name may include characters that are invalid in
        # Prometheus label values, so we need to escape them.
        # ref: https://prometheus.io/docs/concepts/data_model/#metric-names-and-labels
        service_name = self.bento_service.name.replace("-", ":").replace(".", "::")

        # Remember when the request was intercepted so that the measured
        # duration covers the whole call.
        START_TIME_VAR.set(default_timer())

        def wrapper(behaviour: AsyncHandlerMethod[Response]):
            @functools.wraps(behaviour)
            async def new_behaviour(
                request: Request, context: BentoServicerContext
            ) -> Response | t.Awaitable[Response]:
                # Only BentoML ``pb.Request`` messages carry an ``api_name``;
                # anything else is forwarded without instrumentation.
                if not isinstance(request, pb.Request):
                    return await behaviour(request, context)

                api_name = request.api_name

                # Fall back to "now" when the context variable is unset or was
                # already reset to 0, instead of relying on an ``assert`` that
                # is stripped under ``python -O``.
                start_time = START_TIME_VAR.get(0) or default_timer()

                try:
                    # instrument request in progress
                    with self.metrics_request_in_progress.labels(
                        api_name=api_name,
                        service_version=service_version,
                        service=service_name,
                    ).track_inprogress():
                        return await behaviour(request, context)
                finally:
                    # Record the counter and the duration only after the
                    # handler has finished, so that the duration includes the
                    # handler's execution time and the status code is the one
                    # set during the call.
                    # NOTE(review): for an unhandled exception the server may
                    # assign the final status after this point -- confirm
                    # against the grpc.aio server behaviour.
                    http_response_code = to_http_status(
                        t.cast(grpc.StatusCode, context.code())
                    )

                    # instrument request total count
                    self.metrics_request_total.labels(
                        api_name=api_name,
                        service_version=service_version,
                        http_response_code=http_response_code,
                        service=service_name,
                    ).inc()

                    # instrument request duration
                    total_time = max(default_timer() - start_time, 0)
                    self.metrics_request_duration.labels(  # type: ignore (unfinished prometheus types)
                        api_name=api_name,
                        service_version=service_version,
                        http_response_code=http_response_code,
                        service=service_name,
                    ).observe(total_time)
                    START_TIME_VAR.set(0)

            return new_behaviour

        return wrap_rpc_handler(wrapper, handler)