diff --git a/.github/actions/setup-repo/action.yml b/.github/actions/setup-repo/action.yml
index 4d9cd9c..1bcbebf 100644
--- a/.github/actions/setup-repo/action.yml
+++ b/.github/actions/setup-repo/action.yml
@@ -49,3 +49,6 @@ runs:
     - name: Install development dependencies
       shell: bash
       run: python -m pip install -r requirements/dev-requirements.lock.txt
+    - name: Install BentoML from main
+      shell: bash
+      run: python -m pip install git+https://github.com/bentoml/bentoml.git@main
diff --git a/monitoring/bentoml-plugins-arize/pyproject.toml b/monitoring/bentoml-plugins-arize/pyproject.toml
index c6e3e4a..ae0a1e0 100644
--- a/monitoring/bentoml-plugins-arize/pyproject.toml
+++ b/monitoring/bentoml-plugins-arize/pyproject.toml
@@ -30,7 +30,7 @@ classifiers = [
 dynamic = ["version"]
 
 # NOTE: Make sure to add corresponding dependencies for given components here.
-dependencies = ["bentoml>=1.0.8", "arize"]
+dependencies = ["bentoml", "arize>=5.2.0"]
 
 # NOTE: If requires additional dependencies for specific components, add them here.
 [project.optional-dependencies]
diff --git a/monitoring/bentoml-plugins-arize/src/bentoml_plugins/arize/__init__.py b/monitoring/bentoml-plugins-arize/src/bentoml_plugins/arize/__init__.py
index c3cb7e6..1be438e 100644
--- a/monitoring/bentoml-plugins-arize/src/bentoml_plugins/arize/__init__.py
+++ b/monitoring/bentoml-plugins-arize/src/bentoml_plugins/arize/__init__.py
@@ -278,10 +278,6 @@ def _map_data(
 
 
 class ArizeMonitor(MonitorBase[DataType]):
-    """ """
-
-    PRESERVED_COLUMNS = (COLUMN_TIME, COLUMN_RID) = ("timestamp", "request_id")
-
     def __init__(
         self,
         name: str,
@@ -296,9 +292,9 @@ def __init__(
         model_version: str | None = None,
         environment: Environments | None = None,
         model_tags: dict[str, str | bool | float | int] | None = None,
-        **_: t.Any,
+        **kwargs: t.Any,
     ):
-        self.name = name
+        super().__init__(name, **kwargs)
 
         # client options
         if api_key is None:
@@ -342,30 +338,11 @@ def _init_client(self):
             timeout=self.timeout,
         )
 
-    def start_record(self) -> None:
-        """
-        Start recording data. This method should be called before logging any data.
-        """
-        self._is_first_column = True
-
-    def stop_record(self) -> None:
-        """
-        Stop recording data. This method should be called after logging all data.
-        """
-        if self._is_first_record:
-            self.export_schema()
-            self._is_first_record = False
-
-        if self._is_first_column:
-            logger.warning("No data logged in this record. Will skip this record.")
-        else:
-            self.export_data()
-
-    def export_schema(self):
+    def export_schema(self, columns_schema: dict[str, dict[str, str]]) -> None:
         """
         Export schema of the data. This method should be called right after the first record.
         """
-        fields = _stat_fields(self._schema.values())
+        fields = _stat_fields(columns_schema.values())
         mapping = _infer_mapping(fields, self.model_type)
         self._data_converter = (  # pylint: disable=attribute-defined-outside-init
             functools.partial(_map_data, fields=fields, mapping=mapping)
@@ -376,7 +353,7 @@ def export_schema(self):
 
         if self.model_version is None and self.model_id is None:
-            self.model_id = component_context.bento_name
+            self.model_id = f"{component_context.bento_name}:{self.name}"
             self.model_version = component_context.bento_version
 
         if self.environment is None:
@@ -384,31 +361,29 @@ def export_schema(self):
         self._init_client()
 
-    def export_data(self):
+    def export_data(self, datas: dict[str, collections.deque[DataType]]) -> None:
         """
         Export data. This method should be called after all data is logged.
         """
-        assert (
-            len(set(len(q) for q in self._columns.values())) == 1
-        ), "All columns must have the same length"
         assert self.model_id is not None
         assert self.model_type is not None
        assert self.environment is not None
+
+        timestamp = datetime.datetime.now().timestamp()
+        prediction_id = trace_context.request_id
+        assert isinstance(prediction_id, int)
 
         while True:
             try:
-                record = {k: v.popleft() for k, v in self._columns.items()}
+                record = {k: v.popleft() for k, v in datas.items()}
             except IndexError:
                 break
-            timestamp = record[self.COLUMN_TIME]
-            assert isinstance(timestamp, float)
-            prediction_id = record[self.COLUMN_RID]
-            assert isinstance(prediction_id, int)
             (
                 prediction_label,
                 actual_label,
                 features,
                 embedding_features,
             ) = self._data_converter(record)
+
             self._client.log(
                 model_id=self.model_id,
                 model_type=self.model_type,
@@ -423,96 +398,3 @@ def export_data(self):
                 features=features,  # type: ignore (invariant types)
                 embedding_features=embedding_features,
             )
-
-    def log(
-        self,
-        data: DataType,
-        name: str,
-        role: str,
-        data_type: str,
-    ) -> None:
-        """
-        log a data with column name, role and type to the current record
-        """
-        if name in self.PRESERVED_COLUMNS:
-            raise ValueError(
-                f"Column name {name} is preserved. Please use a different name."
-            )
-
-        assert role in BENTOML_MONITOR_ROLES, f"Invalid role {role}"
-        assert data_type in BENTOML_MONITOR_TYPES, f"Invalid data type {data_type}"
-
-        if self._is_first_record:
-            if name not in self._schema:
-                self._schema[name] = {
-                    "name": name,
-                    "role": role,
-                    "type": data_type,
-                }
-            else:
-                logger.warning(
-                    "Column %s already exists in the schema. Will be ignored.", name
-                )
-        if self._is_first_column:
-            self._is_first_column = False
-
-            # universal columns
-            self._columns[self.COLUMN_TIME].append(datetime.datetime.now().timestamp())
-            assert trace_context.request_id is not None
-            self._columns[self.COLUMN_RID].append(trace_context.request_id)
-
-        self._columns[name].append(data)
-
-    def log_batch(
-        self,
-        data_batch: t.Iterable[DataType],
-        name: str,
-        role: str,
-        data_type: str,
-    ) -> None:
-        """
-        Log a batch of data. The data will be logged as a single column.
-        """
-        if name in self.PRESERVED_COLUMNS:
-            raise ValueError(
-                f"Column name {name} is preserved. Please use a different name."
-            )
-
-        assert role in BENTOML_MONITOR_ROLES, f"Invalid role {role}"
-        assert data_type in BENTOML_MONITOR_TYPES, f"Invalid data type {data_type}"
-
-        if self._is_first_record:
-            if name not in self._schema:
-                self._schema[name] = {
-                    "name": name,
-                    "role": role,
-                    "type": data_type,
-                }
-            else:
-                logger.warning(
-                    "Column %s already exists in the schema. Will be ignored.", name
-                )
-
-        try:
-            for data in data_batch:
-                self._columns[name].append(data)
-                if self._is_first_column:
-                    assert trace_context.request_id is not None
-                    # universal columns
-                    self._columns[self.COLUMN_TIME].append(
-                        datetime.datetime.now().timestamp()
-                    )
-                    self._columns[self.COLUMN_RID].append(trace_context.request_id)
-        except TypeError:
-            raise ValueError(
-                "data_batch is not iterable. Please use log() to log a single data."
-            ) from None
-        if self._is_first_column:
-            self._is_first_column = False
-
-    def log_table(
-        self,
-        data: t.Any,  # type: pandas.DataFrame
-        schema: dict[str, str],
-    ) -> None:
-        raise NotImplementedError("Not implemented yet")
diff --git a/monitoring/bentoml-plugins-arize/tests/test_package.py b/monitoring/bentoml-plugins-arize/tests/test_package.py
index 071481a..723fd61 100644
--- a/monitoring/bentoml-plugins-arize/tests/test_package.py
+++ b/monitoring/bentoml-plugins-arize/tests/test_package.py
@@ -69,8 +69,10 @@ def fixture_init_context(monkeypatch: MonkeyPatch) -> None:
 def test_mapping(init_context: None) -> None:
     from bentoml_plugins.arize import ArizeMonitor
 
+    monitor_name = "my_model_1"
+
     monitor = ArizeMonitor(
-        "my_monitor_1",
+        monitor_name,
         space_key=MY_SPACE_KEY,
         api_key=MY_API_KEY,
         uri=URI,
@@ -80,13 +82,13 @@ def test_mapping(init_context: None) -> None:
     monitor.stop_record()
     assert monitor.model_type == ModelTypes.SCORE_CATEGORICAL
     assert Client.INIT_ARGS[-1]["space_key"] == MY_SPACE_KEY
-    assert Client.LOG_ARGS[-1]["model_id"] == BENTO_NAME
+    assert Client.LOG_ARGS[-1]["model_id"] == f"{BENTO_NAME}:{monitor_name}"
     assert Client.LOG_ARGS[-1]["model_version"] == BENTO_VERSION
     assert Client.LOG_ARGS[-1]["prediction_id"] == REQUEST_ID
     assert Client.LOG_ARGS[-1]["model_type"] == ModelTypes.SCORE_CATEGORICAL
 
     monitor = ArizeMonitor(
-        "my_monitor_1",
+        monitor_name,
         space_key="my_space_key",
         api_key="my_api_key",
         uri="http://localhost:5000/v1",
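
Note for reviewers: this patch is written against the reworked monitoring API on bentoml `main` (hence the new CI step installing bentoml from the `main` branch and the unpinned `bentoml` dependency). `ArizeMonitor` no longer keeps its own column buffers or schema; it is assumed that `MonitorBase` now owns that record bookkeeping and feeds it into the `export_schema()`/`export_data()` hooks. The following is a minimal sketch of that assumed contract, inferred from this diff rather than from a published bentoml release; names and signatures here are not authoritative.

from __future__ import annotations

import collections
import typing as t

T = t.TypeVar("T")


class MonitorBase(t.Generic[T]):
    """Sketch of the assumed base class: it buffers logged columns and
    calls the two exporter hooks that ArizeMonitor overrides above."""

    def __init__(self, name: str, **_: t.Any) -> None:
        self.name = name
        self._schema: dict[str, dict[str, str]] = {}
        self._columns: dict[str, collections.deque[T]] = collections.defaultdict(
            collections.deque
        )
        self._is_first_record = True

    def start_record(self) -> None:
        # Begin a new record. The real base class presumably tracks per-record
        # state here; timestamp and request id are now read by export_data()
        # from datetime.datetime.now() and trace_context directly.
        pass

    def log(self, data: T, name: str, role: str, data_type: str) -> None:
        # Buffer one column value and remember its schema entry; this is the
        # bookkeeping that ArizeMonitor deleted in favor of the base class.
        self._schema.setdefault(name, {"name": name, "role": role, "type": data_type})
        self._columns[name].append(data)

    def stop_record(self) -> None:
        # On the first record, hand the accumulated schema to the subclass;
        # then flush the buffered columns through export_data().
        if self._is_first_record:
            self.export_schema(self._schema)
            self._is_first_record = False
        self.export_data(self._columns)

    def export_schema(self, columns_schema: dict[str, dict[str, str]]) -> None:
        raise NotImplementedError

    def export_data(self, datas: dict[str, collections.deque[T]]) -> None:
        raise NotImplementedError

Under that contract, the `monitor.stop_record()` call in `test_mapping` is what triggers `export_schema()` and then `export_data()`, which is why the test can assert on `Client.LOG_ARGS` immediately afterwards.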