Skip to content
This repository has been archived by the owner on May 16, 2024. It is now read-only.

refactor(arize): clean codes for the new SDK #13

Merged
merged 7 commits into from Nov 4, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion monitoring/bentoml-plugins-arize/pyproject.toml
Expand Up @@ -30,7 +30,7 @@ classifiers = [
dynamic = ["version"]

# NOTE: Make sure to add corresponding dependencies for given components here.
dependencies = ["bentoml>=1.0.8", "arize"]
dependencies = ["bentoml>=1.0.9", "arize"]
bojiang marked this conversation as resolved.
Show resolved Hide resolved

# NOTE: If specific components require additional dependencies, add them here.
[project.optional-dependencies]
Expand Down
Expand Up @@ -296,9 +296,9 @@ def __init__(
model_version: str | None = None,
environment: Environments | None = None,
model_tags: dict[str, str | bool | float | int] | None = None,
**_: t.Any,
**kwargs: t.Any,
):
self.name = name
super().__init__(name, **kwargs)
bojiang marked this conversation as resolved.
Show resolved Hide resolved

# client options
if api_key is None:
Expand Down Expand Up @@ -342,30 +342,11 @@ def _init_client(self):
timeout=self.timeout,
)

def start_record(self) -> None:
    """
    Begin a new record. Call this before logging any data.

    Re-arms the ``_is_first_column`` flag so that the next logged column is
    treated as the first one of the record.
    """
    # The flag is cleared again by the logging methods once data arrives.
    self._is_first_column = True

def stop_record(self) -> None:
    """
    Stop recording data. This method should be called after logging all data.

    If no column was logged since the record was started, a warning is
    emitted and nothing is exported. Otherwise the schema is exported once,
    on the first record that actually carries data, and then the buffered
    data is exported.
    """
    if self._is_first_column:
        # Return before touching the schema: the schema is only collected
        # while ``_is_first_record`` is true, so exporting it for an empty
        # record would freeze an empty schema for all later records.
        logger.warning("No data logged in this record. Will skip this record.")
        return

    if self._is_first_record:
        self.export_schema()
        self._is_first_record = False

    self.export_data()

