Skip to content
This repository has been archived by the owner on May 16, 2024. It is now read-only.

refactor(arize): clean codes for the new SDK #13

Merged
merged 7 commits into from Nov 4, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 3 additions & 0 deletions .github/actions/setup-repo/action.yml
Expand Up @@ -49,3 +49,6 @@ runs:
- name: Install development dependencies
shell: bash
run: python -m pip install -r requirements/dev-requirements.lock.txt
- name: Install development dependencies
shell: bash
run: python -m pip install git+https://github.com/bentoml/bentoml.git@main
2 changes: 1 addition & 1 deletion monitoring/bentoml-plugins-arize/pyproject.toml
Expand Up @@ -30,7 +30,7 @@ classifiers = [
dynamic = ["version"]

# NOTE: Make sure to add corresponding dependencies for given components here.
dependencies = ["bentoml>=1.0.8", "arize"]
dependencies = ["bentoml", "arize>=5.2.0"]
bojiang marked this conversation as resolved.
Show resolved Hide resolved

# NOTE: If requires additional dependencies for specific components, add them here.
[project.optional-dependencies]
Expand Down
Expand Up @@ -278,10 +278,6 @@ def _map_data(


class ArizeMonitor(MonitorBase[DataType]):
""" """

PRESERVED_COLUMNS = (COLUMN_TIME, COLUMN_RID) = ("timestamp", "request_id")

def __init__(
self,
name: str,
Expand All @@ -296,9 +292,9 @@ def __init__(
model_version: str | None = None,
environment: Environments | None = None,
model_tags: dict[str, str | bool | float | int] | None = None,
**_: t.Any,
**kwargs: t.Any,
):
self.name = name
super().__init__(name, **kwargs)
bojiang marked this conversation as resolved.
Show resolved Hide resolved

# client options
if api_key is None:
Expand Down Expand Up @@ -342,30 +338,11 @@ def _init_client(self):
timeout=self.timeout,
)

def start_record(self) -> None:
    """
    Start recording data. This method should be called before logging any data.
    """
    # Reset the per-record flag; the first log()/log_batch() call of the new
    # record uses it to know when to append the universal columns
    # (timestamp / request_id) exactly once per record.
    self._is_first_column = True

def stop_record(self) -> None:
    """
    Stop recording data. This method should be called after logging all data.
    """
    # On the very first completed record the column schema is final, so the
    # Arize schema/mapping is exported once before any data is shipped.
    if self._is_first_record:
        self.export_schema()
        self._is_first_record = False

    # _is_first_column still True means no log()/log_batch() call happened
    # between start_record() and stop_record() — nothing to export.
    if self._is_first_column:
        logger.warning("No data logged in this record. Will skip this record.")
    else:
        self.export_data()

def export_schema(self):
def export_schema(self, columns_schema: dict[str, dict[str, str]]) -> None:
"""
Export schema of the data. This method should be called right after the first record.
"""
fields = _stat_fields(self._schema.values())
fields = _stat_fields(columns_schema.values())
mapping = _infer_mapping(fields, self.model_type)
self._data_converter = ( # pylint: disable=attribute-defined-outside-init
functools.partial(_map_data, fields=fields, mapping=mapping)
Expand All @@ -376,39 +353,37 @@ def export_schema(self):

if self.model_version is None and self.model_id is None:

self.model_id = component_context.bento_name
self.model_id = f"{component_context.bento_name}:{self.name}"
self.model_version = component_context.bento_version

if self.environment is None:
self.environment = Environments.PRODUCTION

self._init_client()

def export_data(self):
def export_data(self, datas: dict[str, collections.deque[DataType]]) -> None:
"""
Export data. This method should be called after all data is logged.
"""
assert (
len(set(len(q) for q in self._columns.values())) == 1
), "All columns must have the same length"
assert self.model_id is not None
assert self.model_type is not None
assert self.environment is not None

timestamp = datetime.datetime.now().timestamp()
prediction_id = trace_context.request_id
assert isinstance(prediction_id, int)
while True:
try:
record = {k: v.popleft() for k, v in self._columns.items()}
record = {k: v.popleft() for k, v in datas.items()}
bojiang marked this conversation as resolved.
Show resolved Hide resolved
except IndexError:
break
timestamp = record[self.COLUMN_TIME]
assert isinstance(timestamp, float)
prediction_id = record[self.COLUMN_RID]
assert isinstance(prediction_id, int)
(
prediction_label,
actual_label,
features,
embedding_features,
) = self._data_converter(record)

self._client.log(
model_id=self.model_id,
model_type=self.model_type,
Expand All @@ -423,96 +398,3 @@ def export_data(self):
features=features, # type: ignore (invariant types)
embedding_features=embedding_features,
)

def log(
    self,
    data: DataType,
    name: str,
    role: str,
    data_type: str,
) -> None:
    """
    Log a single data point under the given column name for the current record.

    Args:
        data: The value to record for this column.
        name: Column name; must not collide with the preserved columns
            ("timestamp", "request_id").
        role: Monitoring role of the column; must be one of
            BENTOML_MONITOR_ROLES.
        data_type: Data type of the column; must be one of
            BENTOML_MONITOR_TYPES.

    Raises:
        ValueError: If ``name`` is one of the preserved column names.
    """
    if name in self.PRESERVED_COLUMNS:
        raise ValueError(
            f"Column name {name} is preserved. Please use a different name."
        )

    assert role in BENTOML_MONITOR_ROLES, f"Invalid role {role}"
    assert data_type in BENTOML_MONITOR_TYPES, f"Invalid data type {data_type}"

    # The schema is only collected during the first record; later records are
    # assumed to repeat the same columns. Duplicate names within the first
    # record are ignored with a warning rather than overwritten.
    if self._is_first_record:
        if name not in self._schema:
            self._schema[name] = {
                "name": name,
                "role": role,
                "type": data_type,
            }
        else:
            logger.warning(
                "Column %s already exists in the schema. Will be ignored.", name
            )
    if self._is_first_column:
        self._is_first_column = False

        # universal columns
        # NOTE(review): indentation was lost in the diff scrape — these appends
        # are placed inside the first-column branch so that timestamp and
        # request_id are recorded once per record (matching log_batch); confirm
        # against the original file.
        self._columns[self.COLUMN_TIME].append(datetime.datetime.now().timestamp())
        assert trace_context.request_id is not None
        self._columns[self.COLUMN_RID].append(trace_context.request_id)

    self._columns[name].append(data)

def log_batch(
    self,
    data_batch: t.Iterable[DataType],
    name: str,
    role: str,
    data_type: str,
) -> None:
    """
    Log a batch of data. The data will be logged as a single column.

    Args:
        data_batch: Iterable of values to record under ``name``; each element
            becomes one row of the current record.
        name: Column name; must not collide with the preserved columns
            ("timestamp", "request_id").
        role: Monitoring role of the column; must be one of
            BENTOML_MONITOR_ROLES.
        data_type: Data type of the column; must be one of
            BENTOML_MONITOR_TYPES.

    Raises:
        ValueError: If ``name`` is preserved, or if ``data_batch`` is not
            iterable.
    """
    if name in self.PRESERVED_COLUMNS:
        raise ValueError(
            f"Column name {name} is preserved. Please use a different name."
        )

    assert role in BENTOML_MONITOR_ROLES, f"Invalid role {role}"
    assert data_type in BENTOML_MONITOR_TYPES, f"Invalid data type {data_type}"

    # Schema is collected only while processing the first record; duplicate
    # column names are warned about and ignored, mirroring log().
    if self._is_first_record:
        if name not in self._schema:
            self._schema[name] = {
                "name": name,
                "role": role,
                "type": data_type,
            }
        else:
            logger.warning(
                "Column %s already exists in the schema. Will be ignored.", name
            )

    try:
        for data in data_batch:
            self._columns[name].append(data)
            # While this is still the first column of the record, append one
            # timestamp/request_id pair per batch element so all columns end
            # up with the same length.
            if self._is_first_column:
                assert trace_context.request_id is not None
                # universal columns
                self._columns[self.COLUMN_TIME].append(
                    datetime.datetime.now().timestamp()
                )
                self._columns[self.COLUMN_RID].append(trace_context.request_id)
    except TypeError:
        # Iterating a non-iterable raises TypeError; surface it as a clearer
        # usage error and drop the original traceback context.
        raise ValueError(
            "data_batch is not iterable. Please use log() to log a single data."
        ) from None
    # Only clear the flag after the whole batch so every element of the first
    # column received its universal-column entries above.
    if self._is_first_column:
        self._is_first_column = False

def log_table(
    self,
    data: t.Any,  # type: pandas.DataFrame
    schema: dict[str, str],
) -> None:
    """
    Log a whole table (pandas DataFrame) with an accompanying column schema.

    Raises:
        NotImplementedError: Always; table logging is not supported by this
            monitor yet.
    """
    raise NotImplementedError("Not implemented yet")
8 changes: 5 additions & 3 deletions monitoring/bentoml-plugins-arize/tests/test_package.py
Expand Up @@ -69,8 +69,10 @@ def fixture_init_context(monkeypatch: MonkeyPatch) -> None:
def test_mapping(init_context: None) -> None:
from bentoml_plugins.arize import ArizeMonitor

monitor_name = "my_model_1"

monitor = ArizeMonitor(
"my_monitor_1",
monitor_name,
space_key=MY_SPACE_KEY,
api_key=MY_API_KEY,
uri=URI,
Expand All @@ -80,13 +82,13 @@ def test_mapping(init_context: None) -> None:
monitor.stop_record()
assert monitor.model_type == ModelTypes.SCORE_CATEGORICAL
assert Client.INIT_ARGS[-1]["space_key"] == MY_SPACE_KEY
assert Client.LOG_ARGS[-1]["model_id"] == BENTO_NAME
assert Client.LOG_ARGS[-1]["model_id"] == f"{BENTO_NAME}:{monitor_name}"
assert Client.LOG_ARGS[-1]["model_version"] == BENTO_VERSION
assert Client.LOG_ARGS[-1]["prediction_id"] == REQUEST_ID
assert Client.LOG_ARGS[-1]["model_type"] == ModelTypes.SCORE_CATEGORICAL

monitor = ArizeMonitor(
"my_monitor_1",
monitor_name,
space_key="my_space_key",
api_key="my_api_key",
uri="http://localhost:5000/v1",
Expand Down