diff --git a/src/bentoml/_internal/monitoring/api.py b/src/bentoml/_internal/monitoring/api.py
index 0f4e6ccf417..094a873dbea 100644
--- a/src/bentoml/_internal/monitoring/api.py
+++ b/src/bentoml/_internal/monitoring/api.py
@@ -2,24 +2,17 @@
 
 import typing as t
 import logging
-import datetime
 import contextlib
 import collections
+import contextvars
 import logging.config
-from typing import TYPE_CHECKING
-from pathlib import Path
 
-import yaml
 from simple_di import inject
 from simple_di import Provide
 
 from ..types import LazyType
-from ..context import component_context
 from ..configuration.containers import BentoMLContainer
 
-if TYPE_CHECKING:
-    from ..types import JSONSerializable
-
 DT = t.TypeVar("DT")
 MT = t.TypeVar("MT", bound="MonitorBase[t.Any]")
@@ -30,257 +23,126 @@
 MONITOR_REGISTRY: dict[str, MonitorBase[t.Any]] = {}  # cache of monitors
 
+MON_COLUMN_VAR = contextvars.ContextVar["dict[str, dict[str, str]] | None"](
+    "MON_COLUMN_VAR", default=None
+)
+MON_DATAS_VAR = contextvars.ContextVar["dict[str, collections.deque[DT]] | None"](
+    "MON_DATAS_VAR", default=None
+)
 
-class MonitorBase(t.Generic[DT]):
-    def __init__(self, name: str, **kwargs: t.Any) -> None:
-        raise NotImplementedError()
-
-    def start_record(self) -> None:
-        raise NotImplementedError()
-
-    def stop_record(self) -> None:
-        raise NotImplementedError()
-
-    def export_schema(self) -> JSONSerializable:
-        raise NotImplementedError()
-
-    def export_data(self) -> JSONSerializable:
-        raise NotImplementedError()
-
-    def log(self, data: DT, name: str, role: str, data_type: str) -> None:
-        raise NotImplementedError()
-
-    def log_batch(
-        self,
-        data_batch: t.Iterable[DT],
-        name: str,
-        role: str,
-        data_type: str,
-    ) -> None:
-        raise NotImplementedError()
-
-    def log_table(
-        self,
-        data: t.Iterable[t.Iterable[DT]],
-        schema: dict[str, str],
-    ) -> None:
-        raise NotImplementedError()
-
-
-class NoOpMonitor(MonitorBase[t.Any]):
-    def __init__(self, name: str, **kwargs: t.Any) -> None:
-        pass
-
-    def start_record(self) -> None:
-        pass
-
-    def stop_record(self) -> None:
-        pass
-
-    def export_schema(self) -> JSONSerializable:
-        pass
-
-    def export_data(self) -> JSONSerializable:
-        pass
-
-    def log(self, data: t.Any, name: str, role: str, data_type: str) -> None:
-        pass
-
-    def log_batch(
-        self,
-        data_batch: t.Iterable[t.Any],
-        name: str,
-        role: str,
-        data_type: str,
-    ) -> None:
-        pass
-
-    def log_table(
-        self,
-        data: t.Iterable[t.Iterable[t.Any]],
-        schema: dict[str, str],
-    ) -> None:
-        pass
-
-
-DEFAULT_CONFIG_YAML = """
-version: 1
-incremental: false
-loggers:
-  bentoml_monitor_data:
-    level: INFO
-    handlers: [bentoml_monitor_data]
-  bentoml_monitor_schema:
-    level: INFO
-    handlers: [bentoml_monitor_schema]
-handlers:
-  bentoml_monitor_data:
-    class: logging.handlers.TimedRotatingFileHandler
-    level: INFO
-    formatter: bentoml_json
-    filename: "{data_filename}"
-    when: "D"
-  bentoml_monitor_schema:
-    class: logging.handlers.RotatingFileHandler
-    level: INFO
-    formatter: bentoml_json
-    filename: "{schema_filename}"
-formatters:
-  bentoml_json:
-    class: pythonjsonlogger.jsonlogger.JsonFormatter
-    format: " "
-"""
-
-
-class DefaultMonitor(MonitorBase["JSONSerializable"]):
+class MonitorBase(t.Generic[DT]):
     """
-    The default monitor implementation. It uses a logger to log data and schema, and will
-    write monitor data to rotating files. The schema is logged as a JSON object, and the
-    data is logged as a JSON array.
+    The base monitor class. All monitors should inherit from this class.
+    Subclasses should implement the following methods
+    to export the schema and data to the desired format:
+    - export_schema
+    - export_data
     """
 
