From 01bc401a78a7468326a02ad04d7d55a1f933878e Mon Sep 17 00:00:00 2001 From: Aaron Pham <29749331+aarnphm@users.noreply.github.com> Date: Mon, 19 Sep 2022 16:41:43 -0700 Subject: [PATCH] fix: metrics service_name incorrect setup Signed-off-by: Aaron Pham <29749331+aarnphm@users.noreply.github.com> --- .../_internal/server/metrics/prometheus.py | 9 +- bentoml/_internal/utils/__init__.py | 86 ++++++++++++-- bentoml/_internal/utils/analytics/schemas.py | 15 ++- .../_internal/utils/analytics/usage_stats.py | 110 +++++++++++++----- bentoml/serve.py | 6 +- bentoml/start.py | 2 +- tests/unit/_internal/utils/test_analytics.py | 4 +- 7 files changed, 180 insertions(+), 52 deletions(-) diff --git a/bentoml/_internal/server/metrics/prometheus.py b/bentoml/_internal/server/metrics/prometheus.py index ce2230e0668..88bd9eff649 100644 --- a/bentoml/_internal/server/metrics/prometheus.py +++ b/bentoml/_internal/server/metrics/prometheus.py @@ -34,10 +34,10 @@ def __init__( assert multiproc_dir is not None, "multiproc_dir must be provided" self.multiproc = multiproc - self.multiproc_dir: t.Optional[str] = multiproc_dir + self.multiproc_dir: str | None = multiproc_dir self._registry = None self._imported = False - self._pid: t.Optional[int] = None + self._pid: int | None = None @property def prometheus_client(self): @@ -56,6 +56,7 @@ def prometheus_client(self): # step 2: import prometheus_client + import prometheus_client.parser import prometheus_client.multiprocess self._imported = True @@ -106,9 +107,7 @@ def generate_latest(self): return self.prometheus_client.generate_latest() def text_string_to_metric_families(self) -> t.Generator[Metric, None, None]: - from prometheus_client.parser import text_string_to_metric_families - - yield from text_string_to_metric_families( + yield from self.prometheus_client.parser.text_string_to_metric_families( self.generate_latest().decode("utf-8") ) diff --git a/bentoml/_internal/utils/__init__.py b/bentoml/_internal/utils/__init__.py index 71ad30f259a..ca15f91fc61 100644 --- a/bentoml/_internal/utils/__init__.py +++ b/bentoml/_internal/utils/__init__.py @@ -13,6 +13,7 @@ from typing import overload from typing import TYPE_CHECKING from pathlib import Path +from reprlib import recursive_repr as _recursive_repr from datetime import date from datetime import time from datetime import datetime @@ -30,16 +31,18 @@ from .cattr import bentoml_cattr from ..types import LazyType -from ..types import PathType -from ..types import MetadataDict -from ..types import MetadataType from .lazy_loader import LazyLoader if TYPE_CHECKING: from fs.base import FS + from typing_extensions import Self + + from ..types import PathType + from ..types import MetadataDict + from ..types import MetadataType P = t.ParamSpec("P") - GenericFunction = t.Callable[P, t.Any] + F = t.Callable[P, t.Any] C = t.TypeVar("C") @@ -59,6 +62,7 @@ "display_path_under_home", "rich_console", "experimental", + "compose", ] _EXPERIMENTAL_APIS: set[str] = set() @@ -350,13 +354,16 @@ class cached_contextmanager: Just like contextlib.contextmanager, but will cache the yield value for the same arguments. When all instances of the same contextmanager exits, the cache value will be dropped. - Example Usage: - (To reuse the container based on the same image) - >>> @cached_contextmanager("{docker_image.id}") - >>> def start_docker_container_from_image(docker_image, timeout=60): - >>> container = ... - >>> yield container - >>> container.stop() + + Example Usage: (To reuse the container based on the same image) + + .. code-block:: python + + @cached_contextmanager("{docker_image.id}") + def start_docker_container_from_image(docker_image, timeout=60): + container = ... + yield container + container.stop() """ def __init__(self, cache_key_template: t.Optional[str] = None): @@ -387,3 +394,60 @@ def _func(*args: "P.args", **kwargs: "P.kwargs") -> t.Any: self._cache.pop(cache_key) return _func + + +class compose: + """ + Function composition: compose(f, g)(...) is equivalent to f(g(...)). + Refers to https://github.com/mentalisttraceur/python-compose for original implementation. + + Args: + *functions: Functions (or other callables) to compose. + + Raises: + TypeError: If no arguments are given, or any argument is not callable. + """ + + def __init__(self: Self, *functions: F[t.Any]): + if not functions: + raise TypeError(f"{self!r} needs at least one argument.") + _functions: list[F[t.Any]] = [] + for function in reversed(functions): + if not callable(function): + raise TypeError(f"{self!r} arguments must be callable.") + if isinstance(function, compose): + _functions.extend(function.functions) + else: + _functions.append(function) + self.__wrapped__ = _functions[0] + self._wrappers = tuple(_functions[1:]) + + def __call__(self, /, *args: t.Any, **kwargs: t.Any) -> t.Any: + """Call the composed function.""" + result = self.__wrapped__(*args, **kwargs) + for function in self._wrappers: + result = function(result) + return result + + def __get__(self, obj: t.Any, typ_: type | None = None): + """Get the composed function as a bound method.""" + wrapped = self.__wrapped__ + try: + bind = type(wrapped).__get__ + except AttributeError: + return self + bound_wrapped = bind(wrapped, obj, typ_) + if bound_wrapped is wrapped: + return self + bound_self = type(self)(bound_wrapped) + bound_self._wrappers = self._wrappers + return bound_self + + @_recursive_repr("<...>") + def __repr__(self): + return f"{self!r}({','.join(map(repr, reversed(self.functions)))})" + + @property + def functions(self): + """Read-only tuple of the composed callables, in order of execution.""" + return (self.__wrapped__,) + tuple(self._wrappers) diff --git a/bentoml/_internal/utils/analytics/schemas.py b/bentoml/_internal/utils/analytics/schemas.py index a34e814975a..dca1362eb57 100644 --- a/bentoml/_internal/utils/analytics/schemas.py +++ b/bentoml/_internal/utils/analytics/schemas.py @@ -183,6 +183,7 @@ class ModelSaveEvent(EventMeta): class ServeInitEvent(EventMeta): serve_id: str production: bool + grpc: bool serve_from_bento: bool bento_creation_timestamp: t.Optional[datetime] @@ -199,9 +200,21 @@ class ServeInitEvent(EventMeta): class ServeUpdateEvent(EventMeta): serve_id: str production: bool - component: str + grpc: bool triggered_at: datetime duration_in_seconds: int + component: str = attr.field( + validator=attr.validators.and_( + attr.validators.instance_of(str), + attr.validators.in_(["standalone", "api_server", "runner"]), + ) + ) + metrics_type: str = attr.field( + validator=attr.validators.and_( + attr.validators.instance_of(str), + attr.validators.in_(["not_available", "legacy", "current"]), + ), + ) metrics: t.List[t.Any] = attr.field(factory=list) diff --git a/bentoml/_internal/utils/analytics/usage_stats.py b/bentoml/_internal/utils/analytics/usage_stats.py index d9e28fab8ad..2db4978da23 100644 --- a/bentoml/_internal/utils/analytics/usage_stats.py +++ b/bentoml/_internal/utils/analytics/usage_stats.py @@ -17,11 +17,13 @@ from simple_di import inject from simple_di import Provide +from ...utils import compose from .schemas import EventMeta from .schemas import ServeInitEvent from .schemas import TrackingPayload from .schemas import CommonProperties from .schemas import ServeUpdateEvent +from ...configuration import get_debug_mode from ...configuration.containers import BentoMLContainer if TYPE_CHECKING: @@ -29,6 +31,8 @@ T = t.TypeVar("T") AsyncFunc = t.Callable[P, t.Coroutine[t.Any, t.Any, t.Any]] + from prometheus_client.samples import Sample + from bentoml import Service from ...server.metrics.prometheus import PrometheusClient @@ -62,7 +66,12 @@ def wrapper(*args: P.args, **kwargs: P.kwargs) -> t.Any: return func(*args, **kwargs) except Exception as err: # pylint: disable=broad-except if _usage_event_debugging(): - logger.info(f"Tracking Error: {err}") + if get_debug_mode(): + logger.error( + f"Tracking Error: {err}", stack_info=True, stacklevel=3 + ) + else: + logger.info(f"Tracking Error: {err}") else: logger.debug(f"Tracking Error: {err}") @@ -116,6 +125,7 @@ def track(event_properties: EventMeta): def _track_serve_init( svc: Service, production: bool, + grpc: bool, serve_info: ServeInfo = Provide[BentoMLContainer.serve_info], ): if svc.bento is not None: @@ -124,6 +134,7 @@ def _track_serve_init( serve_id=serve_info.serve_id, serve_from_bento=True, production=production, + grpc=grpc, bento_creation_timestamp=bento.info.creation_time, num_of_models=len(bento.info.models), num_of_runners=len(svc.runners), @@ -138,6 +149,7 @@ def _track_serve_init( serve_id=serve_info.serve_id, serve_from_bento=False, production=production, + grpc=grpc, bento_creation_timestamp=None, num_of_models=len( set( @@ -160,42 +172,76 @@ def _track_serve_init( EXCLUDE_PATHS = {"/docs.json", "/livez", "/healthz", "/readyz"} -def get_metrics_report( - metrics_client: PrometheusClient, -) -> list[dict[str, str | float]]: - metrics_text = metrics_client.generate_latest().decode("utf-8") - if not metrics_text: - return [] +def filter_metrics( + samples: list[Sample], *filters: t.Callable[[list[Sample]], list[Sample]] +): + return [ + {**sample.labels, "value": sample.value} + for sample in compose(*filters)(samples) + ] - from prometheus_client.parser import ( - text_string_to_metric_families, # type: ignore (unfinished prometheus types) - ) - for metric in text_string_to_metric_families(metrics_text): - # Searching for the metric BENTOML_{service_name}_request of type Counter - if ( - metric.type == "counter" - and metric.name.startswith("BENTOML_") - and metric.name.endswith("_request") - ): - return [ - {**sample.labels, "value": sample.value} - for sample in metric.samples - if "endpoint" in sample.labels - # exclude common infra paths - and sample.labels["endpoint"] not in EXCLUDE_PATHS - # exclude static_content prefix - and not sample.labels["endpoint"].startswith("/static_content/") +@inject +def get_metrics_report( + metrics_client: PrometheusClient, + grpc: bool = False, +) -> tuple[list[dict[str, str | float]], bool | None]: + """ + Get Prometheus metrics reports from the metrics client. This will be used to determine tracking events. + If the return metrics are legacy metrics, the metrics will have prefix BENTOML_, otherwise they will have prefix bentoml_ + + Args: + metrics_client: Instance of bentoml._internal.server.metrics.prometheus.PrometheusClient + grpc: Whether the metrics are for gRPC server. + + Returns: + A tuple of a list of metrics and an optional boolean to determine whether the return metrics are legacy metrics. + """ + for metric in metrics_client.text_string_to_metric_families(): + metric_type = t.cast("str", metric.type) # type: ignore (we need to cast due to no prometheus types) + metric_name = t.cast("str", metric.name) # type: ignore (we need to cast due to no prometheus types) + metric_samples = t.cast("list[Sample]", metric.samples) # type: ignore (we need to cast due to no prometheus types) + if metric_type != "counter": + continue + # We only care about the counter metrics. + assert metric_type == "counter" + if grpc: + _filters: list[t.Callable[[list[Sample]], list[Sample]]] = [ + lambda samples: [s for s in samples if "api_name" in s.labels] + ] + else: + _filters = [ + lambda samples: [ + s + for s in samples + if not s.labels["endpoint"].startswith("/static_content/") + ], + lambda samples: [ + s for s in samples if s.labels["endpoint"] not in EXCLUDE_PATHS + ], + lambda samples: [s for s in samples if "endpoint" in s.labels], ] + # If metrics prefix is BENTOML_, this is legacy metrics + if metric_name.endswith("_request"): + if metric_name.startswith("BENTOML_"): + # This is the case where we have legacy metrics with + # newer metrics. We will ignore the newer metrics. + return filter_metrics(metric_samples, *_filters), True + else: + # This is the case where we only have newer metrics + assert metric_name.startswith("bentoml_") + return filter_metrics(metric_samples, *_filters), False - return [] + return [], None @inject @contextlib.contextmanager def track_serve( svc: Service, - production: bool, + *, + production: bool = False, + grpc: bool = False, component: str = "standalone", metrics_client: PrometheusClient = Provide[BentoMLContainer.metrics_client], serve_info: ServeInfo = Provide[BentoMLContainer.serve_info], @@ -204,7 +250,7 @@ def track_serve( yield return - _track_serve_init(svc, production) + _track_serve_init(svc=svc, production=production, grpc=grpc) if _usage_event_debugging(): tracking_interval = 5 @@ -217,14 +263,20 @@ def track_serve( def loop() -> t.NoReturn: # type: ignore last_tracked_timestamp: datetime = serve_info.serve_started_timestamp while not stop_event.wait(tracking_interval): # pragma: no cover + metrics_type = "not_available" + metrics, use_legacy_metrics = get_metrics_report(metrics_client, grpc=grpc) + if use_legacy_metrics is not None: + metrics_type = "legacy" if use_legacy_metrics else "current" now = datetime.now(timezone.utc) event_properties = ServeUpdateEvent( serve_id=serve_info.serve_id, production=production, + grpc=grpc, component=component, triggered_at=now, duration_in_seconds=int((now - last_tracked_timestamp).total_seconds()), - metrics=get_metrics_report(metrics_client), + metrics_type=metrics_type, + metrics=metrics, ) last_tracked_timestamp = now track(event_properties) diff --git a/bentoml/serve.py b/bentoml/serve.py index fe347415772..18842d7582d 100644 --- a/bentoml/serve.py +++ b/bentoml/serve.py @@ -265,7 +265,7 @@ def serve_http_development( loglevel="WARNING", ) - with track_serve(svc, production=False): + with track_serve(svc): arbiter.start( cb=lambda _: logger.info( # type: ignore 'Starting development %s BentoServer from "%s" running on http://%s:%d (Press CTRL+C to quit)', @@ -624,7 +624,7 @@ def serve_grpc_development( loglevel="ERROR", ) - with track_serve(svc, production=False): + with track_serve(svc, grpc=True): arbiter.start( cb=lambda _: logger.info( # type: ignore 'Starting development %s BentoServer from "%s" running on http://%s:%d (Press CTRL+C to quit)', @@ -846,7 +846,7 @@ def serve_grpc_production( watchers=watchers, sockets=list(circus_socket_map.values()) ) - with track_serve(svc, production=True): + with track_serve(svc, production=True, grpc=True): try: arbiter.start( cb=lambda _: logger.info( # type: ignore diff --git a/bentoml/start.py b/bentoml/start.py index 1541f61e89d..905935f689d 100644 --- a/bentoml/start.py +++ b/bentoml/start.py @@ -347,7 +347,7 @@ def start_grpc_server( arbiter = create_standalone_arbiter( watchers=watchers, sockets=list(circus_socket_map.values()) ) - with track_serve(svc, production=True, component=API_SERVER): + with track_serve(svc, production=True, component=API_SERVER, grpc=True): arbiter.start( cb=lambda _: logger.info( # type: ignore 'Starting bare %s BentoServer from "%s" running on http://%s:%d (Press CTRL+C to quit)', diff --git a/tests/unit/_internal/utils/test_analytics.py b/tests/unit/_internal/utils/test_analytics.py index 5c170f44d64..dad6baa8b21 100644 --- a/tests/unit/_internal/utils/test_analytics.py +++ b/tests/unit/_internal/utils/test_analytics.py @@ -212,7 +212,7 @@ def test_track_serve_do_not_track(mock_do_not_track: MagicMock, noop_service: Se @patch("bentoml._internal.utils.analytics.usage_stats.do_not_track") @patch("bentoml._internal.server.metrics.prometheus.PrometheusClient") -def test_get_metrics_report( +def test_legacy_get_metrics_report( mock_prometheus_client: MagicMock, mock_do_not_track: MagicMock, noop_service: Service, @@ -233,7 +233,7 @@ def test_get_metrics_report( # HELP BENTOML_noop_request_duration_seconds Multiprocess metric # TYPE BENTOML_noop_request_duration_seconds histogram """ - output = analytics.usage_stats.get_metrics_report(mock_prometheus_client) + output, _ = analytics.usage_stats.get_metrics_report(mock_prometheus_client) assert { "endpoint": "/classify", "http_response_code": "200",