feat(server): add runner metrics; refactoring batch size metrics (#2977)
## What does this PR address?
<!--
Thanks for sending a pull request!

Congrats on making it this far! Here's a 🍱 for you. There are still a
few steps ahead.

Please make sure to read the contribution guidelines, then fill out the
blanks below before requesting a code review.

Name your Pull Request with one of the following prefixes, e.g. "feat:
add support for PyTorch", to indicate the type of changes proposed. This
is based on the [Conventional Commits
specification](https://www.conventionalcommits.org/en/v1.0.0/#summary).
  - feat: (new feature for the user, not a new feature for build script)
  - fix: (bug fix for the user, not a fix to a build script)
  - docs: (changes to the documentation)
  - style: (formatting, missing semicolons, etc.; no production code change)
  - refactor: (refactoring production code, e.g. renaming a variable)
  - perf: (code changes that improve performance)
  - test: (adding missing tests, refactoring tests; no production code change)
  - chore: (updating grunt tasks etc.; no production code change)
  - build: (changes that affect the build system or external dependencies)
  - ci: (changes to configuration files and scripts)
  - revert: (reverts a previous commit)

Describe your changes in detail. Attach screenshots here if appropriate.

Once you're done with this, someone from the BentoML team or a community
member will help review your PR (see the "Who can help review?" section for
potential reviewers). If no one has reviewed your PR after a week has
passed, don't hesitate to post a new comment and @-ping the same person.
Notifications sometimes get lost 🥲.
-->

<!-- Remove if not applicable -->
Fixes bentoml/Kitchen#40

## Before submitting:
<!--- Go over all the following points, and put an `x` in all the boxes
that apply. -->
<!--- If you're unsure about any of these, don't hesitate to ask. We're
here to help! -->
<!--- If you plan to update documentation or tests in follow-up, please
note -->
- [ ] Does the Pull Request follow the [Conventional Commits
specification](https://www.conventionalcommits.org/en/v1.0.0/#summary)
naming? Here is [GitHub's
guide](https://docs.github.com/en/pull-requests/collaborating-with-pull-requests/proposing-changes-to-your-work-with-pull-requests/creating-a-pull-request)
on how to create a pull request.
- [ ] Does the code follow BentoML's code style? Have both the `make format`
and `make lint` scripts passed
([instructions](https://github.com/bentoml/BentoML/blob/main/DEVELOPMENT.md#style-check-auto-formatting-type-checking))?
- [ ] Did you read through [contribution
guidelines](https://github.com/bentoml/BentoML/blob/main/CONTRIBUTING.md#ways-to-contribute)
and follow [development
guidelines](https://github.com/bentoml/BentoML/blob/main/DEVELOPMENT.md#start-developing)?
- [ ] Did your changes require updates to the documentation? Have you
updated those accordingly? Here are the [documentation
guidelines](https://github.com/bentoml/BentoML/tree/main/docs) and [tips on
writing
docs](https://github.com/bentoml/BentoML/tree/main/docs#writing-documentation).
- [ ] Did you write tests to cover your changes?

## Who can help review?

Feel free to tag members/contributors who can help review your PR.
<!--
Feel free to ping any of the BentoML members for help on your issue, but
don't ping more than three people 😊.
If you know how to use git blame, that is probably the easiest way.

Team members that you can ping:
- @parano
- @yubozhao
- @bojiang
- @ssheng
- @aarnphm
- @sauyon
- @larme
- @yetone
- @jjmachan
-->

Co-authored-by: Sean Sheng <s3sheng@gmail.com>
bojiang and ssheng committed Sep 16, 2022
1 parent bfaa53a commit 532f5e1
Showing 15 changed files with 343 additions and 114 deletions.
25 changes: 13 additions & 12 deletions bentoml/_internal/configuration/containers.py
@@ -78,6 +78,8 @@ def _is_ip_address(addr: str) -> bool:
return False


RUNNER_CFG_KEYS = ["batching", "resources", "logging", "metrics", "timeout"]

RUNNER_CFG_SCHEMA = {
Optional("batching"): {
Optional("enabled"): bool,
@@ -97,6 +99,10 @@ def _is_ip_address(addr: str) -> bool:
Optional("response_content_type"): Or(bool, None),
},
},
Optional("metrics"): {
"enabled": bool,
"namespace": str,
},
Optional("timeout"): And(int, _larger_than_zero),
}

@@ -221,12 +227,9 @@ def __init__(
override_config = yaml.safe_load(f)
config_merger.merge(self.config, override_config)

global_runner_cfg = {
k: self.config["runners"][k]
for k in ("batching", "resources", "logging", "timeout")
}
global_runner_cfg = {k: self.config["runners"][k] for k in RUNNER_CFG_KEYS}
for key in self.config["runners"]:
if key not in ["batching", "resources", "logging", "timeout"]:
if key not in RUNNER_CFG_KEYS:
runner_cfg = self.config["runners"][key]

# key is a runner name
@@ -385,14 +388,10 @@ def access_control_options(
@staticmethod
def metrics_client(
multiproc_dir: str = Provide[prometheus_multiproc_dir],
namespace: str = Provide[config.api_server.metrics.namespace],
) -> "PrometheusClient":
from ..server.metrics.prometheus import PrometheusClient

return PrometheusClient(
multiproc_dir=multiproc_dir,
namespace=namespace,
)
return PrometheusClient(multiproc_dir=multiproc_dir)

@providers.SingletonFactory
@staticmethod
@@ -509,8 +508,10 @@ def tracing_excluded_urls(

if isinstance(excluded_urls, list):
return ExcludeList(excluded_urls)
else:
elif isinstance(excluded_urls, str):
return parse_excluded_urls(excluded_urls)
else:
return ExcludeList([])

# Mapping from runner name to RunnerApp file descriptor
remote_runner_mapping = providers.Static[t.Dict[str, str]]({})
@@ -519,7 +520,7 @@ def tracing_excluded_urls(
@providers.SingletonFactory
@staticmethod
def duration_buckets(
metrics: dict[str, t.Any] = Provide[config.api_server.metrics],
metrics: dict[str, t.Any] = Provide[config.api_server.metrics]
) -> tuple[float, ...]:
"""
Returns a tuple of duration buckets in seconds. If not explicitly configured,
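Note: the `duration_buckets` docstring above is truncated by the diff view. Relatedly, the dispatcher diff further down builds its batch-size histogram with `exponential_buckets(1, 2, max_batch_size)` from `..utils.metrics`, a file outside this excerpt. A minimal sketch of what such a helper plausibly does (an assumption, not the actual implementation):

```python
def exponential_buckets(start: float, factor: float, end: float) -> tuple[float, ...]:
    """Sketch: buckets grow by `factor` from `start`; `end` is always kept
    as the last finite bucket (Prometheus adds the +Inf bucket itself)."""
    buckets: list[float] = []
    value = start
    while value < end:
        buckets.append(value)
        value *= factor
    return tuple(buckets) + (end,)


# e.g. for a runner with max_batch_size=100:
assert exponential_buckets(1, 2, 100) == (1, 2, 4, 8, 16, 32, 64, 100)
```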
5 changes: 4 additions & 1 deletion bentoml/_internal/configuration/default_configuration.yaml
@@ -7,7 +7,7 @@ api_server:
max_request_size: 20971520
metrics:
enabled: True
namespace: BENTOML
namespace: bentoml_api_server
logging:
access:
enabled: True
@@ -37,6 +37,9 @@ runners:
request_content_type: True
response_content_length: True
response_content_type: True
metrics:
enabled: True
namespace: bentoml_runner
timeout: 300

tracing:
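Note: renaming the namespaces from `BENTOML` to `bentoml_api_server` and `bentoml_runner` follows Prometheus naming conventions (lowercase snake_case, component-specific prefixes). To illustrate how a runner metric name could be assembled from these pieces, here is a hypothetical stand-in for the `metric_name` helper that the dispatcher diff below imports from `..utils.metrics` (the real implementation is not shown in this excerpt):

```python
from __future__ import annotations

import re


def metric_name(*components: str | int) -> str:
    # Hypothetical sketch: join components, then replace anything outside
    # Prometheus's allowed charset [a-zA-Z0-9_:] with underscores.
    joined = "_".join(str(c) for c in components)
    return re.sub(r"[^a-zA-Z0-9_:]", "_", joined)


# Under the "bentoml_runner" namespace configured above, a histogram named
# metric_name("iris_clf", 0, "predict", "adaptive_batch_size") would be
# exported as bentoml_runner_iris_clf_0_predict_adaptive_batch_size.
print(metric_name("iris_clf", 0, "predict", "adaptive_batch_size"))
```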
4 changes: 2 additions & 2 deletions bentoml/_internal/context.py
@@ -188,8 +188,8 @@ def request_id(self, request_id: t.Optional[int]) -> None:


class _ComponentContext:
bento_name: str | None = None
bento_version: str | None = None
bento_name: str = ""
bento_version: str = "not available"
component_type: str | None = None
component_name: str | None = None
component_index: int | None = None
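Note: `_ComponentContext.bento_name` and `bento_version` now default to concrete strings instead of `None`, plausibly because these values end up in metric labels, where placeholder strings are better-defined than `None`. A hypothetical illustration (the metric name below is invented for the example):

```python
from prometheus_client import Counter

requests = Counter(
    "bentoml_runner_requests_total",  # invented name, for illustration only
    "Total runner requests",
    labelnames=["bento_name", "bento_version"],
)

# With string defaults, the labels are well-defined even before a bento
# is fully loaded, instead of serializing None.
requests.labels(bento_name="", bento_version="not available").inc()
```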
49 changes: 15 additions & 34 deletions bentoml/_internal/marshal/dispatcher.py
@@ -7,26 +7,17 @@
import functools
import traceback
import collections
from typing import TYPE_CHECKING

import numpy as np
from simple_di import inject
from simple_di import Provide

from ..utils import cached_property
from ..utils.alg import TokenBucket
from ..utils.metrics import metric_name
from ..utils.metrics import exponential_buckets
from ..configuration.containers import BentoMLContainer

if TYPE_CHECKING:
from ..server.metrics.prometheus import PrometheusClient

logger = logging.getLogger(__name__)


class NonBlockSema:
def __init__(self, count):
def __init__(self, count: int):
self.sema = count

def acquire(self):
@@ -56,7 +47,7 @@ def __init__(self):
assume the outbound duration follows duration = o_a * n + o_b
(all in seconds)
"""
self.o_stat = collections.deque(
self.o_stat: collections.deque[tuple[int, float, float]] = collections.deque(
maxlen=self.N_KEPT_SAMPLE
) # to store outbound stat data
self.o_a = 2
@@ -67,7 +58,7 @@ def __init__(self):
self._refresh_tb = TokenBucket(2) # to limit params refresh interval
self._outbound_counter = 0

def log_outbound(self, n, wait, duration):
def log_outbound(self, n: int, wait: float, duration: float):
if (
self._outbound_counter <= self.N_SKIPPED_SAMPLE
): # skip inaccurate info at beginning
@@ -82,7 +73,9 @@ def log_outbound(self, n, wait, duration):
def trigger_refresh(self):
x = tuple((i, 1) for i, _, _ in self.o_stat)
y = tuple(i for _, i, _ in self.o_stat)
_o_a, _o_b = np.linalg.lstsq(x, y, rcond=None)[0]

_factors: tuple[float, float] = np.linalg.lstsq(x, y, rcond=None)[0] # type: ignore
_o_a, _o_b = _factors
_o_w = sum(w for _, _, w in self.o_stat) * 1.0 / len(self.o_stat)

self.o_a, self.o_b = max(0.000001, _o_a), max(0, _o_b)
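
Note: the fit above implements the linear model `duration = o_a * n + o_b` described in `Optimizer.__init__`. A standalone reproduction with made-up samples:

```python
import numpy as np

# (batch_size, duration, wait) samples, as stored in self.o_stat
o_stat = [(1, 0.012, 0.001), (4, 0.021, 0.002), (8, 0.038, 0.001)]

x = [(n, 1) for n, _, _ in o_stat]  # design matrix rows [n, 1]
y = [d for _, d, _ in o_stat]       # observed batch durations

(o_a, o_b), *_ = np.linalg.lstsq(x, y, rcond=None)
print(f"per-item cost o_a={o_a:.4f}s, fixed overhead o_b={o_b:.4f}s")
```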
@@ -107,17 +100,12 @@ class CorkDispatcher:
The wrapped function should be an async function.
"""

@inject
def __init__(
self,
runner_name: str,
worker_index: int,
method_name: str,
max_latency_in_ms: int,
max_batch_size: int,
shared_sema: t.Optional[NonBlockSema] = None,
fallback: t.Optional[t.Callable[[], t.Any]] = None,
metrics_client: PrometheusClient = Provide[BentoMLContainer.metrics_client],
):
"""
params:
@@ -129,25 +117,17 @@ def __init__(
* all possible exceptions the decorated function has
"""
self.max_latency_in_ms = max_latency_in_ms / 1000.0
self.callback = None
self.fallback = fallback
self.optimizer = Optimizer()
self.max_batch_size = int(max_batch_size)
self.tick_interval = 0.001

self._controller = None
self._queue = collections.deque() # TODO(hrmthw): maxlen
self._queue: collections.deque[
tuple[float, t.Any, asyncio.Future[t.Any]]
] = collections.deque() # TODO(bojiang): maxlen
self._sema = shared_sema if shared_sema else NonBlockSema(1)

self.adaptive_batch_size_hist = metrics_client.Histogram(
name=metric_name(
runner_name, worker_index, method_name, "adaptive_batch_size"
),
documentation=runner_name + " Runner adaptive batch size",
labelnames=[], # TODO: add service version
buckets=exponential_buckets(1, 2, max_batch_size),
)

def shutdown(self):
if self._controller is not None:
self._controller.cancel()
@@ -169,13 +149,13 @@ def _wake_event(self):
def __call__(
self,
callback: t.Callable[
[t.Iterable[T_IN]], t.Coroutine[None, None, t.Iterable[T_OUT]]
[t.Collection[T_IN]], t.Coroutine[None, None, t.Collection[T_OUT]]
],
) -> t.Callable[[T_IN], t.Coroutine[None, None, T_OUT]]:
self.callback = callback

@functools.wraps(callback)
async def _func(data):
async def _func(data: t.Any) -> t.Any:
if self._controller is None:
self._controller = self._loop.create_task(self.controller())
try:
@@ -232,7 +212,7 @@ async def controller(self):
except Exception: # pylint: disable=broad-except
logger.error(traceback.format_exc())

async def inbound_call(self, data):
async def inbound_call(self, data: t.Any):
now = time.time()
future = self._loop.create_future()
input_info = (now, data, future)
@@ -241,12 +221,13 @@ async def inbound_call(self, data):
self._wake_event.notify_all()
return await future

async def outbound_call(self, inputs_info):
async def outbound_call(
self, inputs_info: tuple[tuple[float, t.Any, asyncio.Future[t.Any]]]
):
_time_start = time.time()
_done = False
batch_size = len(inputs_info)
logger.debug("Dynamic batching cork released, batch size: %d", batch_size)
self.adaptive_batch_size_hist.observe(batch_size)
try:
outputs = await self.callback(tuple(d for _, d, _ in inputs_info))
assert len(outputs) == len(inputs_info)
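Note: with this commit, `CorkDispatcher` no longer owns the `adaptive_batch_size` histogram or the injected `metrics_client`; per the commit title, batch-size metrics are refactored onto the runner side under the new `bentoml_runner` namespace. The destination is among the 15 changed files but outside this excerpt; a sketch of what the relocated construction could look like, reusing the exact arguments deleted above (the function name here is invented):

```python
from __future__ import annotations

from simple_di import inject, Provide

from bentoml._internal.utils.metrics import metric_name, exponential_buckets
from bentoml._internal.configuration.containers import BentoMLContainer


@inject
def make_adaptive_batch_size_histogram(
    runner_name: str,
    worker_index: int,
    method_name: str,
    max_batch_size: int,
    metrics_client=Provide[BentoMLContainer.metrics_client],
):
    # The same histogram CorkDispatcher used to build in __init__, now
    # created by the runner app and observed around each outbound batch.
    return metrics_client.Histogram(
        name=metric_name(runner_name, worker_index, method_name, "adaptive_batch_size"),
        documentation=runner_name + " Runner adaptive batch size",
        labelnames=[],  # original TODO: add service version
        buckets=exponential_buckets(1, 2, max_batch_size),
    )
```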
