diff --git a/bentoml/_internal/server/metrics/prometheus.py b/bentoml/_internal/server/metrics/prometheus.py index ce2230e066..88bd9eff64 100644 --- a/bentoml/_internal/server/metrics/prometheus.py +++ b/bentoml/_internal/server/metrics/prometheus.py @@ -34,10 +34,10 @@ def __init__( assert multiproc_dir is not None, "multiproc_dir must be provided" self.multiproc = multiproc - self.multiproc_dir: t.Optional[str] = multiproc_dir + self.multiproc_dir: str | None = multiproc_dir self._registry = None self._imported = False - self._pid: t.Optional[int] = None + self._pid: int | None = None @property def prometheus_client(self): @@ -56,6 +56,7 @@ def prometheus_client(self): # step 2: import prometheus_client + import prometheus_client.parser import prometheus_client.multiprocess self._imported = True @@ -106,9 +107,7 @@ def generate_latest(self): return self.prometheus_client.generate_latest() def text_string_to_metric_families(self) -> t.Generator[Metric, None, None]: - from prometheus_client.parser import text_string_to_metric_families - - yield from text_string_to_metric_families( + yield from self.prometheus_client.parser.text_string_to_metric_families( self.generate_latest().decode("utf-8") ) diff --git a/bentoml/_internal/utils/__init__.py b/bentoml/_internal/utils/__init__.py index df5e4acfa4..30b923880b 100644 --- a/bentoml/_internal/utils/__init__.py +++ b/bentoml/_internal/utils/__init__.py @@ -14,6 +14,7 @@ from typing import overload from typing import TYPE_CHECKING from pathlib import Path +from reprlib import recursive_repr as _recursive_repr from datetime import date from datetime import time from datetime import datetime @@ -31,16 +32,18 @@ from .cattr import bentoml_cattr from ..types import LazyType -from ..types import PathType -from ..types import MetadataDict -from ..types import MetadataType from .lazy_loader import LazyLoader if TYPE_CHECKING: from fs.base import FS + from typing_extensions import Self + + from ..types import PathType + from ..types import MetadataDict + from ..types import MetadataType P = t.ParamSpec("P") - GenericFunction = t.Callable[P, t.Any] + F = t.Callable[P, t.Any] C = t.TypeVar("C") @@ -60,6 +63,7 @@ "display_path_under_home", "rich_console", "experimental", + "compose", ] _EXPERIMENTAL_APIS: set[str] = set() @@ -389,13 +393,16 @@ class cached_contextmanager: Just like contextlib.contextmanager, but will cache the yield value for the same arguments. When all instances of the same contextmanager exits, the cache value will be dropped. - Example Usage: - (To reuse the container based on the same image) - >>> @cached_contextmanager("{docker_image.id}") - >>> def start_docker_container_from_image(docker_image, timeout=60): - >>> container = ... - >>> yield container - >>> container.stop() + + Example Usage: (To reuse the container based on the same image) + + .. code-block:: python + + @cached_contextmanager("{docker_image.id}") + def start_docker_container_from_image(docker_image, timeout=60): + container = ... + yield container + container.stop() """ def __init__(self, cache_key_template: t.Optional[str] = None): @@ -426,3 +433,60 @@ def _func(*args: "P.args", **kwargs: "P.kwargs") -> t.Any: self._cache.pop(cache_key) return _func + + +class compose: + """ + Function composition: compose(f, g)(...) is equivalent to f(g(...)). + Refers to https://github.com/mentalisttraceur/python-compose for original implementation. + + Args: + *functions: Functions (or other callables) to compose. + + Raises: + TypeError: If no arguments are given, or any argument is not callable. + """ + + def __init__(self: Self, *functions: F[t.Any]): + if not functions: + raise TypeError(f"{self!r} needs at least one argument.") + _functions: list[F[t.Any]] = [] + for function in reversed(functions): + if not callable(function): + raise TypeError(f"{self!r} arguments must be callable.") + if isinstance(function, compose): + _functions.extend(function.functions) + else: + _functions.append(function) + self.__wrapped__ = _functions[0] + self._wrappers = tuple(_functions[1:]) + + def __call__(self, *args: t.Any, **kwargs: t.Any) -> t.Any: + """Call the composed function.""" + result = self.__wrapped__(*args, **kwargs) + for function in self._wrappers: + result = function(result) + return result + + def __get__(self, obj: t.Any, typ_: type | None = None): + """Get the composed function as a bound method.""" + wrapped = self.__wrapped__ + try: + bind = type(wrapped).__get__ + except AttributeError: + return self + bound_wrapped = bind(wrapped, obj, typ_) + if bound_wrapped is wrapped: + return self + bound_self = type(self)(bound_wrapped) + bound_self._wrappers = self._wrappers + return bound_self + + @_recursive_repr("<...>") + def __repr__(self): + return f"{self!r}({','.join(map(repr, reversed(self.functions)))})" + + @property + def functions(self): + """Read-only tuple of the composed callables, in order of execution.""" + return (self.__wrapped__,) + tuple(self._wrappers) diff --git a/bentoml/_internal/utils/analytics/schemas.py b/bentoml/_internal/utils/analytics/schemas.py index a34e814975..bf46ead3b5 100644 --- a/bentoml/_internal/utils/analytics/schemas.py +++ b/bentoml/_internal/utils/analytics/schemas.py @@ -179,6 +179,12 @@ class ModelSaveEvent(EventMeta): model_size_in_kb: float +# serve_kind determines different type of serving scenarios +SERVE_KIND = ["grpc", "http"] +# components are different components to be tracked. +COMPONENT_KIND = ["standalone", "api_server", "runner"] + + @attr.define class ServeInitEvent(EventMeta): serve_id: str @@ -186,6 +192,7 @@ class ServeInitEvent(EventMeta): serve_from_bento: bool bento_creation_timestamp: t.Optional[datetime] + serve_kind: str = attr.field(validator=attr.validators.in_(SERVE_KIND)) num_of_models: int = attr.field(default=0) num_of_runners: int = attr.field(default=0) num_of_apis: int = attr.field(default=0) @@ -199,9 +206,14 @@ class ServeInitEvent(EventMeta): class ServeUpdateEvent(EventMeta): serve_id: str production: bool - component: str triggered_at: datetime duration_in_seconds: int + serve_kind: str = attr.field(validator=attr.validators.in_(SERVE_KIND)) + component: str = attr.field( + validator=attr.validators.and_( + attr.validators.instance_of(str), attr.validators.in_(COMPONENT_KIND) + ) + ) metrics: t.List[t.Any] = attr.field(factory=list) diff --git a/bentoml/_internal/utils/analytics/usage_stats.py b/bentoml/_internal/utils/analytics/usage_stats.py index d9e28fab8a..c49460dc3a 100644 --- a/bentoml/_internal/utils/analytics/usage_stats.py +++ b/bentoml/_internal/utils/analytics/usage_stats.py @@ -17,11 +17,13 @@ from simple_di import inject from simple_di import Provide +from ...utils import compose from .schemas import EventMeta from .schemas import ServeInitEvent from .schemas import TrackingPayload from .schemas import CommonProperties from .schemas import ServeUpdateEvent +from ...configuration import get_debug_mode from ...configuration.containers import BentoMLContainer if TYPE_CHECKING: @@ -29,6 +31,8 @@ T = t.TypeVar("T") AsyncFunc = t.Callable[P, t.Coroutine[t.Any, t.Any, t.Any]] + from prometheus_client.samples import Sample + from bentoml import Service from ...server.metrics.prometheus import PrometheusClient @@ -62,7 +66,12 @@ def wrapper(*args: P.args, **kwargs: P.kwargs) -> t.Any: return func(*args, **kwargs) except Exception as err: # pylint: disable=broad-except if _usage_event_debugging(): - logger.info(f"Tracking Error: {err}") + if get_debug_mode(): + logger.error( + f"Tracking Error: {err}", stack_info=True, stacklevel=3 + ) + else: + logger.info(f"Tracking Error: {err}") else: logger.debug(f"Tracking Error: {err}") @@ -116,6 +125,7 @@ def track(event_properties: EventMeta): def _track_serve_init( svc: Service, production: bool, + serve_kind: str, serve_info: ServeInfo = Provide[BentoMLContainer.serve_info], ): if svc.bento is not None: @@ -124,6 +134,7 @@ def _track_serve_init( serve_id=serve_info.serve_id, serve_from_bento=True, production=production, + serve_kind=serve_kind, bento_creation_timestamp=bento.info.creation_time, num_of_models=len(bento.info.models), num_of_runners=len(svc.runners), @@ -138,6 +149,7 @@ def _track_serve_init( serve_id=serve_info.serve_id, serve_from_bento=False, production=production, + serve_kind=serve_kind, bento_creation_timestamp=None, num_of_models=len( set( @@ -160,33 +172,61 @@ def _track_serve_init( EXCLUDE_PATHS = {"/docs.json", "/livez", "/healthz", "/readyz"} +def filter_metrics( + samples: list[Sample], *filters: t.Callable[[list[Sample]], list[Sample]] +): + return [ + {**sample.labels, "value": sample.value} + for sample in compose(*filters)(samples) + ] + + def get_metrics_report( metrics_client: PrometheusClient, + serve_kind: str, ) -> list[dict[str, str | float]]: - metrics_text = metrics_client.generate_latest().decode("utf-8") - if not metrics_text: - return [] - - from prometheus_client.parser import ( - text_string_to_metric_families, # type: ignore (unfinished prometheus types) - ) - - for metric in text_string_to_metric_families(metrics_text): - # Searching for the metric BENTOML_{service_name}_request of type Counter - if ( - metric.type == "counter" - and metric.name.startswith("BENTOML_") - and metric.name.endswith("_request") - ): - return [ - {**sample.labels, "value": sample.value} - for sample in metric.samples - if "endpoint" in sample.labels - # exclude common infra paths - and sample.labels["endpoint"] not in EXCLUDE_PATHS - # exclude static_content prefix - and not sample.labels["endpoint"].startswith("/static_content/") + """ + Get Prometheus metrics reports from the metrics client. This will be used to determine tracking events. + If the return metrics are legacy metrics, the metrics will have prefix BENTOML_, otherwise they will have prefix bentoml_ + + Args: + metrics_client: Instance of bentoml._internal.server.metrics.prometheus.PrometheusClient + grpc: Whether the metrics are for gRPC server. + + Returns: + A tuple of a list of metrics and an optional boolean to determine whether the return metrics are legacy metrics. + """ + for metric in metrics_client.text_string_to_metric_families(): + metric_type = t.cast("str", metric.type) # type: ignore (we need to cast due to no prometheus types) + metric_name = t.cast("str", metric.name) # type: ignore (we need to cast due to no prometheus types) + metric_samples = t.cast("list[Sample]", metric.samples) # type: ignore (we need to cast due to no prometheus types) + if metric_type != "counter": + continue + # We only care about the counter metrics. + assert metric_type == "counter" + if serve_kind == "grpc": + _filters: list[t.Callable[[list[Sample]], list[Sample]]] = [ + lambda samples: [s for s in samples if "api_name" in s.labels] + ] + elif serve_kind == "http": + _filters = [ + lambda samples: [ + s + for s in samples + if not s.labels["endpoint"].startswith("/static_content/") + ], + lambda samples: [ + s for s in samples if s.labels["endpoint"] not in EXCLUDE_PATHS + ], + lambda samples: [s for s in samples if "endpoint" in s.labels], ] + else: + raise NotImplementedError("Unknown serve kind %s" % serve_kind) + # If metrics prefix is BENTOML_, this is legacy metrics + if metric_name.endswith("_request") and ( + metric_name.startswith("bentoml_") or metric_name.startswith("BENTOML_") + ): + return filter_metrics(metric_samples, *_filters) return [] @@ -195,7 +235,9 @@ def get_metrics_report( @contextlib.contextmanager def track_serve( svc: Service, - production: bool, + *, + production: bool = False, + serve_kind: str = "http", component: str = "standalone", metrics_client: PrometheusClient = Provide[BentoMLContainer.metrics_client], serve_info: ServeInfo = Provide[BentoMLContainer.serve_info], @@ -204,12 +246,12 @@ def track_serve( yield return - _track_serve_init(svc, production) + _track_serve_init(svc=svc, production=production, serve_kind=serve_kind) if _usage_event_debugging(): tracking_interval = 5 else: - tracking_interval = SERVE_USAGE_TRACKING_INTERVAL_SECONDS + tracking_interval = SERVE_USAGE_TRACKING_INTERVAL_SECONDS # pragma: no cover stop_event = threading.Event() @@ -221,10 +263,13 @@ def loop() -> t.NoReturn: # type: ignore event_properties = ServeUpdateEvent( serve_id=serve_info.serve_id, production=production, + # Note that we are currently only have two tracking jobs: http and grpc + serve_kind=serve_kind, + # Current accept components are "standalone", "api_server" and "runner" component=component, triggered_at=now, duration_in_seconds=int((now - last_tracked_timestamp).total_seconds()), - metrics=get_metrics_report(metrics_client), + metrics=get_metrics_report(metrics_client, serve_kind=serve_kind), ) last_tracked_timestamp = now track(event_properties) diff --git a/bentoml/serve.py b/bentoml/serve.py index 092e02facc..ad6b620bc7 100644 --- a/bentoml/serve.py +++ b/bentoml/serve.py @@ -267,7 +267,7 @@ def serve_http_development( loglevel="WARNING", ) - with track_serve(svc, production=False): + with track_serve(svc): arbiter.start( cb=lambda _: logger.info( # type: ignore 'Starting development %s BentoServer from "%s" running on http://%s:%d (Press CTRL+C to quit)', @@ -630,7 +630,7 @@ def serve_grpc_development( loglevel="ERROR", ) - with track_serve(svc, production=False): + with track_serve(svc, serve_kind="grpc"): arbiter.start( cb=lambda _: logger.info( # type: ignore 'Starting development %s BentoServer from "%s" listening on %s:%d (Press CTRL+C to quit)', @@ -854,7 +854,7 @@ def serve_grpc_production( watchers=watchers, sockets=list(circus_socket_map.values()) ) - with track_serve(svc, production=True): + with track_serve(svc, production=True, serve_kind="grpc"): try: arbiter.start( cb=lambda _: logger.info( # type: ignore diff --git a/bentoml/start.py b/bentoml/start.py index 1541f61e89..03d083b7c8 100644 --- a/bentoml/start.py +++ b/bentoml/start.py @@ -347,7 +347,7 @@ def start_grpc_server( arbiter = create_standalone_arbiter( watchers=watchers, sockets=list(circus_socket_map.values()) ) - with track_serve(svc, production=True, component=API_SERVER): + with track_serve(svc, production=True, component=API_SERVER, serve_kind="grpc"): arbiter.start( cb=lambda _: logger.info( # type: ignore 'Starting bare %s BentoServer from "%s" running on http://%s:%d (Press CTRL+C to quit)', diff --git a/pyproject.toml b/pyproject.toml index c0f5ff9177..f72035845c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -186,6 +186,7 @@ exclude = ''' | dist | typings | bentoml/grpc/v1alpha1 + | grpc-client/thirdparty )/ | bentoml/_version.py ) @@ -193,7 +194,7 @@ exclude = ''' extend-exclude = "(_pb2.py$|_pb2_grpc.py$)" [tool.pytest.ini_options] -addopts = "-rfEX -p pytester -p no:warnings -x --capture=tee-sys" +addopts = "-rfEX -p pytester -p no:warnings -x --capture=tee-sys --cov-report=term-missing --cov-append" python_files = ["test_*.py", "*_test.py"] testpaths = ["tests"] markers = ["gpus", "disable-tf-eager-execution"] @@ -342,6 +343,9 @@ skip_glob = [ "**/*_pb2_grpc.py", "venv/*", "lib/*", + 'bentoml/grpc/v1alpha1/', + "grpc-client/thirdparty", + "bazel-*", ] [tool.pyright] @@ -354,6 +358,8 @@ exclude = [ 'bentoml/grpc/v1alpha1/', '**/*_pb2.py', "**/*_pb2_grpc.py", + "grpc-client/thirdparty", + "bazel-*", ] useLibraryCodeForTypes = true strictListInference = true diff --git a/tests/unit/_internal/utils/test_analytics.py b/tests/unit/_internal/utils/test_analytics.py index 5c170f44d6..6e3de4e122 100644 --- a/tests/unit/_internal/utils/test_analytics.py +++ b/tests/unit/_internal/utils/test_analytics.py @@ -1,5 +1,7 @@ +# pylint: disable=unused-argument from __future__ import annotations +import typing as t import logging from typing import TYPE_CHECKING from unittest.mock import Mock @@ -9,6 +11,9 @@ from schema import Or from schema import And from schema import Schema +from prometheus_client.parser import ( + text_string_to_metric_families, # type: ignore (no prometheus types) +) import bentoml from bentoml._internal.utils import analytics @@ -18,6 +23,7 @@ from _pytest.logging import LogCaptureFixture from _pytest.monkeypatch import MonkeyPatch + from prometheus_client.metrics_core import Metric from bentoml import Service @@ -142,6 +148,7 @@ def test_track_serve_init( noop_service, production=production, serve_info=analytics.usage_stats.get_serve_info(), + serve_kind="http", ) assert mock_do_not_track.called @@ -153,6 +160,7 @@ def test_track_serve_init( noop_service, production=production, serve_info=analytics.usage_stats.get_serve_info(), + serve_kind="http", ) assert "model_types" in caplog.text @@ -176,24 +184,37 @@ def test_track_serve_init_no_bento( bentoml.Service("test"), production=False, serve_info=analytics.usage_stats.get_serve_info(), + serve_kind="http", ) assert "model_types" not in caplog.text @patch("bentoml._internal.server.metrics.prometheus.PrometheusClient") @pytest.mark.parametrize( - "mock_output", + "mock_output,expected", [ - b"", - b"""# HELP BENTOML_noop_request_total Multiprocess metric""", + (b"", []), + ( + b"""# HELP BENTOML_noop_request_total Multiprocess metric""", + [], + ), ], ) -def test_get_metrics_report_filtered( - mock_prometheus_client: MagicMock, mock_output: bytes +@pytest.mark.parametrize("serve_kind", ["grpc", "http"]) +def test_filter_metrics_report( + mock_prometheus_client: MagicMock, + mock_output: bytes, + expected: tuple[list[t.Any], bool | None], + serve_kind: str, ): mock_prometheus_client.multiproc.return_value = False mock_prometheus_client.generate_latest.return_value = mock_output - assert analytics.usage_stats.get_metrics_report(mock_prometheus_client) == [] + assert ( + analytics.usage_stats.get_metrics_report( + mock_prometheus_client, serve_kind=serve_kind + ) + == expected + ) @patch("bentoml._internal.utils.analytics.usage_stats.do_not_track") @@ -212,33 +233,33 @@ def test_track_serve_do_not_track(mock_do_not_track: MagicMock, noop_service: Se @patch("bentoml._internal.utils.analytics.usage_stats.do_not_track") @patch("bentoml._internal.server.metrics.prometheus.PrometheusClient") -def test_get_metrics_report( +def test_legacy_get_metrics_report( mock_prometheus_client: MagicMock, mock_do_not_track: MagicMock, noop_service: Service, ): mock_do_not_track.return_value = True mock_prometheus_client.multiproc.return_value = False - mock_prometheus_client.generate_latest.return_value = b"""\ -# HELP BENTOML_noop_request_total Multiprocess metric -# TYPE BENTOML_noop_request_total counter -BENTOML_noop_request_total{endpoint="/docs.json",http_response_code="200",service_version=""} 2.0 -BENTOML_noop_request_total{endpoint="/classify",http_response_code="200",service_version=""} 9.0 -BENTOML_noop_request_total{endpoint="/",http_response_code="200",service_version=""} 1.0 -# HELP BENTOML_noop_request_in_progress Multiprocess metric -# TYPE BENTOML_noop_request_in_progress gauge -BENTOML_noop_request_in_progress{endpoint="/",service_version=""} 0.0 -BENTOML_noop_request_in_progress{endpoint="/docs.json",service_version=""} 0.0 -BENTOML_noop_request_in_progress{endpoint="/classify",service_version=""} 0.0 -# HELP BENTOML_noop_request_duration_seconds Multiprocess metric -# TYPE BENTOML_noop_request_duration_seconds histogram - """ - output = analytics.usage_stats.get_metrics_report(mock_prometheus_client) + mock_prometheus_client.text_string_to_metric_families.return_value = text_string_to_metric_families( + b"""\ +# HELP BENTOML_noop_service_request_in_progress Multiprocess metric +# TYPE BENTOML_noop_service_request_in_progress gauge +BENTOML_noop_service_request_in_progress{endpoint="/predict",service_version="not available"} 0.0 +# HELP BENTOML_noop_service_request_total Multiprocess metric +# TYPE BENTOML_noop_service_request_total counter +BENTOML_noop_service_request_total{endpoint="/predict",http_response_code="200",service_version="not available"} 8.0 +""".decode( + "utf-8" + ) + ) + output = analytics.usage_stats.get_metrics_report( + mock_prometheus_client, serve_kind="http" + ) assert { - "endpoint": "/classify", + "endpoint": "/predict", "http_response_code": "200", - "service_version": "", - "value": 9.0, + "service_version": "not available", + "value": 8.0, } in output endpoints = [filtered["endpoint"] for filtered in output] @@ -246,6 +267,58 @@ def test_get_metrics_report( assert not any(x in endpoints for x in analytics.usage_stats.EXCLUDE_PATHS) +@patch("bentoml._internal.server.metrics.prometheus.PrometheusClient") +@pytest.mark.parametrize( + "serve_kind,expected", + [ + ( + "grpc", + { + "api_name": "pred_json", + "http_response_code": "200", + "service_name": "noop_service", + "service_version": "not available", + "value": 15.0, + }, + ), + ("http", None), + ], +) +@pytest.mark.parametrize( + "generated_metrics", + [ + text_string_to_metric_families( + b"""\ + # HELP bentoml_api_server_request_total Multiprocess metric + # TYPE bentoml_api_server_request_total counter + bentoml_api_server_request_total{api_name="pred_json",http_response_code="200",service_name="noop_service",service_version="not available"} 15.0 + # HELP bentoml_api_server_request_in_progress Multiprocess metric + # TYPE bentoml_api_server_request_in_progress gauge + bentoml_api_server_request_in_progress{api_name="pred_json",service_name="noop_service",service_version="not available"} 0.0 + """.decode( + "utf-8" + ) + ) + ], +) +def test_get_metrics_report( + mock_prometheus_client: MagicMock, + noop_service: Service, + serve_kind: str, + expected: dict[str, str | float] | None, + generated_metrics: t.Generator[Metric, None, None], +): + mock_prometheus_client.multiproc.return_value = False + mock_prometheus_client.text_string_to_metric_families.return_value = ( + generated_metrics + ) + output = analytics.usage_stats.get_metrics_report( + mock_prometheus_client, serve_kind=serve_kind + ) + if expected: + assert expected in output + + @patch("bentoml._internal.utils.analytics.usage_stats.do_not_track") @patch("bentoml._internal.utils.analytics.usage_stats.requests.post") @patch("bentoml._internal.utils.analytics.usage_stats._track_serve_init")