From 532f5e1999f43a8581b72e98f7b199ab6857a1a4 Mon Sep 17 00:00:00 2001
From: bojiang <5886138+bojiang@users.noreply.github.com>
Date: Fri, 16 Sep 2022 09:26:39 +0800
Subject: [PATCH] feat(server): add runner metrics; refactoring batch size
 metrics (#2977)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

## What does this PR address?

Fixes https://github.com/bentoml/Kitchen/issues/40

## Before submitting:

- [ ] Does the Pull Request follow the [Conventional Commits specification](https://www.conventionalcommits.org/en/v1.0.0/#summary) naming? Here is [GitHub's guide](https://docs.github.com/en/pull-requests/collaborating-with-pull-requests/proposing-changes-to-your-work-with-pull-requests/creating-a-pull-request) on how to create a pull request.
- [ ] Does the code follow BentoML's code style, and have both the `make format` and `make lint` scripts passed ([instructions](https://github.com/bentoml/BentoML/blob/main/DEVELOPMENT.md#style-check-auto-formatting-type-checking))?
- [ ] Did you read through the [contribution guidelines](https://github.com/bentoml/BentoML/blob/main/CONTRIBUTING.md#ways-to-contribute) and follow the [development guidelines](https://github.com/bentoml/BentoML/blob/main/DEVELOPMENT.md#start-developing)?
- [ ] Did your changes require updates to the documentation? Have you updated those accordingly? Here are the [documentation guidelines](https://github.com/bentoml/BentoML/tree/main/docs) and [tips on writing docs](https://github.com/bentoml/BentoML/tree/main/docs#writing-documentation).
- [ ] Did you write tests to cover your changes?

## Who can help review?

Feel free to tag members/contributors who can help review your PR.

Co-authored-by: Sean Sheng
---
 bentoml/_internal/configuration/containers.py |  25 +-
 .../configuration/default_configuration.yaml  |   5 +-
 bentoml/_internal/context.py                  |   4 +-
 bentoml/_internal/marshal/dispatcher.py       |  49 ++--
 bentoml/_internal/server/instruments.py       | 240 +++++++++++++++---
 .../_internal/server/metrics/prometheus.py    |   6 -
 bentoml/_internal/server/runner_app.py        |  71 +++++-
 bentoml/_internal/server/service_app.py       |   9 +-
 bentoml/_internal/utils/alg.py                |  19 +-
 bentoml/_internal/utils/metrics.py            |   2 +-
 bentoml/serve.py                              |   4 +
 bentoml_cli/worker/http_api_server.py         |   2 +-
 bentoml_cli/worker/http_dev_api_server.py     |   2 +-
 bentoml_cli/worker/runner.py                  |  17 +-
 pyproject.toml                                |   2 +-
 15 files changed, 343 insertions(+), 114 deletions(-)

diff --git a/bentoml/_internal/configuration/containers.py b/bentoml/_internal/configuration/containers.py
index 47bf2631d5..35da7fdfe8 100644
--- a/bentoml/_internal/configuration/containers.py
+++ b/bentoml/_internal/configuration/containers.py
@@ -78,6 +78,8 @@ def _is_ip_address(addr: str) -> bool:
         return False
 
 
+RUNNER_CFG_KEYS = ["batching", "resources", "logging", "metrics", "timeout"]
+
 RUNNER_CFG_SCHEMA = {
     Optional("batching"): {
         Optional("enabled"): bool,
@@ -97,6 +99,10 @@ def _is_ip_address(addr: str) -> bool:
             Optional("response_content_type"): Or(bool, None),
         },
     },
+    Optional("metrics"): {
+        "enabled": bool,
+        "namespace": str,
+    },
     Optional("timeout"): And(int, _larger_than_zero),
 }
 
@@ -221,12 +227,9 @@ def __init__(
                 override_config = yaml.safe_load(f)
             config_merger.merge(self.config, override_config)
 
-        global_runner_cfg = {
-            k: self.config["runners"][k]
-            for k in ("batching", "resources", "logging", "timeout")
-        }
+        global_runner_cfg = {k: self.config["runners"][k] for k in RUNNER_CFG_KEYS}
         for key in self.config["runners"]:
-            if key not in ["batching", "resources", "logging", "timeout"]:
+            if key not in RUNNER_CFG_KEYS:
                 runner_cfg = self.config["runners"][key]
 
                 # key is a runner name
@@ -385,14 +388,10 @@ def access_control_options(
     @staticmethod
     def metrics_client(
         multiproc_dir: str = Provide[prometheus_multiproc_dir],
-        namespace: str = Provide[config.api_server.metrics.namespace],
     ) -> "PrometheusClient":
         from ..server.metrics.prometheus import PrometheusClient
 
-        return PrometheusClient(
-            multiproc_dir=multiproc_dir,
-            namespace=namespace,
-        )
+        return PrometheusClient(multiproc_dir=multiproc_dir)
 
     @providers.SingletonFactory
     @staticmethod
@@ -509,8 +508,10 @@ def tracing_excluded_urls(
 
         if isinstance(excluded_urls, list):
             return ExcludeList(excluded_urls)
-        else:
+        elif isinstance(excluded_urls, str):
             return parse_excluded_urls(excluded_urls)
+        else:
+            return ExcludeList([])
 
     # Mapping from runner name to RunnerApp file descriptor
     remote_runner_mapping = providers.Static[t.Dict[str, str]]({})
@@ -519,7 +520,7 @@ def tracing_excluded_urls(
     @providers.SingletonFactory
     @staticmethod
     def duration_buckets(
-        metrics: dict[str, t.Any] = Provide[config.api_server.metrics],
+        metrics: dict[str, t.Any] = Provide[config.api_server.metrics]
     ) -> tuple[float, ...]:
         """
         Returns a tuple of duration buckets in seconds. If not explicitly configured,
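The new `metrics` keys are validated together with the rest of the runner configuration. Below is a minimal sketch of what the added sub-schema accepts, assuming the `schema` package that the `Optional`, `And`, and `Or` constructors above come from; `RUNNER_METRICS_SCHEMA` is a hypothetical name for illustration only:

```python
from schema import Optional, Schema

# Hypothetical standalone version of the sub-schema added above.
RUNNER_METRICS_SCHEMA = Schema(
    {
        Optional("metrics"): {
            "enabled": bool,
            "namespace": str,
        },
    }
)

# e.g. the `runners` section of a user-supplied bentoml_configuration.yaml
RUNNER_METRICS_SCHEMA.validate({"metrics": {"enabled": True, "namespace": "bentoml_runner"}})
RUNNER_METRICS_SCHEMA.validate({})  # the whole block is optional
```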
"timeout"]: + if key not in RUNNER_CFG_KEYS: runner_cfg = self.config["runners"][key] # key is a runner name @@ -385,14 +388,10 @@ def access_control_options( @staticmethod def metrics_client( multiproc_dir: str = Provide[prometheus_multiproc_dir], - namespace: str = Provide[config.api_server.metrics.namespace], ) -> "PrometheusClient": from ..server.metrics.prometheus import PrometheusClient - return PrometheusClient( - multiproc_dir=multiproc_dir, - namespace=namespace, - ) + return PrometheusClient(multiproc_dir=multiproc_dir) @providers.SingletonFactory @staticmethod @@ -509,8 +508,10 @@ def tracing_excluded_urls( if isinstance(excluded_urls, list): return ExcludeList(excluded_urls) - else: + elif isinstance(excluded_urls, str): return parse_excluded_urls(excluded_urls) + else: + return ExcludeList([]) # Mapping from runner name to RunnerApp file descriptor remote_runner_mapping = providers.Static[t.Dict[str, str]]({}) @@ -519,7 +520,7 @@ def tracing_excluded_urls( @providers.SingletonFactory @staticmethod def duration_buckets( - metrics: dict[str, t.Any] = Provide[config.api_server.metrics], + metrics: dict[str, t.Any] = Provide[config.api_server.metrics] ) -> tuple[float, ...]: """ Returns a tuple of duration buckets in seconds. If not explicitly configured, diff --git a/bentoml/_internal/configuration/default_configuration.yaml b/bentoml/_internal/configuration/default_configuration.yaml index 8daa523629..4d7cba78f4 100644 --- a/bentoml/_internal/configuration/default_configuration.yaml +++ b/bentoml/_internal/configuration/default_configuration.yaml @@ -7,7 +7,7 @@ api_server: max_request_size: 20971520 metrics: enabled: True - namespace: BENTOML + namespace: bentoml_api_server logging: access: enabled: True @@ -37,6 +37,9 @@ runners: request_content_type: True response_content_length: True response_content_type: True + metrics: + enabled: True + namespace: bentoml_runner timeout: 300 tracing: diff --git a/bentoml/_internal/context.py b/bentoml/_internal/context.py index fa114f6719..6ca741a990 100644 --- a/bentoml/_internal/context.py +++ b/bentoml/_internal/context.py @@ -188,8 +188,8 @@ def request_id(self, request_id: t.Optional[int]) -> None: class _ComponentContext: - bento_name: str | None = None - bento_version: str | None = None + bento_name: str = "" + bento_version: str = "not available" component_type: str | None = None component_name: str | None = None component_index: int | None = None diff --git a/bentoml/_internal/marshal/dispatcher.py b/bentoml/_internal/marshal/dispatcher.py index 9b47dfb1f7..25e8ba5e70 100644 --- a/bentoml/_internal/marshal/dispatcher.py +++ b/bentoml/_internal/marshal/dispatcher.py @@ -7,26 +7,17 @@ import functools import traceback import collections -from typing import TYPE_CHECKING import numpy as np -from simple_di import inject -from simple_di import Provide from ..utils import cached_property from ..utils.alg import TokenBucket -from ..utils.metrics import metric_name -from ..utils.metrics import exponential_buckets -from ..configuration.containers import BentoMLContainer - -if TYPE_CHECKING: - from ..server.metrics.prometheus import PrometheusClient logger = logging.getLogger(__name__) class NonBlockSema: - def __init__(self, count): + def __init__(self, count: int): self.sema = count def acquire(self): @@ -56,7 +47,7 @@ def __init__(self): assume the outbound duration follows duration = o_a * n + o_b (all in seconds) """ - self.o_stat = collections.deque( + self.o_stat: collections.deque[tuple[int, float, float]] = collections.deque( 
diff --git a/bentoml/_internal/marshal/dispatcher.py b/bentoml/_internal/marshal/dispatcher.py
index 9b47dfb1f7..25e8ba5e70 100644
--- a/bentoml/_internal/marshal/dispatcher.py
+++ b/bentoml/_internal/marshal/dispatcher.py
@@ -7,26 +7,17 @@
 import functools
 import traceback
 import collections
-from typing import TYPE_CHECKING
 
 import numpy as np
-from simple_di import inject
-from simple_di import Provide
 
 from ..utils import cached_property
 from ..utils.alg import TokenBucket
-from ..utils.metrics import metric_name
-from ..utils.metrics import exponential_buckets
-from ..configuration.containers import BentoMLContainer
-
-if TYPE_CHECKING:
-    from ..server.metrics.prometheus import PrometheusClient
 
 logger = logging.getLogger(__name__)
 
 
 class NonBlockSema:
-    def __init__(self, count):
+    def __init__(self, count: int):
         self.sema = count
 
     def acquire(self):
@@ -56,7 +47,7 @@ def __init__(self):
         assume the outbound duration follows duration = o_a * n + o_b
         (all in seconds)
         """
-        self.o_stat = collections.deque(
+        self.o_stat: collections.deque[tuple[int, float, float]] = collections.deque(
             maxlen=self.N_KEPT_SAMPLE
         )  # to store outbound stat data
         self.o_a = 2
@@ -67,7 +58,7 @@ def __init__(self):
         self._refresh_tb = TokenBucket(2)  # to limit params refresh interval
         self._outbound_counter = 0
 
-    def log_outbound(self, n, wait, duration):
+    def log_outbound(self, n: int, wait: float, duration: float):
         if (
             self._outbound_counter <= self.N_SKIPPED_SAMPLE
         ):  # skip inaccurate info at beginning
@@ -82,7 +73,9 @@ def log_outbound(self, n, wait, duration):
     def trigger_refresh(self):
         x = tuple((i, 1) for i, _, _ in self.o_stat)
         y = tuple(i for _, i, _ in self.o_stat)
-        _o_a, _o_b = np.linalg.lstsq(x, y, rcond=None)[0]
+
+        _factors: tuple[float, float] = np.linalg.lstsq(x, y, rcond=None)[0]  # type: ignore
+        _o_a, _o_b = _factors
         _o_w = sum(w for _, _, w in self.o_stat) * 1.0 / len(self.o_stat)
 
         self.o_a, self.o_b = max(0.000001, _o_a), max(0, _o_b)
@@ -107,17 +100,12 @@ class CorkDispatcher:
     The wrapped function should be an async function.
     """
 
-    @inject
     def __init__(
         self,
-        runner_name: str,
-        worker_index: int,
-        method_name: str,
         max_latency_in_ms: int,
         max_batch_size: int,
         shared_sema: t.Optional[NonBlockSema] = None,
         fallback: t.Optional[t.Callable[[], t.Any]] = None,
-        metrics_client: PrometheusClient = Provide[BentoMLContainer.metrics_client],
     ):
         """
         params:
@@ -129,25 +117,17 @@ def __init__(
             * all possible exceptions the decorated function has
         """
         self.max_latency_in_ms = max_latency_in_ms / 1000.0
-        self.callback = None
         self.fallback = fallback
         self.optimizer = Optimizer()
         self.max_batch_size = int(max_batch_size)
         self.tick_interval = 0.001
 
         self._controller = None
-        self._queue = collections.deque()  # TODO(hrmthw): maxlen
+        self._queue: collections.deque[
+            tuple[float, t.Any, asyncio.Future[t.Any]]
+        ] = collections.deque()  # TODO(bojiang): maxlen
         self._sema = shared_sema if shared_sema else NonBlockSema(1)
 
-        self.adaptive_batch_size_hist = metrics_client.Histogram(
-            name=metric_name(
-                runner_name, worker_index, method_name, "adaptive_batch_size"
-            ),
-            documentation=runner_name + " Runner adaptive batch size",
-            labelnames=[],  # TODO: add service version
-            buckets=exponential_buckets(1, 2, max_batch_size),
-        )
-
     def shutdown(self):
         if self._controller is not None:
             self._controller.cancel()
@@ -169,13 +149,13 @@ def _wake_event(self):
     def __call__(
         self,
         callback: t.Callable[
-            [t.Iterable[T_IN]], t.Coroutine[None, None, t.Iterable[T_OUT]]
+            [t.Collection[T_IN]], t.Coroutine[None, None, t.Collection[T_OUT]]
         ],
     ) -> t.Callable[[T_IN], t.Coroutine[None, None, T_OUT]]:
         self.callback = callback
 
         @functools.wraps(callback)
-        async def _func(data):
+        async def _func(data: t.Any) -> t.Any:
             if self._controller is None:
                 self._controller = self._loop.create_task(self.controller())
             try:
@@ -232,7 +212,7 @@ async def controller(self):
         except Exception:  # pylint: disable=broad-except
             logger.error(traceback.format_exc())
 
-    async def inbound_call(self, data):
+    async def inbound_call(self, data: t.Any):
         now = time.time()
         future = self._loop.create_future()
         input_info = (now, data, future)
@@ -241,12 +221,13 @@ async def inbound_call(self, data):
             self._wake_event.notify_all()
         return await future
 
-    async def outbound_call(self, inputs_info):
+    async def outbound_call(
+        self, inputs_info: tuple[tuple[float, t.Any, asyncio.Future[t.Any]]]
+    ):
         _time_start = time.time()
         _done = False
         batch_size = len(inputs_info)
         logger.debug("Dynamic batching cork released, batch size: %d", batch_size)
-        self.adaptive_batch_size_hist.observe(batch_size)
         try:
             outputs = await self.callback(tuple(d for _, d, _ in inputs_info))
             assert len(outputs) == len(inputs_info)
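For context, `Optimizer.trigger_refresh` above fits the linear cost model `duration = o_a * n + o_b` by least squares over the recorded `(batch_size, duration)` samples. A standalone sketch of that fit, mirroring the `np.linalg.lstsq` call above with made-up sample values:

```python
import numpy as np

# (batch_size, duration_seconds) samples, as collected in Optimizer.o_stat.
samples = [(1, 0.012), (2, 0.019), (4, 0.035), (8, 0.066)]

# Design matrix [[n, 1], ...] and target vector, matching trigger_refresh above.
x = np.array([(n, 1) for n, _ in samples])
y = np.array([d for _, d in samples])

o_a, o_b = np.linalg.lstsq(x, y, rcond=None)[0]
print(f"duration ~= {o_a:.4f} * n + {o_b:.4f}")  # per-item cost and fixed overhead
```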
diff --git a/bentoml/_internal/server/instruments.py b/bentoml/_internal/server/instruments.py
index 3d1ff9142e..06e8df2359 100644
--- a/bentoml/_internal/server/instruments.py
+++ b/bentoml/_internal/server/instruments.py
@@ -13,23 +13,24 @@
 if TYPE_CHECKING:
     from .. import external_typing as ext
-    from ..service import Service
     from ..server.metrics.prometheus import PrometheusClient
 
 logger = logging.getLogger(__name__)
 
 START_TIME_VAR: "contextvars.ContextVar[float]" = contextvars.ContextVar(
     "START_TIME_VAR"
 )
+STATUS_VAR: "contextvars.ContextVar[int]" = contextvars.ContextVar("STATUS_VAR")
+from ..context import component_context
 
 
-class MetricsMiddleware:
+class HTTPTrafficMetricsMiddleware:
     def __init__(
         self,
         app: "ext.ASGIApp",
-        bento_service: "Service",
+        namespace: str = "bentoml_api_server",
     ):
         self.app = app
-        self.bento_service = bento_service
+        self.namespace = namespace
         self._is_setup = False
 
     @inject
     def _setup(
         self,
         metrics_client: "PrometheusClient" = Provide[BentoMLContainer.metrics_client],
         duration_buckets: tuple[float, ...] = Provide[
             BentoMLContainer.duration_buckets
         ],
     ):
         self.metrics_client = metrics_client
-        service_name = self.bento_service.name
+
+        DEFAULT_NAMESPACE = "bentoml_api_server"
+        if self.namespace == DEFAULT_NAMESPACE:
+            legacy_namespace = "BENTOML"
+        else:
+            legacy_namespace = self.namespace
+
+        # legacy metrics names for bentoml<1.0.6
+        self.legacy_metrics_request_duration = metrics_client.Histogram(
+            namespace=legacy_namespace,
+            name=metric_name(component_context.bento_name, "request_duration_seconds"),
+            documentation="Legacy API HTTP request duration in seconds",
+            labelnames=["endpoint", "service_version", "http_response_code"],
+            buckets=duration_buckets,
+        )
+        self.legacy_metrics_request_total = metrics_client.Counter(
+            namespace=legacy_namespace,
+            name=metric_name(component_context.bento_name, "request_total"),
+            documentation="Legacy total number of HTTP requests",
+            labelnames=["endpoint", "service_version", "http_response_code"],
+        )
+        self.legacy_metrics_request_in_progress = metrics_client.Gauge(
+            namespace=legacy_namespace,
+            name=metric_name(component_context.bento_name, "request_in_progress"),
+            documentation="Legacy total number of HTTP requests in progress now",
+            labelnames=["endpoint", "service_version"],
+            multiprocess_mode="livesum",
+        )
         self.metrics_request_duration = metrics_client.Histogram(
-            name=metric_name(service_name, "request_duration_seconds"),
+            namespace=self.namespace,
+            name="request_duration_seconds",
             documentation="API HTTP request duration in seconds",
-            labelnames=["endpoint", "service_version", "http_response_code"],
+            labelnames=[
+                "endpoint",
+                "service_name",
+                "service_version",
+                "http_response_code",
+            ],
             buckets=duration_buckets,
         )
         self.metrics_request_total = metrics_client.Counter(
-            name=metric_name(service_name, "request_total"),
+            namespace=self.namespace,
+            name="request_total",
             documentation="Total number of HTTP requests",
-            labelnames=["endpoint", "service_version", "http_response_code"],
+            labelnames=[
+                "endpoint",
+                "service_name",
+                "service_version",
+                "http_response_code",
+            ],
         )
         self.metrics_request_in_progress = metrics_client.Gauge(
-            name=metric_name(service_name, "request_in_progress"),
+            namespace=self.namespace,
+            name="request_in_progress",
             documentation="Total number of HTTP requests in progress now",
-            labelnames=["endpoint", "service_version"],
+            labelnames=["endpoint", "service_name", "service_version"],
             multiprocess_mode="livesum",
         )
         self._is_setup = True
@@ -85,36 +126,171 @@ async def __call__(
             await response(scope, receive, send)
             return
 
-        service_version = (
-            self.bento_service.tag.version if self.bento_service.tag is not None else ""
-        )
+        endpoint = scope["path"]
+        START_TIME_VAR.set(default_timer())
+
+        async def wrapped_send(message: "ext.ASGIMessage") -> None:
+            if message["type"] == "http.response.start":
+                STATUS_VAR.set(message["status"])
+            elif message["type"] == "http.response.body":
+                if ("more_body" not in message) or not message["more_body"]:
+                    assert START_TIME_VAR.get() != 0
+                    assert STATUS_VAR.get() != 0
+
+                    # instrument request total count
+                    self.legacy_metrics_request_total.labels(
+                        endpoint=endpoint,
+                        service_version=component_context.bento_version,
+                        http_response_code=STATUS_VAR.get(),
+                    ).inc()
+                    self.metrics_request_total.labels(
+                        endpoint=endpoint,
+                        service_name=component_context.bento_name,
+                        service_version=component_context.bento_version,
+                        http_response_code=STATUS_VAR.get(),
+                    ).inc()
+
+                    # instrument request duration
+                    total_time = max(default_timer() - START_TIME_VAR.get(), 0)
+                    self.legacy_metrics_request_duration.labels(  # type: ignore
+                        endpoint=endpoint,
+                        service_version=component_context.bento_version,
+                        http_response_code=STATUS_VAR.get(),
+                    ).observe(total_time)
+                    self.metrics_request_duration.labels(  # type: ignore
+                        endpoint=endpoint,
+                        service_name=component_context.bento_name,
+                        service_version=component_context.bento_version,
+                        http_response_code=STATUS_VAR.get(),
+                    ).observe(total_time)
+
+                    START_TIME_VAR.set(0)
+                    STATUS_VAR.set(0)
+            await send(message)
+
+        with self.legacy_metrics_request_in_progress.labels(
+            endpoint=endpoint, service_version=component_context.bento_version
+        ).track_inprogress(), self.metrics_request_in_progress.labels(
+            endpoint=endpoint,
+            service_name=component_context.bento_name,
+            service_version=component_context.bento_version,
+        ).track_inprogress():
+            await self.app(scope, receive, wrapped_send)
+            return
+
+
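Both middlewares in this file follow the same ASGI pattern: wrap `send`, capture the status code from the `http.response.start` message, and record observations once the last `http.response.body` chunk passes through. A self-contained sketch of that pattern (not BentoML's code; a `print` stands in for the metric observations):

```python
import time
from typing import Any, Awaitable, Callable

ASGIMessage = dict[str, Any]
Send = Callable[[ASGIMessage], Awaitable[None]]

class TimingMiddleware:
    def __init__(self, app: Callable[..., Awaitable[None]]):
        self.app = app

    async def __call__(self, scope: dict[str, Any], receive: Any, send: Send) -> None:
        if scope["type"] != "http":
            await self.app(scope, receive, send)
            return
        start = time.perf_counter()
        status = 0

        async def wrapped_send(message: ASGIMessage) -> None:
            nonlocal status
            if message["type"] == "http.response.start":
                status = message["status"]
            elif message["type"] == "http.response.body" and not message.get("more_body"):
                # Last body chunk: the response is complete, record the observation.
                print(scope["path"], status, time.perf_counter() - start)
            await send(message)

        await self.app(scope, receive, wrapped_send)
```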
+class RunnerTrafficMetricsMiddleware:
+    def __init__(
+        self,
+        app: "ext.ASGIApp",
+        namespace: str = "bentoml_runner",
+    ):
+        self.app = app
+        self.namespace = namespace
+        self._is_setup = False
+
+    @inject
+    def _setup(
+        self,
+        metrics_client: "PrometheusClient" = Provide[BentoMLContainer.metrics_client],
+    ):
+        self.metrics_client = metrics_client
+
+        self.metrics_request_duration = metrics_client.Histogram(
+            namespace=self.namespace,
+            name="request_duration_seconds",
+            documentation="runner RPC duration in seconds",
+            labelnames=[
+                "endpoint",
+                "service_name",
+                "service_version",
+                "http_response_code",
+                "runner_name",
+            ],
+        )
+        self.metrics_request_total = metrics_client.Counter(
+            namespace=self.namespace,
+            name="request_total",
+            documentation="Total number of runner RPC",
+            labelnames=[
+                "endpoint",
+                "service_name",
+                "service_version",
+                "http_response_code",
+                "runner_name",
+            ],
+        )
+        self.metrics_request_in_progress = metrics_client.Gauge(
+            namespace=self.namespace,
+            name="request_in_progress",
+            documentation="Total number of runner RPC in progress now",
+            labelnames=["endpoint", "service_name", "service_version", "runner_name"],
+            multiprocess_mode="livesum",
+        )
+        self._is_setup = True
+
+    async def __call__(
+        self,
+        scope: "ext.ASGIScope",
+        receive: "ext.ASGIReceive",
+        send: "ext.ASGISend",
+    ) -> None:
+        if not self._is_setup:
+            self._setup()
+        if not scope["type"].startswith("http"):
+            await self.app(scope, receive, send)
+            return
+
+        if scope["path"] == "/metrics":
+            from starlette.responses import Response
+
+            response = Response(
+                self.metrics_client.generate_latest(),
+                status_code=200,
+                media_type=self.metrics_client.CONTENT_TYPE_LATEST,
+            )
+            await response(scope, receive, send)
+            return
+
         endpoint = scope["path"]
         START_TIME_VAR.set(default_timer())
 
         async def wrapped_send(message: "ext.ASGIMessage") -> None:
             if message["type"] == "http.response.start":
-                status_code = message["status"]
-
-                # instrument request total count
-                self.metrics_request_total.labels(
-                    endpoint=endpoint,
-                    service_version=service_version,
-                    http_response_code=status_code,
-                ).inc()
-
-                # instrument request duration
-                assert START_TIME_VAR.get() != 0
-                total_time = max(default_timer() - START_TIME_VAR.get(), 0)
-                self.metrics_request_duration.labels(  # type: ignore
-                    endpoint=endpoint,
-                    service_version=service_version,
-                    http_response_code=status_code,
-                ).observe(total_time)
-                START_TIME_VAR.set(0)
+                STATUS_VAR.set(message["status"])
+            elif message["type"] == "http.response.body":
+                if ("more_body" not in message) or not message["more_body"]:
+                    assert START_TIME_VAR.get() != 0
+                    assert STATUS_VAR.get() != 0
+
+                    # instrument request total count
+                    self.metrics_request_total.labels(
+                        endpoint=endpoint,
+                        service_name=component_context.bento_name,
+                        service_version=component_context.bento_version,
+                        http_response_code=STATUS_VAR.get(),
+                        runner_name=component_context.component_name,
+                    ).inc()
+
+                    # instrument request duration
+                    total_time = max(default_timer() - START_TIME_VAR.get(), 0)
+                    self.metrics_request_duration.labels(  # type: ignore
+                        endpoint=endpoint,
+                        service_name=component_context.bento_name,
+                        service_version=component_context.bento_version,
+                        http_response_code=STATUS_VAR.get(),
+                        runner_name=component_context.component_name,
+                    ).observe(total_time)
+
+                    START_TIME_VAR.set(0)
+                    STATUS_VAR.set(0)
             await send(message)
 
         with self.metrics_request_in_progress.labels(
-            endpoint=endpoint, service_version=service_version
+            endpoint=endpoint,
+            service_name=component_context.bento_name,
+            service_version=component_context.bento_version,
+            runner_name=component_context.component_name,
         ).track_inprogress():
             await self.app(scope, receive, wrapped_send)
             return
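Since the runner app now answers `/metrics` itself (block above), each runner worker's metrics can be scraped directly. A hedged sketch, assuming a runner worker listening on an illustrative local port (the actual address is assigned by `bentoml serve`):

```python
import urllib.request

# Illustrative address; substitute the runner worker's real host and port.
with urllib.request.urlopen("http://127.0.0.1:8001/metrics") as resp:
    body = resp.read().decode()

# Expect families such as bentoml_runner_request_duration_seconds_bucket{...}
for line in body.splitlines():
    if line.startswith("bentoml_runner_"):
        print(line)
```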
diff --git a/bentoml/_internal/server/metrics/prometheus.py b/bentoml/_internal/server/metrics/prometheus.py
index 748fc8693e..f18a5084af 100644
--- a/bentoml/_internal/server/metrics/prometheus.py
+++ b/bentoml/_internal/server/metrics/prometheus.py
@@ -12,7 +12,6 @@ class PrometheusClient:
     def __init__(
         self,
         *,
-        namespace: str = "",
         multiproc: bool = True,
         multiproc_dir: t.Optional[str] = None,
     ):
@@ -28,7 +27,6 @@ def __init__(
             assert multiproc_dir is not None, "multiproc_dir must be provided"
 
         self.multiproc = multiproc
-        self.namespace = namespace
         self.multiproc_dir: t.Optional[str] = multiproc_dir
         self._registry = None
         self._imported = False
@@ -105,7 +103,6 @@ def CONTENT_TYPE_LATEST(self) -> str:
     def Histogram(self):
         return partial(
             self.prometheus_client.Histogram,
-            namespace=self.namespace,
             registry=self.registry,
         )
 
@@ -113,7 +110,6 @@ def Histogram(self):
     def Counter(self):
         return partial(
             self.prometheus_client.Counter,
-            namespace=self.namespace,
             registry=self.registry,
         )
 
@@ -121,7 +117,6 @@ def Counter(self):
     def Summary(self):
         return partial(
             self.prometheus_client.Summary,
-            namespace=self.namespace,
             registry=self.registry,
         )
 
@@ -129,6 +124,5 @@ def Summary(self):
     def Gauge(self):
         return partial(
             self.prometheus_client.Gauge,
-            namespace=self.namespace,
             registry=self.registry,
         )
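With `namespace` dropped from `PrometheusClient`, each metric now supplies its own namespace (as the middlewares above do), and `prometheus_client` joins namespace and name into the full metric name. A standalone illustration of that naming rule:

```python
from prometheus_client import CollectorRegistry, Counter, generate_latest

registry = CollectorRegistry()
c = Counter(
    name="request_total",
    documentation="Total number of runner RPC",
    namespace="bentoml_runner",  # prefixed onto the name: bentoml_runner_request_total
    labelnames=["runner_name"],
    registry=registry,
)
c.labels(runner_name="iris_clf").inc()
assert b"bentoml_runner_request_total" in generate_latest(registry)
```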
diff --git a/bentoml/_internal/server/runner_app.py b/bentoml/_internal/server/runner_app.py
index 0bf79bef77..f463590ba6 100644
--- a/bentoml/_internal/server/runner_app.py
+++ b/bentoml/_internal/server/runner_app.py
@@ -8,10 +8,16 @@
 from typing import TYPE_CHECKING
 from functools import partial
 
+from simple_di import inject
+from simple_di import Provide
+
 from ..context import trace_context
+from ..context import component_context
 from ..runner.utils import Params
 from ..runner.utils import PAYLOAD_META_HEADER
 from ..runner.utils import payload_paramss_to_batch_params
+from ..utils.metrics import metric_name
+from ..utils.metrics import exponential_buckets
 from ..server.base_app import BaseAppFactory
 from ..runner.container import AutoContainer
 from ..marshal.dispatcher import CorkDispatcher
@@ -32,13 +38,16 @@
 
 
 class RunnerAppFactory(BaseAppFactory):
+    @inject
     def __init__(
         self,
         runner: Runner,
         worker_index: int = 0,
+        enable_metrics: bool = Provide[BentoMLContainer.runners_config.metrics.enabled],
     ) -> None:
         self.runner = runner
         self.worker_index = worker_index
+        self.enable_metrics = enable_metrics
 
         from starlette.responses import Response
 
@@ -49,9 +58,6 @@ def __init__(
             if not method.config.batchable:
                 continue
             self.dispatchers[method.name] = CorkDispatcher(
-                runner_name=runner.name,
-                worker_index=worker_index,
-                method_name=method.name,
                 max_latency_in_ms=method.max_latency_ms,
                 max_batch_size=method.max_batch_size,
                 fallback=TooManyRequests,
@@ -61,10 +67,48 @@ def __init__(
     def name(self) -> str:
         return self.runner.name
 
+    def _init_metrics_wrappers(self):
+        metrics_client = BentoMLContainer.metrics_client.get()
+
+        self.legacy_adaptive_batch_size_hist_map = {
+            method.name: metrics_client.Histogram(
+                name=metric_name(
+                    self.runner.name,
+                    self.worker_index,
+                    method.name,
+                    "adaptive_batch_size",
+                ),
+                documentation="Legacy runner adaptive batch size",
+                labelnames=[],
+                buckets=exponential_buckets(1, 2, method.max_batch_size),
+            )
+            for method in self.runner.runner_methods
+        }
+
+        max_max_batch_size = max(
+            method.max_batch_size for method in self.runner.runner_methods
+        )
+
+        self.adaptive_batch_size_hist = metrics_client.Histogram(
+            namespace="bentoml_runner",
+            name="adaptive_batch_size",
+            documentation="Runner adaptive batch size",
+            labelnames=[
+                "runner_name",
+                "worker_index",
+                "method_name",
+                "service_version",
+                "service_name",
+            ],
+            buckets=exponential_buckets(1, 2, max_max_batch_size),
+        )
+
     @property
     def on_startup(self) -> t.List[t.Callable[[], None]]:
         on_startup = super().on_startup
         on_startup.insert(0, functools.partial(self.runner.init_local, quiet=True))
+        on_startup.insert(0, self._init_metrics_wrappers)
+
         return on_startup
 
     @property
@@ -138,6 +182,11 @@ def client_request_hook(span: Span, _scope: t.Dict[str, t.Any]) -> None:
             )
         )
 
+        if self.enable_metrics:
+            from .instruments import RunnerTrafficMetricsMiddleware
+
+            middlewares.append(Middleware(RunnerTrafficMetricsMiddleware))
+
         access_log_config = BentoMLContainer.runners_config.logging.access
         if access_log_config.enabled.get():
             from .access import AccessLogMiddleware
@@ -159,11 +208,23 @@ def client_request_hook(span: Span, _scope: t.Dict[str, t.Any]) -> None:
     def _async_cork_run(
         self,
         runner_method: RunnerMethod[t.Any, t.Any, t.Any],
-    ) -> t.Callable[[t.Iterable[Request]], t.Coroutine[None, None, list[Response]]]:
+    ) -> t.Callable[[t.Collection[Request]], t.Coroutine[None, None, list[Response]]]:
         from starlette.responses import Response
 
-        async def _run(requests: t.Iterable[Request]) -> list[Response]:
+        async def _run(requests: t.Collection[Request]) -> list[Response]:
             assert self._is_ready
+
+            self.legacy_adaptive_batch_size_hist_map[runner_method.name].observe(
+                len(requests)
+            )
+            self.adaptive_batch_size_hist.labels(
+                runner_name=self.runner.name,
+                worker_index=self.worker_index,
+                method_name=runner_method.name,
+                service_version=component_context.bento_version,
+                service_name=component_context.bento_name,
+            ).observe(len(requests))
+
             if not requests:
                 return []
             params_list: list[Params[t.Any]] = []
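`exponential_buckets(1, 2, max_batch_size)` in `_init_metrics_wrappers` above builds the histogram's bucket ladder for observed batch sizes. A hypothetical re-implementation to show the shape of that ladder (the real helper lives in `bentoml/_internal/utils/metrics.py` and additionally caps the count at `MAX_BUCKET_COUNT`; `exp_buckets` here is illustrative only):

```python
import math

def exp_buckets(start: float, factor: float, end: float) -> tuple[float, ...]:
    # Hypothetical sketch: powers of `factor` from `start` up to `end`,
    # plus +Inf, mirroring what a Prometheus histogram expects.
    buckets: list[float] = []
    b = start
    while b < end:
        buckets.append(b)
        b *= factor
    return tuple(buckets) + (end, math.inf)

print(exp_buckets(1, 2, 100))
# (1, 2, 4, 8, 16, 32, 64, 100, inf) for a method with max_batch_size=100
```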
diff --git a/bentoml/_internal/server/service_app.py b/bentoml/_internal/server/service_app.py
index 070fe01b04..e20737a0ac 100644
--- a/bentoml/_internal/server/service_app.py
+++ b/bentoml/_internal/server/service_app.py
@@ -217,14 +217,9 @@ def middlewares(self) -> list[Middleware]:
 
         # metrics middleware
         if self.enable_metrics:
-            from .instruments import MetricsMiddleware
+            from .instruments import HTTPTrafficMetricsMiddleware
 
-            middlewares.append(
-                Middleware(
-                    MetricsMiddleware,
-                    bento_service=self.bento_service,
-                )
-            )
+            middlewares.append(Middleware(HTTPTrafficMetricsMiddleware))
 
         # otel middleware
         import opentelemetry.instrumentation.asgi as otel_asgi  # type: ignore
diff --git a/bentoml/_internal/utils/alg.py b/bentoml/_internal/utils/alg.py
index 85f53379e8..ce54c5309b 100644
--- a/bentoml/_internal/utils/alg.py
+++ b/bentoml/_internal/utils/alg.py
@@ -1,18 +1,23 @@
+from __future__ import annotations
+
 import time
+import typing as t
+
+T = t.TypeVar("T")
 
 
-class FixedBucket:
+class FixedBucket(t.Generic[T]):
     """
     Fixed size FIFO container.
     """
 
-    def __init__(self, size):
-        self._data = [None] * size
+    def __init__(self, size: int):
+        self._data: list[T | None] = [None] * size
         self._cur = 0
         self._size = size
         self._flag_full = False
 
-    def put(self, v):
+    def put(self, v: T):
         self._data[self._cur] = v
         self._cur += 1
         if self._cur == self._size:
@@ -30,7 +35,7 @@ def __len__(self):
             return self._cur
         return self._size
 
-    def __getitem__(self, sl):
+    def __getitem__(self, sl: slice):
         if not self._flag_full:
             return self._data[: self._cur][sl]
         return (self._data[self._cur :] + self._data[: self._cur])[sl]
@@ -41,11 +46,11 @@ class TokenBucket:
     Dynamic token bucket
     """
 
-    def __init__(self, init_amount=0):
+    def __init__(self, init_amount: int = 0):
         self._amount = init_amount
         self._last_consume_time = time.time()
 
-    def consume(self, take_amount, avg_rate, burst_size):
+    def consume(self, take_amount: int, avg_rate: float, burst_size: int):
         now = time.time()
         inc = (now - self._last_consume_time) * avg_rate
         current_amount = min(inc + self._amount, burst_size)
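`TokenBucket.consume(take_amount, avg_rate, burst_size)` refills at `avg_rate` tokens per second, caps the balance at `burst_size`, and deducts `take_amount` when enough tokens have accrued; the dispatcher uses an instance of it to rate-limit `Optimizer` refreshes. A small usage sketch with made-up numbers, assuming `consume` returns whether the tokens were taken (the tail of the method is not shown in the hunk above):

```python
from bentoml._internal.utils.alg import TokenBucket

# Allow roughly one refresh every 5 seconds, with an initial burst of 2.
tb = TokenBucket(init_amount=2)

def maybe_refresh() -> bool:
    # True (and one token deducted) only if the bucket holds at least one token.
    return tb.consume(1, avg_rate=1 / 5, burst_size=2)

assert maybe_refresh()      # burst token 1
assert maybe_refresh()      # burst token 2
assert not maybe_refresh()  # bucket drained; wait ~5s for the next token
```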
For example, a valid tag name may include invalid characters, so we need to escape them
diff --git a/bentoml/serve.py b/bentoml/serve.py
index 6c32bf896d..0d2541cf8f 100644
--- a/bentoml/serve.py
+++ b/bentoml/serve.py
@@ -262,6 +262,8 @@ def serve_production(
                     "$(CIRCUS.WID)",
                     "--worker-env-map",
                     json.dumps(runner.scheduled_worker_env_map),
+                    "--prometheus-dir",
+                    prometheus_dir,
                 ],
                 copy_env=True,
                 stop_children=True,
@@ -305,6 +307,8 @@ def serve_production(
                     "$(circus.wid)",
                     "--worker-env-map",
                     json.dumps(runner.scheduled_worker_env_map),
+                    "--prometheus-dir",
+                    prometheus_dir,
                 ],
                 copy_env=True,
                 stop_children=True,
diff --git a/bentoml_cli/worker/http_api_server.py b/bentoml_cli/worker/http_api_server.py
index 21114ad266..9041ed132d 100644
--- a/bentoml_cli/worker/http_api_server.py
+++ b/bentoml_cli/worker/http_api_server.py
@@ -124,7 +124,7 @@ def main(
     # setup context
     component_context.component_name = svc.name
     if svc.tag is None:
-        component_context.bento_name = f"*{svc.__class__.__name__}"
+        component_context.bento_name = f"{svc.__class__.__name__}"
         component_context.bento_version = "not available"
     else:
         component_context.bento_name = svc.tag.name
diff --git a/bentoml_cli/worker/http_dev_api_server.py b/bentoml_cli/worker/http_dev_api_server.py
index 8e68d83f22..f9f21d466f 100644
--- a/bentoml_cli/worker/http_dev_api_server.py
+++ b/bentoml_cli/worker/http_dev_api_server.py
@@ -94,7 +94,7 @@ def main(
     # setup context
     component_context.component_name = svc.name
     if svc.tag is None:
-        component_context.bento_name = f"*{svc.__class__.__name__}"
+        component_context.bento_name = f"{svc.__class__.__name__}"
         component_context.bento_version = "not available"
     else:
         component_context.bento_name = svc.tag.name
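`serve.py` above now threads `prometheus_dir` to every runner worker so that all processes write their metric shards to one directory, and the runner CLI below wires the flag into `BentoMLContainer.prometheus_multiproc_dir`. A minimal sketch of the `prometheus_client` multi-process pattern this enables (the path is illustrative; older `prometheus_client` releases spell the variable `prometheus_multiproc_dir`):

```python
import os

# All worker processes must agree on this directory before metrics are created;
# `bentoml serve` passes it down via the --prometheus-dir flag added below.
os.environ["PROMETHEUS_MULTIPROC_DIR"] = "/tmp/prometheus_multiproc"  # illustrative

from prometheus_client import CollectorRegistry, generate_latest, multiprocess

# At scrape time, aggregate the per-process shard files into a fresh registry.
registry = CollectorRegistry()
multiprocess.MultiProcessCollector(registry)
print(generate_latest(registry).decode())
```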
diff --git a/bentoml_cli/worker/runner.py b/bentoml_cli/worker/runner.py
index 3583891be0..628df6269e 100644
--- a/bentoml_cli/worker/runner.py
+++ b/bentoml_cli/worker/runner.py
@@ -39,6 +39,11 @@
     default=None,
     help="The environment variables to pass to the worker process. The format is a JSON string, e.g. '{0: {\"CUDA_VISIBLE_DEVICES\": 0}}'.",
 )
+@click.option(
+    "--prometheus-dir",
+    type=click.Path(exists=True),
+    help="Required by prometheus to pass the metrics in multi-process mode",
+)
 def main(
     bento_identifier: str,
     runner_name: str,
@@ -47,6 +52,7 @@ def main(
     no_access_log: bool,
     worker_id: int,
     worker_env_map: str | None,
+    prometheus_dir: str | None,
 ) -> None:
     """
     Start a runner server.
@@ -89,9 +95,12 @@ def main(
     configure_server_logging()
     import uvicorn  # type: ignore
 
-    if no_access_log:
-        from bentoml._internal.configuration.containers import BentoMLContainer
+    from bentoml._internal.configuration.containers import BentoMLContainer
 
+    if prometheus_dir is not None:
+        BentoMLContainer.prometheus_multiproc_dir.set(prometheus_dir)
+
+    if no_access_log:
         access_log_config = BentoMLContainer.runners_config.logging.access
         access_log_config.enabled.set(False)
 
@@ -101,11 +110,11 @@ def main(
 
     # setup context
     if service.tag is None:
-        component_context.bento_name = f"*{service.__class__}"
+        component_context.bento_name = f"{service.__class__}"
         component_context.bento_version = "not available"
     else:
         component_context.bento_name = service.tag.name
-        component_context.bento_version = service.tag.version
+        component_context.bento_version = service.tag.version or ""
 
     for runner in service.runners:
         if runner.name == runner_name:
diff --git a/pyproject.toml b/pyproject.toml
index 5dc40e06c8..73cf80c53d 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -48,7 +48,7 @@ suggestion-mode="yes"
 max-line-length=88
 
 [tool.pylint.messages_control]
-disable="import-error,print-statement,parameter-unpacking,unpacking-in-except,old-raise-syntax,backtick,import-star-module-level,raw-checker-failed,bad-inline-option,locally-disabled,file-ignored,suppressed-message,useless-suppression,deprecated-pragma,apply-builtin,basestring-builtin,buffer-builtin,cmp-builtin,coerce-builtin,execfile-builtin,file-builtin,long-builtin,raw_input-builtin,reduce-builtin,standarderror-builtin,unicode-builtin,xrange-builtin,coerce-method,delslice-method,getslice-method,setslice-method,no-absolute-import,old-division,dict-iter-method,dict-view-method,next-method-called,metaclass-assignment,indexing-exception,raising-string,reload-builtin,oct-method,hex-method,nonzero-method,cmp-method,input-builtin,round-builtin,intern-builtin,unichr-builtin,map-builtin-not-iterating,zip-builtin-not-iterating,range-builtin-not-iterating,filter-builtin-not-iterating,using-cmp-argument,div-method,idiv-method,rdiv-method,exception-message-attribute,invalid-str-codec,sys-max-int,bad-python3-import,deprecated-string-function,deprecated-str-translate-call,deprecated-itertools-function,deprecated-types-field,next-method-defined,dict-items-not-iterating,dict-keys-not-iterating,dict-values-not-iterating,deprecated-operator-function,deprecated-urllib-function,xreadlines-attribute,deprecated-sys-function,exception-escape,comprehension-escape,logging-fstring-interpolation,logging-format-interpolation,logging-not-lazy,C,R,fixme,protected-access,no-member,unsubscriptable-object,raise-missing-from,isinstance-second-argument-not-valid-type"
+disable="import-error,print-statement,parameter-unpacking,unpacking-in-except,old-raise-syntax,backtick,import-star-module-level,raw-checker-failed,bad-inline-option,locally-disabled,file-ignored,suppressed-message,useless-suppression,deprecated-pragma,apply-builtin,basestring-builtin,buffer-builtin,cmp-builtin,coerce-builtin,execfile-builtin,file-builtin,long-builtin,raw_input-builtin,reduce-builtin,standarderror-builtin,unicode-builtin,xrange-builtin,coerce-method,delslice-method,getslice-method,setslice-method,no-absolute-import,old-division,dict-iter-method,dict-view-method,next-method-called,metaclass-assignment,indexing-exception,raising-string,reload-builtin,oct-method,hex-method,nonzero-method,cmp-method,input-builtin,round-builtin,intern-builtin,unichr-builtin,map-builtin-not-iterating,zip-builtin-not-iterating,range-builtin-not-iterating,filter-builtin-not-iterating,using-cmp-argument,div-method,idiv-method,rdiv-method,exception-message-attribute,invalid-str-codec,sys-max-int,bad-python3-import,deprecated-string-function,deprecated-str-translate-call,deprecated-itertools-function,deprecated-types-field,next-method-defined,dict-items-not-iterating,dict-keys-not-iterating,dict-values-not-iterating,deprecated-operator-function,deprecated-urllib-function,xreadlines-attribute,deprecated-sys-function,exception-escape,comprehension-escape,logging-fstring-interpolation,logging-format-interpolation,logging-not-lazy,C,R,fixme,protected-access,no-member,unsubscriptable-object,raise-missing-from,isinstance-second-argument-not-valid-type,attribute-defined-outside-init,relative-beyond-top-level" enable="c-extension-no-member" [tool.pylint.reports]