Implement model evaluation API (part 1) #5069

Merged
merged 61 commits into master from eval_api on Dec 7, 2021
Changes from 9 commits
Commits
61 commits
e2c5aff
init
WeichenXu123 Nov 15, 2021
91a6e1c
update
WeichenXu123 Nov 15, 2021
071e86b
update
WeichenXu123 Nov 16, 2021
e7d8a76
update
WeichenXu123 Nov 16, 2021
7499ac6
update
WeichenXu123 Nov 16, 2021
f1819a4
update
WeichenXu123 Nov 16, 2021
aa397e1
update
WeichenXu123 Nov 16, 2021
981164a
update
WeichenXu123 Nov 16, 2021
593bec8
update
WeichenXu123 Nov 17, 2021
2240d0a
update
WeichenXu123 Nov 18, 2021
0e54b97
update
WeichenXu123 Nov 18, 2021
6ee4367
update
WeichenXu123 Nov 19, 2021
15712e0
update
WeichenXu123 Nov 19, 2021
63b1d2b
update
WeichenXu123 Nov 21, 2021
afc52e8
update
WeichenXu123 Nov 21, 2021
feddc1e
rename module
WeichenXu123 Nov 21, 2021
27814cf
address comments
WeichenXu123 Nov 22, 2021
fc9cf1a
address comments
WeichenXu123 Nov 22, 2021
1f0c12d
Merge branch 'master' into eval_api
WeichenXu123 Nov 22, 2021
d642309
revert black change
WeichenXu123 Nov 22, 2021
2a67297
change module path
WeichenXu123 Nov 22, 2021
66f760d
update
WeichenXu123 Nov 22, 2021
f7d6738
update
WeichenXu123 Nov 23, 2021
c7f0360
update
WeichenXu123 Nov 23, 2021
c2b673e
update
WeichenXu123 Nov 23, 2021
b046af3
address comments
WeichenXu123 Nov 25, 2021
d7620e6
Merge branch 'master' into eval_api
WeichenXu123 Nov 27, 2021
14c8e82
fix
WeichenXu123 Nov 28, 2021
25101f9
refactor
WeichenXu123 Nov 28, 2021
a4db525
lazy load pyspark
WeichenXu123 Nov 28, 2021
035346f
revert export
WeichenXu123 Nov 28, 2021
3422781
fix curcit import
WeichenXu123 Nov 28, 2021
0ad527e
update tests
WeichenXu123 Nov 28, 2021
2ea29c6
fix conftest.py
WeichenXu123 Nov 29, 2021
6bcbb0c
Revert "fix conftest.py"
WeichenXu123 Nov 29, 2021
46f3264
fix tests
WeichenXu123 Nov 29, 2021
d310359
update doc
WeichenXu123 Nov 29, 2021
1b3519b
update
WeichenXu123 Nov 30, 2021
09099de
default evaluator
WeichenXu123 Nov 30, 2021
2f3faa6
update
WeichenXu123 Nov 30, 2021
9aff5e8
fix
WeichenXu123 Nov 30, 2021
1dd0617
fix
WeichenXu123 Dec 1, 2021
57012d3
address comments
WeichenXu123 Dec 1, 2021
cf5cb34
merge master
WeichenXu123 Dec 1, 2021
21538cb
fix doc
WeichenXu123 Dec 1, 2021
812f0df
fix doc
WeichenXu123 Dec 1, 2021
cc2ac8e
update import
WeichenXu123 Dec 1, 2021
226d54b
fix doc
WeichenXu123 Dec 1, 2021
c20ccdc
update hash algo
WeichenXu123 Dec 1, 2021
68842fe
update import
WeichenXu123 Dec 2, 2021
755639b
address comment
WeichenXu123 Dec 2, 2021
73cd704
add tests
WeichenXu123 Dec 2, 2021
010f225
fix lint
WeichenXu123 Dec 2, 2021
e2f9c89
add tests
WeichenXu123 Dec 3, 2021
22b0bbd
Merge branch 'master' into eval_api
WeichenXu123 Dec 3, 2021
b1b34f8
add more tests
WeichenXu123 Dec 3, 2021
a51bd1f
add tests
WeichenXu123 Dec 3, 2021
c82eaa6
fix lint
WeichenXu123 Dec 3, 2021
5413290
address comments
WeichenXu123 Dec 6, 2021
7d14373
export EvaluationDataset
WeichenXu123 Dec 6, 2021
779f22e
update _load_content_from_file
WeichenXu123 Dec 6, 2021
354 changes: 354 additions & 0 deletions mlflow/evaluation.py
@@ -0,0 +1,354 @@
from typing import Dict, Union
import entrypoints
import warnings
import mlflow
import hashlib
import time
import numpy as np
import pandas as pd
import pickle
from mlflow.exceptions import MlflowException
from mlflow.utils.file_utils import TempDir
from mlflow.entities import Metric
from mlflow.tracking.artifact_utils import get_artifact_uri, _download_artifact_from_uri


class EvaluationMetrics(dict):
pass


class EvaluationArtifacts:

def __init__(self, location, content=None):
self._content = content
self._location = location

def load_content_from_file(self, local_artifact_file):
raise NotImplementedError()

@property
def content(self):
"""
The content of the artifact (representation varies)
"""
if self._content is None:
with TempDir() as temp_dir:
local_artifact_file = temp_dir.path('local_artifact')
_download_artifact_from_uri(self._location, local_artifact_file)
self._content = self.load_content_from_file(local_artifact_file)

return self._content

@property
def location(self) -> str:
"""
The location of the artifact
"""
return self._location

def __getstate__(self):
state = self.__dict__.copy()
# skip pickling artifact content
del state['_content']
return state

def __setstate__(self, state):
self.__dict__.update(state)
# content was not pickled; it will be lazily re-downloaded from `location` on access
self._content = None


class EvaluationResult:

def __init__(self, metrics, artifacts):
self._metrics = metrics
self._artifacts = artifacts

@classmethod
def load(cls, path):
"""Load the evaluation results from the specified local filesystem path"""
with open(path, 'rb') as f:
obj = pickle.load(f)
return obj

def save(self, path):
"""Write the evaluation results to the specified local filesystem path"""
# We will likely avoid serializing artifacts themselves, just locations.
# Deserialization will resolve locations to artifact contents.
with open(path, 'wb') as f:
pickle.dump(self, f)

@property
def metrics(self) -> EvaluationMetrics:
"""
A dictionary mapping scalar metric names to scalar metric values
"""
return self._metrics

@property
def artifacts(self) -> Dict[str, EvaluationArtifacts]:
"""
A dictionary mapping standardized artifact names (e.g. "roc_data") to
artifact content and location information
"""
return self._artifacts


class EvaluationDataset:
"""
Represents an input dataset for model evaluation. This is intended for
use with the `mlflow.evaluate()` API.
"""

NUM_SAMPLE_ROWS_FOR_HASH = 5

def __init__(self, data, labels=None, name=None, path=None):
"""
:param data: One of the following:
- A numpy array or list of evaluation features, excluding labels.
- A Pandas DataFrame, or the path to a serialized DataFrame,
containing evaluation features and labels. All columns other than the one
named by `labels` are regarded as feature columns.

:param labels: One of the following:
- A numpy array or list of evaluation labels, if `data` is also a numpy array or list.
- The string name of a column from `data` that contains evaluation labels, if `data`
is a DataFrame.

:param name: (Optional) The name of the dataset (must not contain the `"` character).

:param path: (Optional) The path to a serialized DataFrame
(e.g. a delta table or parquet file).
self.name = name
self.data = data
self.labels = labels
self.path = path

@staticmethod
def _gen_md5_for_arraylike_obj(md5_gen, data):
md5_gen.update(pickle.dumps(len(data)))
if len(data) < EvaluationDataset.NUM_SAMPLE_ROWS_FOR_HASH * 2:
md5_gen.update(pickle.dumps(data))
else:
md5_gen.update(pickle.dumps(data[:EvaluationDataset.NUM_SAMPLE_ROWS_FOR_HASH]))
md5_gen.update(pickle.dumps(data[-EvaluationDataset.NUM_SAMPLE_ROWS_FOR_HASH:]))

@property
def hash(self):
"""
Compute a hash from the specified dataset by selecting the first 5 records, last 5 records,
dataset size and feeding them through a cheap, low-collision hash function
"""
md5_gen = hashlib.md5()
if isinstance(self.data, np.ndarray):
EvaluationDataset._gen_md5_for_arraylike_obj(md5_gen, self.data)
EvaluationDataset._gen_md5_for_arraylike_obj(md5_gen, self.labels)
elif isinstance(self.data, pd.DataFrame):
EvaluationDataset._gen_md5_for_arraylike_obj(md5_gen, self.data)
md5_gen.update(self.labels.encode('UTF-8'))
return md5_gen.hexdigest()

@property
def metadata(self):
return {
'name': self.name,
'hash': self.hash,
'path': self.path,
}
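
# A hedged usage sketch (not part of this change; the variable and column
# names below are illustrative assumptions):
#
#   # From a pandas DataFrame whose "label" column holds the targets:
#   dataset = EvaluationDataset(data=eval_df, labels="label", name="validation_data")
#
#   # From numpy arrays of features and labels:
#   dataset = EvaluationDataset(data=X_eval, labels=y_eval, name="validation_data")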


class GetOrCreateRunId:
"""
Context manager that resolves a run and yields its run_id:
if the user specified a run_id, use it;
otherwise, if there is an active run, use it;
otherwise, create a managed run.
"""
def __init__(self, run_id):
self.managed_run_context = None
if run_id is not None:
self.run_id = run_id
elif mlflow.active_run() is not None:
self.run_id = mlflow.active_run().info.run_id
else:
self.run_id = None

def __enter__(self):
if self.run_id is not None:
return self.run_id
else:
self.managed_run_context = mlflow.start_run()
return self.managed_run_context.__enter__().info.run_id

def __exit__(self, exc_type, exc_val, exc_tb):
if self.managed_run_context is not None:
return self.managed_run_context.__exit__(exc_type, exc_val, exc_tb)


class ModelEvaluator:

def can_evaluate(
self, model_type, evaluator_config=None, **kwargs
) -> bool:
"""
:param model_type: A string describing the model type (e.g., "regressor",
"classifier", …).
:param evaluator_config: A dictionary of additional configurations for
the evaluator.
:param **kwargs: For forwards compatibility, a placeholder for additional
arguments that may be added to the evaluation interface
in the future.
:return: True if the evaluator can evaluate the specified model type with the
given configuration. False otherwise.
"""
raise NotImplementedError()

def compute_metrics(self, predict, dataset):
"""
Compute metrics on the dataset and return an instance of `EvaluationMetrics`.
"""
raise NotImplementedError()

def compute_and_log_artifacts(self, predict, dataset, run_id, mlflow_client):
"""
Compute and log artifacts, and return a dict mapping each artifact name to an
instance of `EvaluationArtifacts`.
"""
raise NotImplementedError()

def evaluate(
self, predict, dataset, run_id=None, evaluator_config=None, **kwargs
) -> EvaluationResult:
"""
:param predict: A function used to compute model predictions. Predict
accepts features from the specified `dataset` and
feeds them to the model, producing output predictions.
:param dataset: An instance of `EvaluationDataset` containing features
and labels (optional) for model evaluation.
:param run_id: The ID of the MLflow Run to which to log results.
:param evaluator_config: A dictionary of additional configurations for
the evaluator.
:param **kwargs: For forwards compatibility, a placeholder for additional
arguments that may be added to the evaluation interface
in the future.
:return: An `EvaluationResult` instance containing evaluation results.
"""
client = mlflow.tracking.MlflowClient()
with GetOrCreateRunId(run_id) as run_id:
metrics_dict = self.compute_metrics(predict, dataset)
timestamp = int(time.time() * 1000)
dataset_id = dataset.name if dataset.name is not None else dataset.hash
# TODO: log tags of dataset metadata
client.log_batch(
run_id,
metrics=[
Metric(key=f'{key}_on_{dataset_id}', value=value, timestamp=timestamp, step=0)
for key, value in metrics_dict.items()
],
)
artifact_dict = self.compute_and_log_artifacts(predict, dataset, run_id, client)
return EvaluationResult(metrics_dict, artifact_dict)
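
# A hedged sketch of what a concrete evaluator subclass might look like (not
# part of this change; assumes numpy features/labels and an illustrative metric):
#
#   class MaeRegressorEvaluator(ModelEvaluator):
#       def can_evaluate(self, model_type, evaluator_config=None, **kwargs):
#           return model_type == "regressor"
#
#       def compute_metrics(self, predict, dataset):
#           preds = predict(dataset.data)
#           mae = float(np.mean(np.abs(preds - dataset.labels)))
#           return EvaluationMetrics(mae=mae)
#
#       def compute_and_log_artifacts(self, predict, dataset, run_id, mlflow_client):
#           # No artifacts in this minimal sketch.
#           return {}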


class ModelEvaluatorRegistry:
"""
Name-based registry for model evaluator implementations
"""

def __init__(self):
self._registry = {}

def register(self, scheme, evaluator):
"""Register model evaluator provided by other packages"""
self._registry[scheme] = evaluator

def register_entrypoints(self):
# Register model evaluators provided by other packages
for entrypoint in entrypoints.get_group_all("mlflow.model_evaluator"):
try:
self.register(entrypoint.name, entrypoint.load())
except (AttributeError, ImportError) as exc:
warnings.warn(
'Failure attempting to register model evaluator for scheme "{}": {}'.format(
entrypoint.name, str(exc)
),
stacklevel=2,
)

def get_evaluator(self, evaluator_name):
"""
Get an evaluator instance from the registry based on the name of evaluator
"""
evaluator_cls = self._registry.get(evaluator_name)
if evaluator_cls is None:
raise MlflowException(
"Could not find a registered model evaluator for: {}. "
"Currently registered evaluator names are: {}".format(
evaluator_name, list(self._registry.keys())
)
)
return evaluator_cls()


_model_evaluation_registry = ModelEvaluatorRegistry()
_model_evaluation_registry.register_entrypoints()
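
# Third-party packages can plug in evaluators through the "mlflow.model_evaluator"
# entrypoint group consumed by register_entrypoints() above. A hedged setup.py
# sketch (the package, module, and class names are hypothetical):
#
#   setup(
#       ...,
#       entry_points={
#           "mlflow.model_evaluator": [
#               "my_evaluator=my_package.my_evaluator:MyEvaluator",
#           ],
#       },
#   )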


def evaluate(
model, model_type, dataset, run_id=None, evaluators=None, evaluator_config=None
) -> Union[EvaluationResult, Dict[str, EvaluationResult]]:
"""
:param model: A model supported by the specified `evaluator`, or a URI
referring to such a model. The default evaluator supports the
following:

- A pyfunc model instance (an instance of class `PyFuncModel`)

:param model_type: A string describing the model type. The default evaluator
supports "regressor" and "classifier" as model types.

Collaborator: Does it make sense to add model_type to the Model class, so that we can make it an optional arg here? @dbczumar
Collaborator (Author): We can do this in the future, but not now.
:param dataset: An instance of `EvaluationDataset` containing features and
labels (optional) for model evaluation.
:param run_id: The ID of the MLflow Run to which to log results. If
unspecified, behavior depends on the specified `evaluator`.
When `run_id` is unspecified, the default evaluator logs
results to the current active run, creating a new active run if
one does not exist.
:param evaluators: The name of the evaluator to use for model evaluations, or
a list of evaluator names. If unspecified, all evaluators
capable of evaluating the specified model on the specified
dataset are used.
:param evaluator_config: A dictionary of additional configurations to supply
to the evaluator. If multiple evaluators are
specified, each configuration should be supplied as
a nested dictionary whose key is the evaluator name.

Collaborator: I'd make it clear that this should always be a nested dict mapping evaluator name to a config dict, to maintain consistency.
Collaborator (Author): Could you rephrase your description here? @jinzhang21

:return: An `EvaluationResult` instance containing evaluation results.
"""
if evaluators is None:
evaluators = 'default_evaluator'

if not isinstance(evaluators, list):
evaluators = [evaluators]
evaluator_config = {evaluators[0]: evaluator_config}

if isinstance(model, str):
model = mlflow.pyfunc.load_model(model)

predict = model.predict

eval_results = []
for evaluator_name in evaluators:
config = evaluator_config[evaluator_name]
try:
evaluator = _model_evaluation_registry.get_evaluator(evaluator_name)
except MlflowException:
continue

if evaluator.can_evaluate(model_type, config):
result = evaluator.evaluate(predict, dataset, run_id, config)
eval_results.append(result)

merged_eval_result = EvaluationResult(EvaluationMetrics(), dict())
for eval_result in eval_results:
merged_eval_result.metrics.update(eval_result.metrics)
merged_eval_result.artifacts.update(eval_result.artifacts)

return merged_eval_result
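
For illustration, a hedged end-to-end sketch of how this API might be called; the model URI, evaluation data file, and column names below are assumptions for the example, not part of this change:

import mlflow
import pandas as pd
from mlflow.evaluation import EvaluationDataset, evaluate

# Hypothetical evaluation data; the "label" column holds the targets.
eval_df = pd.read_parquet("eval_data.parquet")
dataset = EvaluationDataset(data=eval_df, labels="label", name="validation_data")

with mlflow.start_run():
    # Assumes an evaluator registered under "default_evaluator" can handle classifiers.
    result = evaluate(
        model="runs:/<run_id>/model",  # or a loaded PyFuncModel instance
        model_type="classifier",
        dataset=dataset,
        evaluators="default_evaluator",
    )

print(result.metrics)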