def export_schema(self):
def export_schema(self, columns_schema: dict[str, dict[str, str]]) -> None:
"""
Export schema of the data. This method should be called right after the first record.
"""
fields = _stat_fields(self._schema.values())
fields = _stat_fields(columns_schema.values())
mapping = _infer_mapping(fields, self.model_type)
self._data_converter = ( # pylint: disable=attribute-defined-outside-init
functools.partial(_map_data, fields=fields, mapping=mapping)
Expand All @@ -384,31 +365,29 @@ def export_schema(self):

self._init_client()

def export_data(self):
def export_data(self, datas: dict[str, collections.deque[DataType]]) -> None:
"""
Export data. This method should be called after all data is logged.
"""
assert (
len(set(len(q) for q in self._columns.values())) == 1
), "All columns must have the same length"
assert self.model_id is not None
assert self.model_type is not None
assert self.environment is not None

timestamp = datetime.datetime.now().timestamp()
prediction_id = trace_context.request_id
assert isinstance(prediction_id, int)
while True:
try:
record = {k: v.popleft() for k, v in self._columns.items()}
record = {k: v.popleft() for k, v in datas.items()}
bojiang marked this conversation as resolved.
Show resolved Hide resolved
except IndexError:
break
timestamp = record[self.COLUMN_TIME]
assert isinstance(timestamp, float)
prediction_id = record[self.COLUMN_RID]
assert isinstance(prediction_id, int)
(
prediction_label,
actual_label,
features,
embedding_features,
) = self._data_converter(record)

self._client.log(
model_id=self.model_id,
model_type=self.model_type,
Expand All @@ -423,96 +402,3 @@ def export_data(self):
features=features, # type: ignore (invariant types)
embedding_features=embedding_features,
)

def log(
    self,
    data: DataType,
    name: str,
    role: str,
    data_type: str,
) -> None:
    """
    Log a single value with column name, role and type to the current record.

    Args:
        data: the value to append to column ``name``.
        name: column name; must not be one of ``self.PRESERVED_COLUMNS``.
        role: column role; must be a member of ``BENTOML_MONITOR_ROLES``.
        data_type: column type; must be a member of ``BENTOML_MONITOR_TYPES``.

    Raises:
        ValueError: if ``name`` is preserved, or ``role`` / ``data_type`` is
            not a recognised value.
    """
    if name in self.PRESERVED_COLUMNS:
        raise ValueError(
            f"Column name {name} is preserved. Please use a different name."
        )

    # Explicit raises instead of ``assert``: these check caller input, and
    # assert statements are removed when Python runs with ``-O``.
    if role not in BENTOML_MONITOR_ROLES:
        raise ValueError(f"Invalid role {role}")
    if data_type not in BENTOML_MONITOR_TYPES:
        raise ValueError(f"Invalid data type {data_type}")

    # The schema is only collected while the first record is being built.
    if self._is_first_record:
        if name not in self._schema:
            self._schema[name] = {
                "name": name,
                "role": role,
                "type": data_type,
            }
        else:
            # NOTE: only the schema entry is left untouched here; the value
            # itself is still appended to the column below.
            logger.warning(
                "Column %s already exists in the schema. Will be ignored.", name
            )

    if self._is_first_column:
        self._is_first_column = False

        # universal columns: added once per record, with the first column
        self._columns[self.COLUMN_TIME].append(datetime.datetime.now().timestamp())
        assert trace_context.request_id is not None
        self._columns[self.COLUMN_RID].append(trace_context.request_id)

    self._columns[name].append(data)

def log_batch(
    self,
    data_batch: t.Iterable[DataType],
    name: str,
    role: str,
    data_type: str,
) -> None:
    """
    Log a batch of data. The data will be logged as a single column.

    Args:
        data_batch: iterable of values; each item is appended to column
            ``name``.
        name: column name; must not be one of ``self.PRESERVED_COLUMNS``.
        role: column role; must be a member of ``BENTOML_MONITOR_ROLES``.
        data_type: column type; must be a member of ``BENTOML_MONITOR_TYPES``.

    Raises:
        ValueError: if ``name`` is preserved, if ``role`` / ``data_type`` is
            not a recognised value, or if ``data_batch`` is not iterable.
    """
    if name in self.PRESERVED_COLUMNS:
        raise ValueError(
            f"Column name {name} is preserved. Please use a different name."
        )

    # Explicit raises instead of ``assert``: these check caller input, and
    # assert statements are removed when Python runs with ``-O``.
    if role not in BENTOML_MONITOR_ROLES:
        raise ValueError(f"Invalid role {role}")
    if data_type not in BENTOML_MONITOR_TYPES:
        raise ValueError(f"Invalid data type {data_type}")

    # The schema is only collected while the first record is being built.
    if self._is_first_record:
        if name not in self._schema:
            self._schema[name] = {
                "name": name,
                "role": role,
                "type": data_type,
            }
        else:
            # NOTE: only the schema entry is left untouched here; the batch
            # itself is still appended to the column below.
            logger.warning(
                "Column %s already exists in the schema. Will be ignored.", name
            )

    # Only the iterability check is guarded, so a TypeError raised while
    # producing items is not misreported as "not iterable".
    try:
        batch_iter = iter(data_batch)
    except TypeError:
        raise ValueError(
            "data_batch is not iterable. Please use log() to log a single data."
        ) from None

    logged_any = False
    for data in batch_iter:
        self._columns[name].append(data)
        logged_any = True
        if self._is_first_column:
            assert trace_context.request_id is not None
            # universal columns: one entry per item of the first column
            self._columns[self.COLUMN_TIME].append(
                datetime.datetime.now().timestamp()
            )
            self._columns[self.COLUMN_RID].append(trace_context.request_id)

    # An empty batch adds no universal columns, so it must not count as the
    # first column of the record.
    if logged_any:
        self._is_first_column = False

def log_table(
    self,
    data: t.Any,  # type: pandas.DataFrame
    schema: dict[str, str],
) -> None:
    """
    Placeholder for logging tabular data; not supported yet.

    Raises:
        NotImplementedError: always.
    """
    message = "Not implemented yet"
    raise NotImplementedError(message)