-    PRESERVED_COLUMNS = (COLUMN_TIME, COLUMN_RID) = ("timestamp", "request_id")
+    PRESERVED_COLUMNS: tuple[str, ...] = ()
 
     def __init__(
         self,
         name: str,
-        log_path: str,
-        log_config_file: str | None = None,
+        **_: t.Any,
     ) -> None:
         self.name = name
-        self.log_config_file = log_config_file
-        self.log_path = log_path
-        self._is_first_record = True
-        self._is_first_column = False
-        self._column_meta: dict[str, dict[str, str]] = {}
-        self._columns: dict[
-            str,
-            collections.deque[JSONSerializable],
-        ] = collections.defaultdict(collections.deque)
-
-    def _init_logger(self) -> None:
-        if self.log_config_file is None:
-            logging_config_yaml = DEFAULT_CONFIG_YAML
-        else:
-            with open(self.log_config_file, "r", encoding="utf8") as f:
-                logging_config_yaml = f.read()
-
-        worker_id = component_context.component_index
-        schema_path = Path(f"{self.log_path}/{self.name}/schema/schema.{worker_id}.log")
-        data_path = Path(f"{self.log_path}/{self.name}/data/data.{worker_id}.log")
-
-        schema_path.parent.mkdir(parents=True, exist_ok=True)
-        data_path.parent.mkdir(parents=True, exist_ok=True)
+        self.columns_schema: dict[str, dict[str, str]] | None = None
 
-        logging_config_yaml = logging_config_yaml.format(
-            schema_filename=str(schema_path.absolute()),
-            data_filename=str(data_path.absolute()),
-            worker_id=worker_id,
-            monitor_name=self.name,
-        )
-
-        logging_config = yaml.safe_load(logging_config_yaml)
-        logging.config.dictConfig(logging_config)
-        # pylint: disable=attribute-defined-outside-init
-        self.data_logger = logging.getLogger("bentoml_monitor_data")
-        self.schema_logger = logging.getLogger("bentoml_monitor_schema")
-        self.data_logger.propagate = False
-        self.schema_logger.propagate = False
-
-    def start_record(self) -> None:
+    def start_record(self):
         """
         Start recording data. This method should be called before logging any data.
         """
-        self._is_first_column = True
+        if self.columns_schema is None:
+            assert MON_COLUMN_VAR.get() is None
+            MON_COLUMN_VAR.set({})
+
+        assert MON_DATAS_VAR.get() is None
+        MON_DATAS_VAR.set(collections.defaultdict(collections.deque))
 
     def stop_record(self) -> None:
         """
         Stop recording data. This method should be called after logging all data.
         """
-        if self._is_first_record:
-            self._init_logger()
-            self.export_schema()
-            self._is_first_record = False
+        datas: dict[str, collections.deque[DT]] = MON_DATAS_VAR.get()  # type: ignore
+        assert datas is not None
 
-        if self._is_first_column:
-            logger.warning("No data logged in this record. Will skip this record.")
-        else:
-            self.export_data()
+        if len(datas) == 0:
+            logger.warning("No data logged in this record. Will skip output.")
+            return
+
+        if self.columns_schema is None:
+            columns = MON_COLUMN_VAR.get()
+            assert columns is not None
+            self.columns_schema = columns
+            self.export_schema(columns)
+            MON_COLUMN_VAR.set(None)
+
+        if len(set(len(q) for q in datas.values())) != 1:
+            raise ValueError("All columns should have the same length.")
+        self.export_data(datas)
 
