feat(server): add runner metrics; refactoring batch size metrics #2977

Merged
merged 10 commits on Sep 16, 2022
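At a glance, this PR introduces a `metrics` section under the runner configuration (mirroring the existing `api_server.metrics` section) and moves the adaptive batch size histogram out of CorkDispatcher. As a minimal sketch, assuming the standard user override flow that containers.py already implements, the new runner-level options map to an override like this:

import yaml

# A sketch of the new runner metrics options added by this PR, expressed
# as a user config override; the key names follow RUNNER_CFG_SCHEMA below.
override = {
    "runners": {
        "metrics": {"enabled": True, "namespace": "bentoml_runner"},
    }
}
print(yaml.safe_dump(override))  # the equivalent YAML a user would write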
25 changes: 13 additions & 12 deletions bentoml/_internal/configuration/containers.py
@@ -78,6 +78,8 @@ def _is_ip_address(addr: str) -> bool:
return False


RUNNER_CFG_KEYS = ["batching", "resources", "logging", "metrics", "timeout"]

RUNNER_CFG_SCHEMA = {
Optional("batching"): {
Optional("enabled"): bool,
@@ -97,6 +99,10 @@ def _is_ip_address(addr: str) -> bool:
Optional("response_content_type"): Or(bool, None),
},
},
Optional("metrics"): {
"enabled": bool,
"namespace": str,
},
Optional("timeout"): And(int, _larger_than_zero),
}
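The `metrics` block above is validated with the same `schema` library as the rest of RUNNER_CFG_SCHEMA. A self-contained sketch of just that sub-schema (the surrounding BentoML wiring is omitted):

from schema import Optional, Schema

# Validate only the new "metrics" block; Optional/Schema come from the
# `schema` package already used in this module.
metrics_schema = Schema({Optional("metrics"): {"enabled": bool, "namespace": str}})
print(metrics_schema.validate({"metrics": {"enabled": True, "namespace": "bentoml_runner"}}))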

@@ -221,12 +227,9 @@ def __init__(
override_config = yaml.safe_load(f)
config_merger.merge(self.config, override_config)

global_runner_cfg = {
k: self.config["runners"][k]
for k in ("batching", "resources", "logging", "timeout")
}
global_runner_cfg = {k: self.config["runners"][k] for k in RUNNER_CFG_KEYS}
for key in self.config["runners"]:
if key not in ["batching", "resources", "logging", "timeout"]:
if key not in RUNNER_CFG_KEYS:
runner_cfg = self.config["runners"][key]

# key is a runner name
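The loop above treats any key in `runners` that is not one of RUNNER_CFG_KEYS as a runner name whose settings override the global runner defaults. A simplified sketch of that merge rule (the real code uses a recursive config merger; the shallow merge and the runner name here are illustrative only):

RUNNER_CFG_KEYS = ["batching", "resources", "logging", "metrics", "timeout"]

runners_cfg = {
    "timeout": 300,
    "metrics": {"enabled": True, "namespace": "bentoml_runner"},
    "my_runner": {"timeout": 60},  # hypothetical runner name
}
global_cfg = {k: v for k, v in runners_cfg.items() if k in RUNNER_CFG_KEYS}
merged = {**global_cfg, **runners_cfg["my_runner"]}
print(merged["timeout"], merged["metrics"]["namespace"])  # 60 bentoml_runner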
@@ -385,14 +388,10 @@ def access_control_options(
@staticmethod
def metrics_client(
multiproc_dir: str = Provide[prometheus_multiproc_dir],
namespace: str = Provide[config.api_server.metrics.namespace],
) -> "PrometheusClient":
from ..server.metrics.prometheus import PrometheusClient

return PrometheusClient(
multiproc_dir=multiproc_dir,
namespace=namespace,
)
return PrometheusClient(multiproc_dir=multiproc_dir)

@providers.SingletonFactory
@staticmethod
@@ -509,8 +508,10 @@ def tracing_excluded_urls(

if isinstance(excluded_urls, list):
return ExcludeList(excluded_urls)
else:
elif isinstance(excluded_urls, str):
return parse_excluded_urls(excluded_urls)
else:
return ExcludeList([])

# Mapping from runner name to RunnerApp file descriptor
remote_runner_mapping = providers.Static[t.Dict[str, str]]({})
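The new `elif`/`else` above makes the excluded-URLs provider total: a list is wrapped directly, a comma-separated string is parsed, and anything else (e.g. unset) now yields an empty exclude list instead of falling through. A quick sketch with the OpenTelemetry helpers this code uses:

from opentelemetry.util.http import ExcludeList, parse_excluded_urls

excluded = parse_excluded_urls("/healthz,/metrics")
print(excluded.url_disabled("http://host/healthz"))  # True
print(ExcludeList([]).url_disabled("http://host/metrics"))  # False: empty list excludes nothing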
@@ -519,7 +520,7 @@ def tracing_excluded_urls(
@providers.SingletonFactory
@staticmethod
def duration_buckets(
metrics: dict[str, t.Any] = Provide[config.api_server.metrics],
metrics: dict[str, t.Any] = Provide[config.api_server.metrics]
) -> tuple[float, ...]:
"""
Returns a tuple of duration buckets in seconds. If not explicitly configured,
@@ -7,7 +7,7 @@ api_server:
max_request_size: 20971520
metrics:
enabled: True
namespace: BENTOML
namespace: bentoml_api_server
logging:
access:
enabled: True
@@ -37,6 +37,9 @@ runners:
request_content_type: True
response_content_length: True
response_content_type: True
metrics:
enabled: True
namespace: bentoml_runner
timeout: 300

tracing:
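The namespace defaults above (`bentoml_api_server` for the API server, `bentoml_runner` for runners) are Prometheus namespaces, i.e. prefixes baked into each metric's full name. A sketch with prometheus_client directly (assumed to match how BentoML's PrometheusClient wraps it):

from prometheus_client import CollectorRegistry, Histogram, generate_latest

registry = CollectorRegistry()
Histogram(
    "request_duration_seconds", "API request duration",
    namespace="bentoml_api_server", registry=registry,
).observe(0.2)
# Exported series are prefixed: bentoml_api_server_request_duration_seconds_*
print(generate_latest(registry).decode())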
4 changes: 2 additions & 2 deletions bentoml/_internal/context.py
@@ -188,8 +188,8 @@ def request_id(self, request_id: t.Optional[int]) -> None:


class _ComponentContext:
bento_name: str | None = None
bento_version: str | None = None
bento_name: str = ""
bento_version: str = "not available"
component_type: str | None = None
component_name: str | None = None
component_index: int | None = None
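The defaults for `bento_name` and `bento_version` change from `None` to concrete strings, presumably because these fields feed metric labels and log fields rendered as strings (an assumed motivation; a `None` default would otherwise be stringified into the exported series). A sketch:

from prometheus_client import Counter

runs = Counter("runs_total", "Component runs", ["bento_name", "bento_version"])
# String defaults keep label values clean; a None default would end up
# stringified as "None" in the exported series.
runs.labels(bento_name="", bento_version="not available").inc()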
49 changes: 15 additions & 34 deletions bentoml/_internal/marshal/dispatcher.py
@@ -7,26 +7,17 @@
import functools
import traceback
import collections
from typing import TYPE_CHECKING

import numpy as np
from simple_di import inject
from simple_di import Provide

from ..utils import cached_property
from ..utils.alg import TokenBucket
from ..utils.metrics import metric_name
from ..utils.metrics import exponential_buckets
from ..configuration.containers import BentoMLContainer

if TYPE_CHECKING:
from ..server.metrics.prometheus import PrometheusClient

logger = logging.getLogger(__name__)


class NonBlockSema:
def __init__(self, count):
def __init__(self, count: int):
self.sema = count

def acquire(self):
@@ -56,7 +47,7 @@ def __init__(self):
assume the outbound duration follows duration = o_a * n + o_b
(all in seconds)
"""
self.o_stat = collections.deque(
self.o_stat: collections.deque[tuple[int, float, float]] = collections.deque(
maxlen=self.N_KEPT_SAMPLE
) # to store outbound stat data
self.o_a = 2
@@ -67,7 +58,7 @@ def __init__(self):
self._refresh_tb = TokenBucket(2) # to limit params refresh interval
self._outbound_counter = 0

def log_outbound(self, n, wait, duration):
def log_outbound(self, n: int, wait: float, duration: float):
if (
self._outbound_counter <= self.N_SKIPPED_SAMPLE
): # skip inaccurate info at beginning
@@ -82,7 +73,9 @@ def log_outbound(self, n, wait, duration):
def trigger_refresh(self):
x = tuple((i, 1) for i, _, _ in self.o_stat)
y = tuple(i for _, i, _ in self.o_stat)
_o_a, _o_b = np.linalg.lstsq(x, y, rcond=None)[0]

_factors: tuple[float, float] = np.linalg.lstsq(x, y, rcond=None)[0] # type: ignore
_o_a, _o_b = _factors
_o_w = sum(w for _, _, w in self.o_stat) * 1.0 / len(self.o_stat)

self.o_a, self.o_b = max(0.000001, _o_a), max(0, _o_b)
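`trigger_refresh` fits the linear model from the docstring, duration = o_a * n + o_b, by least squares over the recorded (batch size, duration) samples. A worked standalone example of the same fit (the sample numbers are illustrative):

import numpy as np

# (batch_size, duration_seconds) samples, as collected by log_outbound
samples = [(1, 0.012), (2, 0.019), (4, 0.035), (8, 0.066)]
x = np.array([(n, 1.0) for n, _ in samples])  # design matrix [n, 1]
y = np.array([d for _, d in samples])
(o_a, o_b), *_ = np.linalg.lstsq(x, y, rcond=None)
print(round(o_a, 4), round(o_b, 4))  # per-item cost and fixed overhead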
@@ -107,17 +100,12 @@ class CorkDispatcher:
The wrapped function should be an async function.
"""

@inject
def __init__(
self,
runner_name: str,
worker_index: int,
method_name: str,
max_latency_in_ms: int,
max_batch_size: int,
shared_sema: t.Optional[NonBlockSema] = None,
fallback: t.Optional[t.Callable[[], t.Any]] = None,
metrics_client: PrometheusClient = Provide[BentoMLContainer.metrics_client],
):
"""
params:
@@ -129,25 +117,17 @@ def __init__(
* all possible exceptions the decorated function has
"""
self.max_latency_in_ms = max_latency_in_ms / 1000.0
self.callback = None
self.fallback = fallback
self.optimizer = Optimizer()
self.max_batch_size = int(max_batch_size)
self.tick_interval = 0.001

self._controller = None
self._queue = collections.deque() # TODO(hrmthw): maxlen
self._queue: collections.deque[
tuple[float, t.Any, asyncio.Future[t.Any]]
] = collections.deque() # TODO(bojiang): maxlen
self._sema = shared_sema if shared_sema else NonBlockSema(1)

self.adaptive_batch_size_hist = metrics_client.Histogram(
name=metric_name(
runner_name, worker_index, method_name, "adaptive_batch_size"
),
documentation=runner_name + " Runner adaptive batch size",
labelnames=[], # TODO: add service version
buckets=exponential_buckets(1, 2, max_batch_size),
)

def shutdown(self):
if self._controller is not None:
self._controller.cancel()
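With the metrics wiring removed, the constructor takes only the batching parameters. A hypothetical usage sketch of the decorator under the new signature (the doubling callback and the gather call are illustrative, not from this PR):

import asyncio

from bentoml._internal.marshal.dispatcher import CorkDispatcher

dispatcher = CorkDispatcher(max_latency_in_ms=100, max_batch_size=8)

@dispatcher
async def predict(xs):
    # batched callback: receives a collection, returns one result per input
    return [x * 2 for x in xs]

async def main():
    print(await asyncio.gather(*(predict(i) for i in range(4))))

asyncio.run(main())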
@@ -169,13 +149,13 @@ def _wake_event(self):
def __call__(
self,
callback: t.Callable[
[t.Iterable[T_IN]], t.Coroutine[None, None, t.Iterable[T_OUT]]
[t.Collection[T_IN]], t.Coroutine[None, None, t.Collection[T_OUT]]
],
) -> t.Callable[[T_IN], t.Coroutine[None, None, T_OUT]]:
self.callback = callback

@functools.wraps(callback)
async def _func(data):
async def _func(data: t.Any) -> t.Any:
if self._controller is None:
self._controller = self._loop.create_task(self.controller())
try:
@@ -232,7 +212,7 @@ async def controller(self):
except Exception: # pylint: disable=broad-except
logger.error(traceback.format_exc())

async def inbound_call(self, data):
async def inbound_call(self, data: t.Any):
now = time.time()
future = self._loop.create_future()
input_info = (now, data, future)
@@ -241,12 +221,13 @@ async def inbound_call(self, data):
self._wake_event.notify_all()
return await future

async def outbound_call(self, inputs_info):
async def outbound_call(
self, inputs_info: tuple[tuple[float, t.Any, asyncio.Future[t.Any]], ...]
):
_time_start = time.time()
_done = False
batch_size = len(inputs_info)
logger.debug("Dynamic batching cork released, batch size: %d", batch_size)
self.adaptive_batch_size_hist.observe(batch_size)
try:
outputs = await self.callback(tuple(d for _, d, _ in inputs_info))
assert len(outputs) == len(inputs_info)
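The `adaptive_batch_size` histogram observation is removed from `outbound_call`; per the PR title, the batch size metric is refactored to live with the new runner metrics rather than inside the dispatcher. A sketch of the metric itself, with a local stand-in for `utils.metrics.exponential_buckets` (the real helper's signature is assumed):

from prometheus_client import CollectorRegistry, Histogram

def exponential_buckets(start: float, factor: float, end: float) -> tuple:
    # stand-in for bentoml._internal.utils.metrics.exponential_buckets
    buckets = []
    while start <= end:
        buckets.append(start)
        start *= factor
    return tuple(buckets) + (float("inf"),)

registry = CollectorRegistry()
hist = Histogram(
    "adaptive_batch_size", "Runner adaptive batch size",
    namespace="bentoml_runner", registry=registry,
    buckets=exponential_buckets(1, 2, 100),
)
hist.observe(16)  # observed batch size for one released cork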