Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make example input and PyFuncInput support csc csr matrix #5016

Merged
19 changes: 14 additions & 5 deletions mlflow/models/utils.py
Expand Up @@ -9,8 +9,9 @@
from mlflow.models import Model
from mlflow.types.utils import TensorsNotSupportedException
from mlflow.utils.proto_json_utils import NumpyEncoder, _dataframe_from_json, parse_tf_serving_input
from scipy.sparse import csr_matrix, csc_matrix
WeichenXu123 marked this conversation as resolved.
Show resolved Hide resolved

ModelInputExample = Union[pd.DataFrame, np.ndarray, dict, list]
ModelInputExample = Union[pd.DataFrame, np.ndarray, dict, list, csr_matrix, csc_matrix]


class _Example(object):
Expand Down Expand Up @@ -50,25 +51,33 @@ class _Example(object):
encoded strings.
- numpy types: Numpy types are converted to the corresponding python types or their closest
equivalent.
- csc/csr matrix: similar to a 2-dimensional numpy array, csc/csr matrices are converted to
corresponding python types or their closest equivalent.
"""

def __init__(self, input_example: ModelInputExample):
def _is_scalar(x):
return np.isscalar(x) or x is None

def _is_tensor(x):
return isinstance(x, np.ndarray) or (
isinstance(x, dict) and all([isinstance(ary, np.ndarray) for ary in x.values()])
return (
isinstance(x, np.ndarray)
or (
isinstance(x, dict) and all([isinstance(ary, np.ndarray) for ary in x.values()])
WeichenXu123 marked this conversation as resolved.
Show resolved Hide resolved
)
or isinstance(x, (csr_matrix, csc_matrix))
WeichenXu123 marked this conversation as resolved.
Show resolved Hide resolved
)

def _handle_tensor_input(input_tensor: Union[np.ndarray, dict]):
def _handle_tensor_input(input_tensor: Union[np.ndarray, dict, csr_matrix, csc_matrix]):
if isinstance(input_tensor, dict):
result = {}
for name in input_tensor.keys():
result[name] = input_tensor[name].tolist()
return {"inputs": result}
else:
elif isinstance(input_tensor, np.ndarray):
return {"inputs": input_tensor.tolist()}
else:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. Can we use `elif isinstance(input_tensor, (csr_matrix, csc_matrix))` here?
  2. Does this mean we store sparse input as a dense vector, or is it stored as an array of indices and an array of values?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I updated the code to store sparse input as data/indices/indptr vectors.
I also split the csc/csr saving code out of the ndarray saving code.

return {"inputs": input_tensor.toarray().tolist()}

def _handle_dataframe_input(input_ex):
if isinstance(input_ex, dict):
Expand Down
13 changes: 11 additions & 2 deletions mlflow/types/utils.py
Expand Up @@ -30,8 +30,10 @@ def _get_tensor_shape(data: np.ndarray, variable_dimension: Optional[int] = 0) -
:param variable_dimension: An optional integer representing a variable dimension.
:return: tuple : Shape of the inputted data (including a variable dimension)
"""
if not isinstance(data, np.ndarray):
raise TypeError("Expected numpy.ndarray, got '{}'.".format(type(data)))
from scipy.sparse import csr_matrix, csc_matrix

if not isinstance(data, (np.ndarray, csr_matrix, csc_matrix)):
raise TypeError("Expected numpy.ndarray or csc/csr matrix, got '{}'.".format(type(data)))
variable_input_data_shape = data.shape
if variable_dimension is not None:
try:
Expand Down Expand Up @@ -90,6 +92,7 @@ def _infer_schema(data: Any) -> Schema:
- dictionary of { name -> numpy.ndarray}
- numpy.ndarray
- pyspark.sql.DataFrame
- csc/csr matrix

The element types should be mappable to one of :py:class:`mlflow.models.signature.DataType` for
dataframes and to one of numpy types for tensors.
Expand All @@ -98,6 +101,8 @@ def _infer_schema(data: Any) -> Schema:

:return: Schema
"""
from scipy.sparse import csr_matrix, csc_matrix

if isinstance(data, dict):
res = []
for name in data.keys():
Expand All @@ -122,6 +127,10 @@ def _infer_schema(data: Any) -> Schema:
schema = Schema(
[TensorSpec(type=clean_tensor_type(data.dtype), shape=_get_tensor_shape(data))]
)
elif isinstance(data, (csc_matrix, csr_matrix)):
schema = Schema(
[TensorSpec(type=clean_tensor_type(data.data.dtype), shape=_get_tensor_shape(data))]
)
elif _is_spark_df(data):
schema = Schema(
[
Expand Down
1 change: 1 addition & 0 deletions setup.py
Expand Up @@ -71,6 +71,7 @@ def package_files(directory):
"Flask",
"gunicorn; platform_system != 'Windows'",
"numpy",
"scipy",
"pandas",
"prometheus-flask-exporter",
"querystring_parser",
Expand Down
20 changes: 20 additions & 0 deletions tests/models/test_model_input_examples.py
Expand Up @@ -3,6 +3,7 @@
import numpy as np
import pandas as pd
import pytest
from scipy.sparse import csr_matrix, csc_matrix

from mlflow.models.signature import infer_signature
from mlflow.models.utils import _Example, _read_tensor_input_from_json
Expand Down Expand Up @@ -44,6 +45,14 @@ def dict_of_ndarrays():
}


@pytest.fixture
def dict_of_sparse_matrix():
    """Return a CSC and a CSR matrix built from the same 3x8 grid of floats.

    The dense source is ``np.arange(0, 12, 0.5)`` reshaped to 3 rows by
    8 columns. The result maps ``"csc"`` and ``"csr"`` to the matching
    scipy sparse representation of that grid.
    """
    dense_grid = np.arange(0, 12, 0.5).reshape(3, 8)
    matrices = {}
    matrices["csc"] = csc_matrix(dense_grid)
    matrices["csr"] = csr_matrix(dense_grid)
    return matrices


def test_input_examples(pandas_df_with_all_types, dict_of_ndarrays):
sig = infer_signature(pandas_df_with_all_types)
# test setting example with data frame with all supported data types
Expand Down Expand Up @@ -117,3 +126,14 @@ def test_input_examples(pandas_df_with_all_types, dict_of_ndarrays):
filename = x.info["artifact_path"]
parsed_df = _dataframe_from_json(tmp.path(filename))
assert example == parsed_df.to_dict(orient="records")[0]


def test_sparse_matrix_input_examples(dict_of_sparse_matrix):
    """Check that sparse input examples survive a save/load round trip.

    Each matrix from the fixture is wrapped in ``_Example``, saved into a
    temporary directory, read back with ``_read_tensor_input_from_json``,
    and compared against the dense form of the original matrix.
    """
    for sparse_input in dict_of_sparse_matrix.values():
        with TempDir() as tmp:
            example = _Example(sparse_input)
            example.save(tmp.path())
            # The example records the name of the artifact it wrote.
            artifact_file = tmp.path(example.info["artifact_path"])
            parsed_ary = _read_tensor_input_from_json(artifact_file)
            assert np.array_equal(parsed_ary, sparse_input.toarray())
8 changes: 6 additions & 2 deletions tests/sklearn/test_sklearn_autolog.py
Expand Up @@ -16,6 +16,7 @@
import sklearn.pipeline
import sklearn.model_selection
from scipy.stats import uniform
from scipy.sparse import csr_matrix, csc_matrix

from mlflow.exceptions import MlflowException
from mlflow.models import Model
Expand Down Expand Up @@ -849,13 +850,16 @@ def test_parameter_search_handles_large_volume_of_metric_outputs():
assert len(child_run.data.metrics) >= metrics_size


@pytest.mark.parametrize("data_type", [pd.DataFrame, np.array])
@pytest.mark.parametrize("data_type", [pd.DataFrame, np.array, csr_matrix, csc_matrix])
def test_autolog_logs_signature_and_input_example(data_type):
mlflow.sklearn.autolog(log_input_examples=True, log_model_signatures=True)

X, y = get_iris()
X = data_type(X)
y = data_type(y)
if data_type in [csr_matrix, csc_matrix]:
y = np.array(y)
else:
y = data_type(y)
WeichenXu123 marked this conversation as resolved.
Show resolved Hide resolved
model = sklearn.linear_model.LinearRegression()

with mlflow.start_run() as run:
Expand Down
17 changes: 17 additions & 0 deletions tests/types/test_schema.py
Expand Up @@ -3,6 +3,7 @@
import numpy as np
import pandas as pd
import pytest
from scipy.sparse import csr_matrix, csc_matrix

from mlflow.exceptions import MlflowException
from mlflow.pyfunc import _enforce_tensor_spec
Expand Down Expand Up @@ -251,6 +252,22 @@ def test_get_tensor_shape(dict_of_ndarrays):
_infer_schema({"x": 1})


@pytest.fixture
def dict_of_sparse_matrix():
    """Return ``{"csc": ..., "csr": ...}`` sparse matrices over one 3x8 float grid.

    Both entries hold the values of ``np.arange(0, 12, 0.5)`` laid out as
    3 rows by 8 columns; only the sparse storage format differs.
    """
    values = np.arange(0, 12, 0.5).reshape(3, 8)
    constructors = (("csc", csc_matrix), ("csr", csr_matrix))
    return {label: build(values) for label, build in constructors}
WeichenXu123 marked this conversation as resolved.
Show resolved Hide resolved


def test_get_sparse_matrix_data_type_and_shape(dict_of_sparse_matrix):
    """Check the inferred dtype and tensor shape for csc/csr matrices.

    For every sparse matrix in the fixture, the inferred schema must report
    a single ``float64`` tensor, and ``_get_tensor_shape`` must report the
    expected shape.
    """
    for sparse_matrix in dict_of_sparse_matrix.values():
        schema = _infer_schema(sparse_matrix)
        # These comparisons were previously bare expressions, so the test
        # could never fail; they must be asserted to check anything.
        assert schema.numpy_types() == ["float64"]
        # NOTE(review): _get_tensor_shape defaults to variable_dimension=0,
        # which presumably reports the first axis as -1 instead of the
        # concrete row count; confirm against its implementation.
        assert _get_tensor_shape(sparse_matrix) == (-1, 8)
WeichenXu123 marked this conversation as resolved.
Show resolved Hide resolved


def test_schema_inference_on_dictionary(dict_of_ndarrays):
# test dictionary
schema = _infer_schema(dict_of_ndarrays)
Expand Down