-    def export_schema(self):
+        MON_DATAS_VAR.set(None)
+
+    def export_schema(self, columns_schema: dict[str, dict[str, str]]) -> None:
         """
         Export schema of the data. This method should be called after all data is logged.
""" - from bentoml._internal.context import component_context - - self.schema_logger.info( - dict( - meta_data={ - "bento_name": component_context.bento_name, - "bento_version": component_context.bento_version, - }, - columns=list(self._column_meta.values()), - ) - ) + raise NotImplementedError() - def export_data(self): + def export_data(self, datas: dict[str, collections.deque[DT]]) -> None: """ Export data. This method should be called after all data is logged. """ - assert ( - len(set(len(q) for q in self._columns.values())) == 1 - ), "All columns must have the same length" - while True: - try: - record = {k: v.popleft() for k, v in self._columns.items()} - self.data_logger.info(record) - except IndexError: - break + raise NotImplementedError() def log( self, - data: JSONSerializable, + data: DT, name: str, role: str, data_type: str, - ignore_existing: bool = False, ) -> None: """ log a data with column name, role and type to the current record """ if name in self.PRESERVED_COLUMNS: - raise ValueError( - f"Column name {name} is preserved. Please use a different name." + logger.warning( + "Column name %s is reserved, will be renamed to %s", name, name + "_" ) + name = name + "_" - assert role in BENTOML_MONITOR_ROLES, f"Invalid role {role}" - assert data_type in BENTOML_MONITOR_TYPES, f"Invalid data type {data_type}" + if role not in BENTOML_MONITOR_ROLES: + logger.warning( + "Role {role} is not officially supported, but will be logged anyway." + ) + if data_type not in BENTOML_MONITOR_TYPES: + logger.warning( + "Data type {data_type} is not officially supported, but will be logged anyway." + ) - if self._is_first_record: - if name not in self._column_meta: - self._column_meta[name] = { + if self.columns_schema is None: + columns = MON_COLUMN_VAR.get() + assert columns is not None + if name in columns: + logger.warning( + "Column name %s is duplicated, will be ignored.", + name, + ) + else: + columns[name] = { "name": name, "role": role, "type": data_type, } - elif not ignore_existing: - raise ValueError( - f"Column {name} already exists. Please use a different name." - ) - if self._is_first_column: - self._is_first_column = False - - from ..context import trace_context - - # universal columns - self._columns[self.COLUMN_TIME].append(datetime.datetime.now().isoformat()) - self._columns[self.COLUMN_RID].append(trace_context.request_id) - - self._columns[name].append(data) + datas = MON_DATAS_VAR.get() + assert datas is not None + datas[name].append(data) def log_batch( self, - data_batch: t.Iterable[JSONSerializable], + data_batch: t.Iterable[DT], name: str, role: str, data_type: str, @@ -290,18 +152,57 @@ def log_batch( """ try: for data in data_batch: - self.log(data, name, role, data_type, ignore_existing=True) + self.log(data, name, role, data_type) except TypeError: - raise ValueError( - "data_batch is not iterable. Please use log() to log a single data." - ) from None + logger.warning( + "Data batch is not iterable, will ignore the data batch. Please use log() to log a single data." + ) def log_table( self, - data: t.Iterable[t.Iterable[JSONSerializable]], + data: t.Iterable[t.Iterable[DT]], schema: dict[str, str], ) -> None: - raise NotImplementedError("Not implemented yet") + logger.warning( + "log_table() is not implemented yet. Will ignore the data. Please use log() or log_batch() instead." 
+ ) + return + + +class NoOpMonitor(MonitorBase[t.Any]): + def __init__(self, name: str, **kwargs: t.Any) -> None: + pass + + def start_record(self) -> None: + pass + + def stop_record(self) -> None: + pass + + def export_schema(self, columns_schema: dict[str, dict[str, str]]) -> None: + pass + + def export_data(self, datas: dict[str, collections.deque[t.Any]]) -> None: + pass + + def log(self, data: t.Any, name: str, role: str, data_type: str) -> None: + pass + + def log_batch( + self, + data_batch: t.Iterable[t.Any], + name: str, + role: str, + data_type: str, + ) -> None: + pass + + def log_table( + self, + data: t.Iterable[t.Iterable[t.Any]], + schema: dict[str, str], + ) -> None: + pass @t.overload @@ -387,16 +288,18 @@ def monitor( if not BentoMLContainer.config.monitoring.enabled.get(): monitor_klass = NoOpMonitor elif monitor_class is None or monitor_class == "default": + from .default import DefaultMonitor + monitor_klass = DefaultMonitor elif isinstance(monitor_class, str): monitor_klass = LazyType["MonitorBase[t.Any]"](monitor_class).get_class() elif isinstance(monitor_class, type): monitor_klass = monitor_class else: - raise ValueError( - "monitor_class must be a class, a string or None. " - f"Got {type(monitor_class)}" + logger.warning( + "Invalid monitor class, will disable monitoring. Please check your configuration." ) + monitor_klass = NoOpMonitor if monitor_options is None: monitor_options = {} MONITOR_REGISTRY[name] = monitor_klass(name, **monitor_options) diff --git a/src/bentoml/_internal/monitoring/default.py b/src/bentoml/_internal/monitoring/default.py new file mode 100644 index 00000000000..e8ed6054cfc --- /dev/null +++ b/src/bentoml/_internal/monitoring/default.py @@ -0,0 +1,139 @@ +from __future__ import annotations + +import typing as t +import logging +import datetime +import collections +import logging.config +from typing import TYPE_CHECKING +from pathlib import Path + +import yaml + +from .api import MonitorBase +from ..context import trace_context +from ..context import component_context + +if TYPE_CHECKING: + from ..types import JSONSerializable + + +DEFAULT_CONFIG_YAML = """ +version: 1 +disable_existing_loggers: false +loggers: + bentoml_monitor_data: + level: INFO + handlers: [bentoml_monitor_data] + propagate: false + bentoml_monitor_schema: + level: INFO + handlers: [bentoml_monitor_schema] + propagate: false +handlers: + bentoml_monitor_data: + class: logging.handlers.TimedRotatingFileHandler + level: INFO + formatter: bentoml_json + filename: "{data_filename}" + when: "D" + bentoml_monitor_schema: + class: logging.handlers.RotatingFileHandler + level: INFO + formatter: bentoml_json + filename: "{schema_filename}" +formatters: + bentoml_json: + class: pythonjsonlogger.jsonlogger.JsonFormatter + format: " " +""" + + +class DefaultMonitor(MonitorBase["JSONSerializable"]): + """ + The default monitor implementation. It uses a logger to log data and schema, and will + write monitor data to rotating files. The schema is logged as a JSON object, and the + data is logged as a JSON array. 
+ """ + + PRESERVED_COLUMNS = (COLUMN_TIME, COLUMN_RID) = ("timestamp", "request_id") + + def __init__( + self, + name: str, + log_path: str, + log_config_file: str | None = None, + **_: t.Any, + ) -> None: + super().__init__(name, **_) + self.log_config_file = log_config_file + self.log_path = log_path + self.data_logger = None + self.schema_logger = None + + def _init_logger(self) -> None: + if self.log_config_file is None: + logging_config_yaml = DEFAULT_CONFIG_YAML + else: + with open(self.log_config_file, "r", encoding="utf8") as f: + logging_config_yaml = f.read() + + worker_id = component_context.component_index or 0 + schema_path = Path(f"{self.log_path}/{self.name}/schema/schema.{worker_id}.log") + data_path = Path(f"{self.log_path}/{self.name}/data/data.{worker_id}.log") + + schema_path.parent.mkdir(parents=True, exist_ok=True) + data_path.parent.mkdir(parents=True, exist_ok=True) + + logging_config_yaml = logging_config_yaml.format( + schema_filename=str(schema_path.absolute()), + data_filename=str(data_path.absolute()), + worker_id=worker_id, + monitor_name=self.name, + ) + + logging_config = yaml.safe_load(logging_config_yaml) + logging.config.dictConfig(logging_config) + self.data_logger = logging.getLogger("bentoml_monitor_data") + self.schema_logger = logging.getLogger("bentoml_monitor_schema") + + def export_schema(self, columns_schema: dict[str, dict[str, str]]) -> None: + """ + Export columns_schema of the data. This method should be called after all data is logged. + """ + if self.schema_logger is None: + self._init_logger() + assert self.schema_logger is not None + + self.schema_logger.info( + dict( + meta_data={ + "bento_name": component_context.bento_name, + "bento_version": component_context.bento_version, + }, + columns=list(columns_schema.values()), + ) + ) + + def export_data( + self, + datas: t.Dict[str, collections.deque[JSONSerializable]], + ) -> None: + """ + Export data. This method should be called after all data is logged. + """ + if self.data_logger is None: + self._init_logger() + assert self.data_logger is not None + + extra_columns = dict( + timestamp=datetime.datetime.now().isoformat(), + request_id=str(trace_context.request_id), + ) + while True: + try: + record = {k: v.popleft() for k, v in datas.items()} + record.update(extra_columns) + self.data_logger.info(record) + except IndexError: + break diff --git a/src/bentoml/_internal/server/http_app.py b/src/bentoml/_internal/server/http_app.py index 5d7fe5cd791..871c9a0a6f1 100644 --- a/src/bentoml/_internal/server/http_app.py +++ b/src/bentoml/_internal/server/http_app.py @@ -336,6 +336,10 @@ async def api_func(request: Request) -> Response: output = await run_in_threadpool(api.func, input_data) response = await api.output.to_http_response(output, ctx) + if trace_context.request_id is not None: + response.headers["X-BentoML-Request-ID"] = str( + trace_context.request_id + ) except BentoMLException as e: log_exception(request, sys.exc_info())