feat: gRPC tracking (#3015)
## What does this PR address?

<!--
Thanks for sending a pull request!

Congrats on making it this far! Here's a 🍱 for you. There are still a
few steps ahead.

Please make sure to read the contribution guidelines, then fill out the
blanks below before requesting a code review.

Name your Pull Request with one of the following prefixes, e.g. "feat:
add support for PyTorch", to indicate the type of changes proposed. This
is based on the [Conventional Commits
specification](https://www.conventionalcommits.org/en/v1.0.0/#summary).
  - feat: (new feature for the user, not a new feature for build script)
  - fix: (bug fix for the user, not a fix to a build script)
  - docs: (changes to the documentation)
  - style: (formatting, missing semicolons, etc; no production code change)
  - refactor: (refactoring production code, eg. renaming a variable)
  - perf: (code changes that improve performance)
  - test: (adding missing tests, refactoring tests; no production code change)
  - chore: (updating grunt tasks etc; no production code change)
  - build: (changes that affect the build system or external dependencies)
  - ci: (changes to configuration files and scripts)
  - revert: (reverts a previous commit)

Describe your changes in detail. Attach screenshots here if appropriate.

Once you're done with this, someone from the BentoML team or a community
member will help review your PR (see the "Who can help review?" section
for potential reviewers). If no one has reviewed your PR after a week has
passed, don't hesitate to post a new comment and ping @-the same person.
Notifications sometimes get lost 🥲.
-->

<!-- Remove if not applicable -->

Fixes #2991 
Depends on #2984 


## Before submitting:

<!--- Go over all the following points, and put an `x` in all the boxes
that apply. -->
<!--- If you're unsure about any of these, don't hesitate to ask. We're
here to help! -->
<!--- If you plan to update documentation or tests in follow-up, please
note -->

- [ ] Does the Pull Request follow [Conventional Commits
specification](https://www.conventionalcommits.org/en/v1.0.0/#summary)
naming? Here is [GitHub's
guide](https://docs.github.com/en/pull-requests/collaborating-with-pull-requests/proposing-changes-to-your-work-with-pull-requests/creating-a-pull-request)
on how to create a pull request.
- [x] Does the code follow BentoML's code style? Have both the `make format`
and `make lint` scripts passed
([instructions](https://github.com/bentoml/BentoML/blob/main/DEVELOPMENT.md#style-check-auto-formatting-type-checking))?
- [x] Did you read through [contribution
guidelines](https://github.com/bentoml/BentoML/blob/main/CONTRIBUTING.md#ways-to-contribute)
and follow [development
guidelines](https://github.com/bentoml/BentoML/blob/main/DEVELOPMENT.md#start-developing)?
- [ ] Did your changes require updates to the documentation? Have you updated
those accordingly? Here are the [documentation
guidelines](https://github.com/bentoml/BentoML/tree/main/docs) and [tips
on writing
docs](https://github.com/bentoml/BentoML/tree/main/docs#writing-documentation).
- [ ] Did you write tests to cover your changes?

Signed-off-by: Aaron Pham <29749331+aarnphm@users.noreply.github.com>
aarnphm committed Sep 27, 2022
1 parent ce99e45 commit 76ade96
Showing 8 changed files with 274 additions and 75 deletions.
9 changes: 4 additions & 5 deletions bentoml/_internal/server/metrics/prometheus.py
@@ -34,10 +34,10 @@ def __init__(
         assert multiproc_dir is not None, "multiproc_dir must be provided"
 
         self.multiproc = multiproc
-        self.multiproc_dir: t.Optional[str] = multiproc_dir
+        self.multiproc_dir: str | None = multiproc_dir
         self._registry = None
         self._imported = False
-        self._pid: t.Optional[int] = None
+        self._pid: int | None = None
 
     @property
     def prometheus_client(self):
@@ -56,6 +56,7 @@ def prometheus_client(self):
 
         # step 2:
         import prometheus_client
+        import prometheus_client.parser
         import prometheus_client.multiprocess
 
         self._imported = True
@@ -106,9 +107,7 @@ def generate_latest(self):
         return self.prometheus_client.generate_latest()
 
     def text_string_to_metric_families(self) -> t.Generator[Metric, None, None]:
-        from prometheus_client.parser import text_string_to_metric_families
-
-        yield from text_string_to_metric_families(
+        yield from self.prometheus_client.parser.text_string_to_metric_families(
            self.generate_latest().decode("utf-8")
        )
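
The refactor above routes metric parsing through the lazily imported
`prometheus_client` handle instead of importing the parser at call time. For
reference, a minimal standalone sketch of what that parsing step does (not
part of the diff; the metric line below is made up):

```python
from prometheus_client.parser import text_string_to_metric_families

metrics_text = 'bentoml_api_server_request_total{api_name="predict"} 5.0\n'

# Each family groups samples sharing a metric name; each sample carries a
# label dict and a value, which is exactly the shape the analytics code consumes.
for family in text_string_to_metric_families(metrics_text):
    for sample in family.samples:
        print(family.name, sample.labels, sample.value)
```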
86 changes: 75 additions & 11 deletions bentoml/_internal/utils/__init__.py
@@ -14,6 +14,7 @@
 from typing import overload
 from typing import TYPE_CHECKING
 from pathlib import Path
+from reprlib import recursive_repr as _recursive_repr
 from datetime import date
 from datetime import time
 from datetime import datetime
@@ -31,16 +32,18 @@
 from .cattr import bentoml_cattr
 from ..types import LazyType
-from ..types import PathType
-from ..types import MetadataDict
-from ..types import MetadataType
 from .lazy_loader import LazyLoader
 
 if TYPE_CHECKING:
     from fs.base import FS
     from typing_extensions import Self
 
+    from ..types import PathType
+    from ..types import MetadataDict
+    from ..types import MetadataType
+
     P = t.ParamSpec("P")
     GenericFunction = t.Callable[P, t.Any]
+    F = t.Callable[P, t.Any]
 
 
 C = t.TypeVar("C")
@@ -60,6 +63,7 @@
     "display_path_under_home",
     "rich_console",
     "experimental",
+    "compose",
 ]
 
 _EXPERIMENTAL_APIS: set[str] = set()
@@ -389,13 +393,16 @@ class cached_contextmanager:
     Just like contextlib.contextmanager, but will cache the yield value for the same
     arguments. When all instances of the same contextmanager exits, the cache value will
     be dropped.
-    Example Usage:
-    (To reuse the container based on the same image)
-    >>> @cached_contextmanager("{docker_image.id}")
-    >>> def start_docker_container_from_image(docker_image, timeout=60):
-    >>>     container = ...
-    >>>     yield container
-    >>>     container.stop()
+
+    Example Usage: (To reuse the container based on the same image)
+
+    .. code-block:: python
+
+        @cached_contextmanager("{docker_image.id}")
+        def start_docker_container_from_image(docker_image, timeout=60):
+            container = ...
+            yield container
+            container.stop()
     """
 
     def __init__(self, cache_key_template: t.Optional[str] = None):
@@ -426,3 +433,60 @@ def _func(*args: "P.args", **kwargs: "P.kwargs") -> t.Any:
             self._cache.pop(cache_key)
 
     return _func
+
+
+class compose:
+    """
+    Function composition: compose(f, g)(...) is equivalent to f(g(...)).
+    See https://github.com/mentalisttraceur/python-compose for the original implementation.
+
+    Args:
+        *functions: Functions (or other callables) to compose.
+
+    Raises:
+        TypeError: If no arguments are given, or any argument is not callable.
+    """
+
+    def __init__(self: Self, *functions: F[t.Any]):
+        if not functions:
+            raise TypeError(f"{self!r} needs at least one argument.")
+        _functions: list[F[t.Any]] = []
+        for function in reversed(functions):
+            if not callable(function):
+                raise TypeError(f"{self!r} arguments must be callable.")
+            if isinstance(function, compose):
+                _functions.extend(function.functions)
+            else:
+                _functions.append(function)
+        self.__wrapped__ = _functions[0]
+        self._wrappers = tuple(_functions[1:])
+
+    def __call__(self, *args: t.Any, **kwargs: t.Any) -> t.Any:
+        """Call the composed function."""
+        result = self.__wrapped__(*args, **kwargs)
+        for function in self._wrappers:
+            result = function(result)
+        return result
+
+    def __get__(self, obj: t.Any, typ_: type | None = None):
+        """Get the composed function as a bound method."""
+        wrapped = self.__wrapped__
+        try:
+            bind = type(wrapped).__get__
+        except AttributeError:
+            return self
+        bound_wrapped = bind(wrapped, obj, typ_)
+        if bound_wrapped is wrapped:
+            return self
+        bound_self = type(self)(bound_wrapped)
+        bound_self._wrappers = self._wrappers
+        return bound_self
+
+    @_recursive_repr("<...>")
+    def __repr__(self):
+        return f"{self!r}({','.join(map(repr, reversed(self.functions)))})"
+
+    @property
+    def functions(self):
+        """Read-only tuple of the composed callables, in order of execution."""
+        return (self.__wrapped__,) + tuple(self._wrappers)
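
As a quick reference (not part of the diff), this is how the new `compose`
helper behaves: callables are applied right to left, matching the
`compose(f, g)(...) == f(g(...))` contract, and nested `compose` instances are
flattened into one chain:

```python
# Assumes only the compose class above; the callables are arbitrary examples.
strip_then_upper = compose(str.upper, str.strip)  # upper(strip(x))
assert strip_then_upper("  grpc  ") == "GRPC"

# Nested composition flattens; .functions lists callables in execution order.
shout = compose(lambda s: s + "!", strip_then_upper)
assert shout("  grpc  ") == "GRPC!"
assert shout.functions[0] is str.strip  # strip runs first
```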
14 changes: 13 additions & 1 deletion bentoml/_internal/utils/analytics/schemas.py
@@ -179,13 +179,20 @@ class ModelSaveEvent(EventMeta):
     model_size_in_kb: float
 
 
+# serve_kind determines the type of serving scenario
+SERVE_KIND = ["grpc", "http"]
+# components are the different components to be tracked
+COMPONENT_KIND = ["standalone", "api_server", "runner"]
+
+
 @attr.define
 class ServeInitEvent(EventMeta):
     serve_id: str
     production: bool
     serve_from_bento: bool
 
     bento_creation_timestamp: t.Optional[datetime]
+    serve_kind: str = attr.field(validator=attr.validators.in_(SERVE_KIND))
     num_of_models: int = attr.field(default=0)
     num_of_runners: int = attr.field(default=0)
     num_of_apis: int = attr.field(default=0)
@@ -199,9 +206,14 @@ class ServeUpdateEvent(EventMeta):
 class ServeUpdateEvent(EventMeta):
     serve_id: str
     production: bool
-    component: str
     triggered_at: datetime
     duration_in_seconds: int
+    serve_kind: str = attr.field(validator=attr.validators.in_(SERVE_KIND))
+    component: str = attr.field(
+        validator=attr.validators.and_(
+            attr.validators.instance_of(str), attr.validators.in_(COMPONENT_KIND)
+        )
+    )
     metrics: t.List[t.Any] = attr.field(factory=list)
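
For context (not from the diff), this is how the new `in_` validators behave
at runtime; `Event` below is a stand-in for `ServeUpdateEvent`:

```python
import attr

SERVE_KIND = ["grpc", "http"]

@attr.define
class Event:
    serve_kind: str = attr.field(validator=attr.validators.in_(SERVE_KIND))

Event(serve_kind="grpc")     # passes validation
try:
    Event(serve_kind="tcp")  # rejected at construction time
except ValueError as err:
    print(err)  # in_() raises ValueError for values outside SERVE_KIND
```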
101 changes: 73 additions & 28 deletions bentoml/_internal/utils/analytics/usage_stats.py
@@ -17,18 +17,22 @@
 from simple_di import inject
 from simple_di import Provide
 
+from ...utils import compose
 from .schemas import EventMeta
 from .schemas import ServeInitEvent
 from .schemas import TrackingPayload
 from .schemas import CommonProperties
 from .schemas import ServeUpdateEvent
+from ...configuration import get_debug_mode
 from ...configuration.containers import BentoMLContainer
 
 if TYPE_CHECKING:
     P = t.ParamSpec("P")
     T = t.TypeVar("T")
     AsyncFunc = t.Callable[P, t.Coroutine[t.Any, t.Any, t.Any]]
 
+    from prometheus_client.samples import Sample
+
     from bentoml import Service
 
     from ...server.metrics.prometheus import PrometheusClient
@@ -62,7 +66,12 @@ def wrapper(*args: P.args, **kwargs: P.kwargs) -> t.Any:
             return func(*args, **kwargs)
         except Exception as err:  # pylint: disable=broad-except
             if _usage_event_debugging():
-                logger.info(f"Tracking Error: {err}")
+                if get_debug_mode():
+                    logger.error(
+                        f"Tracking Error: {err}", stack_info=True, stacklevel=3
+                    )
+                else:
+                    logger.info(f"Tracking Error: {err}")
             else:
                 logger.debug(f"Tracking Error: {err}")
@@ -116,6 +125,7 @@ def track(event_properties: EventMeta):
 def _track_serve_init(
     svc: Service,
     production: bool,
+    serve_kind: str,
     serve_info: ServeInfo = Provide[BentoMLContainer.serve_info],
 ):
     if svc.bento is not None:
@@ -124,6 +134,7 @@
             serve_id=serve_info.serve_id,
             serve_from_bento=True,
             production=production,
+            serve_kind=serve_kind,
             bento_creation_timestamp=bento.info.creation_time,
             num_of_models=len(bento.info.models),
             num_of_runners=len(svc.runners),
@@ -138,6 +149,7 @@
             serve_id=serve_info.serve_id,
             serve_from_bento=False,
             production=production,
+            serve_kind=serve_kind,
             bento_creation_timestamp=None,
             num_of_models=len(
                 set(
@@ -160,33 +172,61 @@
 EXCLUDE_PATHS = {"/docs.json", "/livez", "/healthz", "/readyz"}
 
 
+def filter_metrics(
+    samples: list[Sample], *filters: t.Callable[[list[Sample]], list[Sample]]
+):
+    return [
+        {**sample.labels, "value": sample.value}
+        for sample in compose(*filters)(samples)
+    ]
+
+
 def get_metrics_report(
     metrics_client: PrometheusClient,
+    serve_kind: str,
 ) -> list[dict[str, str | float]]:
-    metrics_text = metrics_client.generate_latest().decode("utf-8")
-    if not metrics_text:
-        return []
-
-    from prometheus_client.parser import (
-        text_string_to_metric_families,  # type: ignore (unfinished prometheus types)
-    )
-
-    for metric in text_string_to_metric_families(metrics_text):
-        # Searching for the metric BENTOML_{service_name}_request of type Counter
-        if (
-            metric.type == "counter"
-            and metric.name.startswith("BENTOML_")
-            and metric.name.endswith("_request")
-        ):
-            return [
-                {**sample.labels, "value": sample.value}
-                for sample in metric.samples
-                if "endpoint" in sample.labels
-                # exclude common infra paths
-                and sample.labels["endpoint"] not in EXCLUDE_PATHS
-                # exclude static_content prefix
-                and not sample.labels["endpoint"].startswith("/static_content/")
-            ]
+    """
+    Get Prometheus metrics reports from the metrics client. This will be used to determine tracking events.
+    If the returned metrics are legacy metrics, they will have the prefix BENTOML_; otherwise they will have the prefix bentoml_.
+
+    Args:
+        metrics_client: Instance of bentoml._internal.server.metrics.prometheus.PrometheusClient
+        serve_kind: Whether the metrics come from the gRPC or the HTTP server.
+
+    Returns:
+        A list of metric samples, each a dict of the sample's labels plus its value.
+    """
+    for metric in metrics_client.text_string_to_metric_families():
+        metric_type = t.cast("str", metric.type)  # type: ignore (we need to cast due to no prometheus types)
+        metric_name = t.cast("str", metric.name)  # type: ignore (we need to cast due to no prometheus types)
+        metric_samples = t.cast("list[Sample]", metric.samples)  # type: ignore (we need to cast due to no prometheus types)
+        # We only care about the counter metrics.
+        if metric_type != "counter":
+            continue
+        if serve_kind == "grpc":
+            _filters: list[t.Callable[[list[Sample]], list[Sample]]] = [
+                lambda samples: [s for s in samples if "api_name" in s.labels]
+            ]
+        elif serve_kind == "http":
+            _filters = [
+                lambda samples: [
+                    s
+                    for s in samples
+                    if not s.labels["endpoint"].startswith("/static_content/")
+                ],
+                lambda samples: [
+                    s for s in samples if s.labels["endpoint"] not in EXCLUDE_PATHS
+                ],
+                lambda samples: [s for s in samples if "endpoint" in s.labels],
+            ]
+        else:
+            raise NotImplementedError("Unknown serve kind %s" % serve_kind)
+        # If the metric prefix is BENTOML_, these are legacy metrics.
+        if metric_name.endswith("_request") and (
+            metric_name.startswith("bentoml_") or metric_name.startswith("BENTOML_")
+        ):
+            return filter_metrics(metric_samples, *_filters)
 
     return []

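One subtlety worth noting (illustrated below with a stand-in `Sample`, not
code from the PR): `compose` applies its arguments right to left, so the
*last* lambda in the HTTP `_filters` list runs first. That is why the
`"endpoint" in s.labels` guard can safely run before the lambdas that index
`s.labels["endpoint"]`:

```python
from dataclasses import dataclass

@dataclass
class Sample:  # minimal stand-in for prometheus_client.samples.Sample
    labels: dict
    value: float

samples = [
    Sample({"endpoint": "/predict"}, 3.0),
    Sample({"endpoint": "/healthz"}, 9.0),  # infra path, filtered out
    Sample({"api_name": "predict"}, 1.0),   # no "endpoint" label at all
]

filters = [
    lambda ss: [s for s in ss if s.labels["endpoint"] not in {"/healthz"}],
    lambda ss: [s for s in ss if "endpoint" in s.labels],  # runs first
]

# Emulate compose(*filters)(samples): apply right to left.
result = samples
for f in reversed(filters):
    result = f(result)

print([{**s.labels, "value": s.value} for s in result])
# -> [{'endpoint': '/predict', 'value': 3.0}]
```
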
Expand All @@ -195,7 +235,9 @@ def get_metrics_report(
@contextlib.contextmanager
def track_serve(
svc: Service,
production: bool,
*,
production: bool = False,
serve_kind: str = "http",
component: str = "standalone",
metrics_client: PrometheusClient = Provide[BentoMLContainer.metrics_client],
serve_info: ServeInfo = Provide[BentoMLContainer.serve_info],
@@ -204,12 +246,12 @@
         yield
         return
 
-    _track_serve_init(svc, production)
+    _track_serve_init(svc=svc, production=production, serve_kind=serve_kind)
 
     if _usage_event_debugging():
         tracking_interval = 5
     else:
-        tracking_interval = SERVE_USAGE_TRACKING_INTERVAL_SECONDS
+        tracking_interval = SERVE_USAGE_TRACKING_INTERVAL_SECONDS  # pragma: no cover
 
     stop_event = threading.Event()

@@ -221,10 +263,13 @@ def loop() -> t.NoReturn:  # type: ignore
         event_properties = ServeUpdateEvent(
             serve_id=serve_info.serve_id,
             production=production,
+            # Note that we currently have only two tracking jobs: http and grpc
+            serve_kind=serve_kind,
+            # Currently accepted components are "standalone", "api_server" and "runner"
             component=component,
             triggered_at=now,
             duration_in_seconds=int((now - last_tracked_timestamp).total_seconds()),
-            metrics=get_metrics_report(metrics_client),
+            metrics=get_metrics_report(metrics_client, serve_kind=serve_kind),
         )
         last_tracked_timestamp = now
         track(event_properties)
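
Callers now opt into gRPC tracking via keyword-only arguments. A hedged sketch
of the call site (`svc` and `serve_forever` are hypothetical; the import path
assumes BentoML's internal analytics module):

```python
from bentoml._internal.utils.analytics import track_serve

with track_serve(svc, production=False, serve_kind="grpc", component="api_server"):
    serve_forever()  # hypothetical serving loop
```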
