Skip to content
This repository has been archived by the owner on May 16, 2024. It is now read-only.

Commit

Permalink
refactor(arize): clean codes for the new SDK (#13)
Browse files Browse the repository at this point in the history
* refactoring to the new API

* Update monitoring/bentoml-plugins-arize/pyproject.toml

Signed-off-by: bojiang <5886138+bojiang@users.noreply.github.com>

* upgrade

* install main bentoml

* try to workaround

* workaround

* modify test

Signed-off-by: bojiang <5886138+bojiang@users.noreply.github.com>
  • Loading branch information
bojiang committed Nov 4, 2022
1 parent c543dbd commit 7c3972d
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 134 deletions.
3 changes: 3 additions & 0 deletions .github/actions/setup-repo/action.yml
Expand Up @@ -49,3 +49,6 @@ runs:
- name: Install development dependencies
shell: bash
run: python -m pip install -r requirements/dev-requirements.lock.txt
- name: Install development dependencies
shell: bash
run: python -m pip install git+https://github.com/bentoml/bentoml.git@main
2 changes: 1 addition & 1 deletion monitoring/bentoml-plugins-arize/pyproject.toml
Expand Up @@ -30,7 +30,7 @@ classifiers = [
dynamic = ["version"]

# NOTE: Make sure to add corresponding dependencies for given components here.
dependencies = ["bentoml>=1.0.8", "arize"]
dependencies = ["bentoml", "arize>=5.2.0"]

# NOTE: If requires additional dependencies for specific components, add them here.
[project.optional-dependencies]
Expand Down
142 changes: 12 additions & 130 deletions monitoring/bentoml-plugins-arize/src/bentoml_plugins/arize/__init__.py
Expand Up @@ -278,10 +278,6 @@ def _map_data(


class ArizeMonitor(MonitorBase[DataType]):
""" """

PRESERVED_COLUMNS = (COLUMN_TIME, COLUMN_RID) = ("timestamp", "request_id")

def __init__(
self,
name: str,
Expand All @@ -296,9 +292,9 @@ def __init__(
model_version: str | None = None,
environment: Environments | None = None,
model_tags: dict[str, str | bool | float | int] | None = None,
**_: t.Any,
**kwargs: t.Any,
):
self.name = name
super().__init__(name, **kwargs)

# client options
if api_key is None:
Expand Down Expand Up @@ -342,30 +338,11 @@ def _init_client(self):
timeout=self.timeout,
)

def start_record(self) -> None:
"""
Start recording data. This method should be called before logging any data.
"""
self._is_first_column = True

def stop_record(self) -> None:
"""
Stop recording data. This method should be called after logging all data.
"""
if self._is_first_record:
self.export_schema()
self._is_first_record = False

if self._is_first_column:
logger.warning("No data logged in this record. Will skip this record.")
else:
self.export_data()

def export_schema(self):
def export_schema(self, columns_schema: dict[str, dict[str, str]]) -> None:
"""
Export schema of the data. This method should be called right after the first record.
"""
fields = _stat_fields(self._schema.values())
fields = _stat_fields(columns_schema.values())
mapping = _infer_mapping(fields, self.model_type)
self._data_converter = ( # pylint: disable=attribute-defined-outside-init
functools.partial(_map_data, fields=fields, mapping=mapping)
Expand All @@ -376,39 +353,37 @@ def export_schema(self):

if self.model_version is None and self.model_id is None:

self.model_id = component_context.bento_name
self.model_id = f"{component_context.bento_name}:{self.name}"
self.model_version = component_context.bento_version

if self.environment is None:
self.environment = Environments.PRODUCTION

self._init_client()

def export_data(self):
def export_data(self, datas: dict[str, collections.deque[DataType]]) -> None:
"""
Export data. This method should be called after all data is logged.
"""
assert (
len(set(len(q) for q in self._columns.values())) == 1
), "All columns must have the same length"
assert self.model_id is not None
assert self.model_type is not None
assert self.environment is not None

timestamp = datetime.datetime.now().timestamp()
prediction_id = trace_context.request_id
assert isinstance(prediction_id, int)
while True:
try:
record = {k: v.popleft() for k, v in self._columns.items()}
record = {k: v.popleft() for k, v in datas.items()}
except IndexError:
break
timestamp = record[self.COLUMN_TIME]
assert isinstance(timestamp, float)
prediction_id = record[self.COLUMN_RID]
assert isinstance(prediction_id, int)
(
prediction_label,
actual_label,
features,
embedding_features,
) = self._data_converter(record)

self._client.log(
model_id=self.model_id,
model_type=self.model_type,
Expand All @@ -423,96 +398,3 @@ def export_data(self):
features=features, # type: ignore (invariant types)
embedding_features=embedding_features,
)

def log(
self,
data: DataType,
name: str,
role: str,
data_type: str,
) -> None:
"""
log a data with column name, role and type to the current record
"""
if name in self.PRESERVED_COLUMNS:
raise ValueError(
f"Column name {name} is preserved. Please use a different name."
)

assert role in BENTOML_MONITOR_ROLES, f"Invalid role {role}"
assert data_type in BENTOML_MONITOR_TYPES, f"Invalid data type {data_type}"

if self._is_first_record:
if name not in self._schema:
self._schema[name] = {
"name": name,
"role": role,
"type": data_type,
}
else:
logger.warning(
"Column %s already exists in the schema. Will be ignored.", name
)
if self._is_first_column:
self._is_first_column = False

# universal columns
self._columns[self.COLUMN_TIME].append(datetime.datetime.now().timestamp())
assert trace_context.request_id is not None
self._columns[self.COLUMN_RID].append(trace_context.request_id)

self._columns[name].append(data)

def log_batch(
self,
data_batch: t.Iterable[DataType],
name: str,
role: str,
data_type: str,
) -> None:
"""
Log a batch of data. The data will be logged as a single column.
"""
if name in self.PRESERVED_COLUMNS:
raise ValueError(
f"Column name {name} is preserved. Please use a different name."
)

assert role in BENTOML_MONITOR_ROLES, f"Invalid role {role}"
assert data_type in BENTOML_MONITOR_TYPES, f"Invalid data type {data_type}"

if self._is_first_record:
if name not in self._schema:
self._schema[name] = {
"name": name,
"role": role,
"type": data_type,
}
else:
logger.warning(
"Column %s already exists in the schema. Will be ignored.", name
)

try:
for data in data_batch:
self._columns[name].append(data)
if self._is_first_column:
assert trace_context.request_id is not None
# universal columns
self._columns[self.COLUMN_TIME].append(
datetime.datetime.now().timestamp()
)
self._columns[self.COLUMN_RID].append(trace_context.request_id)
except TypeError:
raise ValueError(
"data_batch is not iterable. Please use log() to log a single data."
) from None
if self._is_first_column:
self._is_first_column = False

def log_table(
self,
data: t.Any, # type: pandas.DataFrame
schema: dict[str, str],
) -> None:
raise NotImplementedError("Not implemented yet")
8 changes: 5 additions & 3 deletions monitoring/bentoml-plugins-arize/tests/test_package.py
Expand Up @@ -69,8 +69,10 @@ def fixture_init_context(monkeypatch: MonkeyPatch) -> None:
def test_mapping(init_context: None) -> None:
from bentoml_plugins.arize import ArizeMonitor

monitor_name = "my_model_1"

monitor = ArizeMonitor(
"my_monitor_1",
monitor_name,
space_key=MY_SPACE_KEY,
api_key=MY_API_KEY,
uri=URI,
Expand All @@ -80,13 +82,13 @@ def test_mapping(init_context: None) -> None:
monitor.stop_record()
assert monitor.model_type == ModelTypes.SCORE_CATEGORICAL
assert Client.INIT_ARGS[-1]["space_key"] == MY_SPACE_KEY
assert Client.LOG_ARGS[-1]["model_id"] == BENTO_NAME
assert Client.LOG_ARGS[-1]["model_id"] == f"{BENTO_NAME}:{monitor_name}"
assert Client.LOG_ARGS[-1]["model_version"] == BENTO_VERSION
assert Client.LOG_ARGS[-1]["prediction_id"] == REQUEST_ID
assert Client.LOG_ARGS[-1]["model_type"] == ModelTypes.SCORE_CATEGORICAL

monitor = ArizeMonitor(
"my_monitor_1",
monitor_name,
space_key="my_space_key",
api_key="my_api_key",
uri="http://localhost:5000/v1",
Expand Down

0 comments on commit 7c3972d

Please sign in to comment.