Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make example input and PyFuncInput support csc csr matrix #5016

Merged
2 changes: 2 additions & 0 deletions docs/source/conf.py
Expand Up @@ -338,6 +338,8 @@
("py:class", "mlflow.models.model.Model"),
("py:class", "mlflow.models.signature.ModelSignature"),
("py:class", "MlflowInferableDataset"),
("py:class", "scipy.sparse.csr.csr_matrix"),
("py:class", "scipy.sparse.csc.csc_matrix"),
]


Expand Down
60 changes: 48 additions & 12 deletions mlflow/models/utils.py
Expand Up @@ -9,8 +9,9 @@
from mlflow.models import Model
from mlflow.types.utils import TensorsNotSupportedException
from mlflow.utils.proto_json_utils import NumpyEncoder, _dataframe_from_json, parse_tf_serving_input
from scipy.sparse import csr_matrix, csc_matrix
WeichenXu123 marked this conversation as resolved.
Show resolved Hide resolved

ModelInputExample = Union[pd.DataFrame, np.ndarray, dict, list]
ModelInputExample = Union[pd.DataFrame, np.ndarray, dict, list, csr_matrix, csc_matrix]


class _Example(object):
Expand Down Expand Up @@ -50,31 +51,44 @@ class _Example(object):
encoded strings.
- numpy types: Numpy types are converted to the corresponding python types or their closest
equivalent.
- csc/csr matrix: similar to 2-dimensional numpy arrays, csc/csr matrices are converted to
corresponding python types or their closest equivalent.
"""

def __init__(self, input_example: ModelInputExample):
def _is_scalar(x):
return np.isscalar(x) or x is None

def _is_tensor(x):
def _is_ndarray(x):
return isinstance(x, np.ndarray) or (
isinstance(x, dict) and all([isinstance(ary, np.ndarray) for ary in x.values()])
isinstance(x, dict) and all(isinstance(ary, np.ndarray) for ary in x.values())
)

def _handle_tensor_nans(x: np.ndarray):
def _is_sparse_matrix(x):
return isinstance(x, (csc_matrix, csr_matrix))

def _handle_ndarray_nans(x: np.ndarray):
if np.issubdtype(x.dtype, np.number):
return np.where(np.isnan(x), None, x)
else:
return x

def _handle_tensor_input(input_tensor: Union[np.ndarray, dict]):
if isinstance(input_tensor, dict):
def _handle_ndarray_input(input_array: Union[np.ndarray, dict]):
if isinstance(input_array, dict):
result = {}
for name in input_tensor.keys():
result[name] = _handle_tensor_nans(input_tensor[name]).tolist()
for name in input_array.keys():
result[name] = _handle_ndarray_nans(input_array[name]).tolist()
return {"inputs": result}
else:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. Can we do `elif isinstance(input_tensor, (csr_matrix, csc_matrix))`?
  2. Does this mean we store sparse input as a dense vector, or is it stored as an array of indices and an array of values?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I updated the code to store sparse input as data/indices/indptr vectors.
I also split the csc/csr saving code out of the ndarray saving code.

return {"inputs": _handle_tensor_nans(input_tensor).tolist()}
return {"inputs": _handle_ndarray_nans(input_array).tolist()}

def _handle_sparse_matrix(x: Union[csr_matrix, csc_matrix]):
return {
"data": _handle_ndarray_nans(x.data).tolist(),
"indices": x.indices.tolist(),
"indptr": x.indptr.tolist(),
"shape": list(x.shape),
}

def _handle_dataframe_nans(df: pd.DataFrame):
return df.where(df.notnull(), None)
Expand Down Expand Up @@ -123,13 +137,19 @@ def _handle_dataframe_input(input_ex):
return result

example_filename = "input_example.json"
if _is_tensor(input_example):
self.data = _handle_tensor_input(input_example)
if _is_ndarray(input_example):
self.data = _handle_ndarray_input(input_example)
self.info = {
"artifact_path": example_filename,
"type": "ndarray",
"format": "tf-serving",
}
elif _is_sparse_matrix(input_example):
self.data = _handle_sparse_matrix(input_example)
self.info = {
"artifact_path": example_filename,
"type": "csc" if isinstance(input_example, csc_matrix) else "csr",
}
else:
self.data = _handle_dataframe_input(input_example)
self.info = {
Expand Down Expand Up @@ -178,14 +198,16 @@ def _read_example(mlflow_model: Model, path: str):
if mlflow_model.saved_input_example_info is None:
return None
example_type = mlflow_model.saved_input_example_info["type"]
if example_type not in ["dataframe", "ndarray"]:
if example_type not in ["dataframe", "ndarray", "csc", "csr"]:
WeichenXu123 marked this conversation as resolved.
Show resolved Hide resolved
raise MlflowException(
"This version of mlflow can not load example of type {}".format(example_type)
)
input_schema = mlflow_model.signature.inputs if mlflow_model.signature is not None else None
path = os.path.join(path, mlflow_model.saved_input_example_info["artifact_path"])
if example_type == "ndarray":
return _read_tensor_input_from_json(path, schema=input_schema)
elif example_type in ["csc", "csr"]:
return _read_sparse_matrix_from_json(path, example_type)
else:
return _dataframe_from_json(path, schema=input_schema, precise_float=True)

Expand All @@ -194,3 +216,17 @@ def _read_tensor_input_from_json(path, schema=None):
with open(path, "r") as handle:
inp_dict = json.load(handle)
return parse_tf_serving_input(inp_dict, schema)


def _read_sparse_matrix_from_json(path, example_type):
with open(path, "r") as handle:
matrix_data = json.load(handle)
data = matrix_data["data"]
indices = matrix_data["indices"]
indptr = matrix_data["indptr"]
shape = tuple(matrix_data["shape"])

if example_type == "csc":
return csc_matrix((data, indices, indptr), shape=shape)
else:
return csr_matrix((data, indices, indptr), shape=shape)
14 changes: 10 additions & 4 deletions mlflow/pyfunc/__init__.py
Expand Up @@ -250,6 +250,7 @@
RESOURCE_ALREADY_EXISTS,
RESOURCE_DOES_NOT_EXIST,
)
from scipy.sparse import csc_matrix, csr_matrix

FLAVOR_NAME = "python_function"
MAIN = "loader_module"
Expand All @@ -259,7 +260,7 @@
PY_VERSION = "python_version"

_logger = logging.getLogger(__name__)
PyFuncInput = Union[pandas.DataFrame, np.ndarray, List[Any], Dict[str, Any]]
PyFuncInput = Union[pandas.DataFrame, np.ndarray, csc_matrix, csr_matrix, List[Any], Dict[str, Any]]
PyFuncOutput = Union[pandas.DataFrame, pandas.Series, np.ndarray, list]


Expand Down Expand Up @@ -403,12 +404,17 @@ def all_ints(xs):
)


def _enforce_tensor_spec(values: np.ndarray, tensor_spec: TensorSpec):
def _enforce_tensor_spec(
values: Union[np.ndarray, csc_matrix, csr_matrix], tensor_spec: TensorSpec
):
"""
Enforce the input tensor shape and type matches the provided tensor spec.
"""
expected_shape = tensor_spec.shape
actual_shape = values.shape

actual_type = values.dtype if isinstance(values, np.ndarray) else values.data.dtype

if len(expected_shape) != len(actual_shape):
raise MlflowException(
"Shape of input {0} does not match expected shape {1}.".format(
Expand All @@ -424,7 +430,7 @@ def _enforce_tensor_spec(values: np.ndarray, tensor_spec: TensorSpec):
actual_shape, expected_shape
)
)
if clean_tensor_type(values.dtype) != tensor_spec.type:
if clean_tensor_type(actual_type) != tensor_spec.type:
raise MlflowException(
"dtype of input {0} does not match expected dtype {1}".format(
values.dtype, tensor_spec.type
Expand Down Expand Up @@ -476,7 +482,7 @@ def _enforce_tensor_schema(pfInput: PyFuncInput, input_schema: Schema):
else:
if isinstance(pfInput, pandas.DataFrame):
new_pfInput = _enforce_tensor_spec(pfInput.to_numpy(), input_schema.inputs[0])
elif isinstance(pfInput, np.ndarray):
elif isinstance(pfInput, (np.ndarray, csc_matrix, csr_matrix)):
new_pfInput = _enforce_tensor_spec(pfInput, input_schema.inputs[0])
else:
raise MlflowException(
Expand Down
15 changes: 12 additions & 3 deletions mlflow/types/utils.py
Expand Up @@ -17,7 +17,7 @@ def __init__(self, msg):
)


def _get_tensor_shape(data: np.ndarray, variable_dimension: Optional[int] = 0) -> tuple:
def _get_tensor_shape(data, variable_dimension: Optional[int] = 0) -> tuple:
"""
Infer the shape of the inputted data.

Expand All @@ -30,8 +30,10 @@ def _get_tensor_shape(data: np.ndarray, variable_dimension: Optional[int] = 0) -
:param variable_dimension: An optional integer representing a variable dimension.
:return: tuple : Shape of the inputted data (including a variable dimension)
"""
if not isinstance(data, np.ndarray):
raise TypeError("Expected numpy.ndarray, got '{}'.".format(type(data)))
from scipy.sparse import csr_matrix, csc_matrix

if not isinstance(data, (np.ndarray, csr_matrix, csc_matrix)):
raise TypeError("Expected numpy.ndarray or csc/csr matrix, got '{}'.".format(type(data)))
variable_input_data_shape = data.shape
if variable_dimension is not None:
try:
Expand Down Expand Up @@ -90,6 +92,7 @@ def _infer_schema(data: Any) -> Schema:
- dictionary of { name -> numpy.ndarray}
- numpy.ndarray
- pyspark.sql.DataFrame
- csc/csr matrix

The element types should be mappable to one of :py:class:`mlflow.models.signature.DataType` for
dataframes and to one of numpy types for tensors.
Expand All @@ -98,6 +101,8 @@ def _infer_schema(data: Any) -> Schema:

:return: Schema
"""
from scipy.sparse import csr_matrix, csc_matrix

if isinstance(data, dict):
res = []
for name in data.keys():
Expand All @@ -122,6 +127,10 @@ def _infer_schema(data: Any) -> Schema:
schema = Schema(
[TensorSpec(type=clean_tensor_type(data.dtype), shape=_get_tensor_shape(data))]
)
elif isinstance(data, (csc_matrix, csr_matrix)):
schema = Schema(
[TensorSpec(type=clean_tensor_type(data.data.dtype), shape=_get_tensor_shape(data))]
)
elif _is_spark_df(data):
schema = Schema(
[
Expand Down
1 change: 1 addition & 0 deletions setup.py
Expand Up @@ -71,6 +71,7 @@ def package_files(directory):
"Flask",
"gunicorn; platform_system != 'Windows'",
"numpy",
"scipy",
"pandas",
"prometheus-flask-exporter",
"querystring_parser",
Expand Down
25 changes: 24 additions & 1 deletion tests/models/test_model_input_examples.py
Expand Up @@ -3,9 +3,14 @@
import numpy as np
import pandas as pd
import pytest
from scipy.sparse import csr_matrix, csc_matrix

from mlflow.models.signature import infer_signature
from mlflow.models.utils import _Example, _read_tensor_input_from_json
from mlflow.models.utils import (
_Example,
_read_tensor_input_from_json,
_read_sparse_matrix_from_json,
)
from mlflow.types.utils import TensorsNotSupportedException
from mlflow.utils.file_utils import TempDir
from mlflow.utils.proto_json_utils import _dataframe_from_json
Expand Down Expand Up @@ -68,6 +73,14 @@ def dict_of_ndarrays_with_nans():
}


@pytest.fixture
def dict_of_sparse_matrix():
    """Provide a csc and a csr matrix built from the same 3x8 float grid, keyed by format."""
    dense = np.arange(0, 12, 0.5).reshape(3, 8)
    return {"csc": csc_matrix(dense), "csr": csr_matrix(dense)}


def test_input_examples(pandas_df_with_all_types, dict_of_ndarrays):
sig = infer_signature(pandas_df_with_all_types)
# test setting example with data frame with all supported data types
Expand Down Expand Up @@ -143,6 +156,16 @@ def test_input_examples(pandas_df_with_all_types, dict_of_ndarrays):
assert example == parsed_df.to_dict(orient="records")[0]


def test_sparse_matrix_input_examples(dict_of_sparse_matrix):
    """Save each sparse example via ``_Example`` and check the JSON reader restores it."""
    for fmt in dict_of_sparse_matrix:
        original = dict_of_sparse_matrix[fmt]
        with TempDir() as tmp:
            saved = _Example(original)
            saved.save(tmp.path())
            artifact = tmp.path(saved.info["artifact_path"])
            restored = _read_sparse_matrix_from_json(artifact, fmt)
            assert np.array_equal(restored.toarray(), original.toarray())


def test_input_examples_with_nan(df_with_nan, dict_of_ndarrays_with_nans):
# test setting example with data frame with NaN values in it
sig = infer_signature(df_with_nan)
Expand Down
8 changes: 6 additions & 2 deletions tests/sklearn/test_sklearn_autolog.py
Expand Up @@ -16,6 +16,7 @@
import sklearn.pipeline
import sklearn.model_selection
from scipy.stats import uniform
from scipy.sparse import csr_matrix, csc_matrix

from mlflow.exceptions import MlflowException
from mlflow.models import Model
Expand Down Expand Up @@ -857,13 +858,16 @@ def test_parameter_search_handles_large_volume_of_metric_outputs():
assert len(child_run.data.metrics) >= metrics_size


@pytest.mark.parametrize("data_type", [pd.DataFrame, np.array])
@pytest.mark.parametrize("data_type", [pd.DataFrame, np.array, csr_matrix, csc_matrix])
def test_autolog_logs_signature_and_input_example(data_type):
mlflow.sklearn.autolog(log_input_examples=True, log_model_signatures=True)

X, y = get_iris()
X = data_type(X)
y = data_type(y)
if data_type in [csr_matrix, csc_matrix]:
y = np.array(y)
else:
y = data_type(y)
WeichenXu123 marked this conversation as resolved.
Show resolved Hide resolved
model = sklearn.linear_model.LinearRegression()

with mlflow.start_run() as run:
Expand Down
16 changes: 16 additions & 0 deletions tests/types/test_schema.py
Expand Up @@ -3,6 +3,7 @@
import numpy as np
import pandas as pd
import pytest
from scipy.sparse import csr_matrix, csc_matrix

from mlflow.exceptions import MlflowException
from mlflow.pyfunc import _enforce_tensor_spec
Expand Down Expand Up @@ -253,6 +254,21 @@ def test_get_tensor_shape(dict_of_ndarrays):
_infer_schema({"x": 1})


@pytest.fixture
def dict_of_sparse_matrix():
    """Provide a csc and a csr matrix built from the same 3x8 float grid, keyed by format."""
    dense = np.arange(0, 12, 0.5).reshape(3, 8)
    return {"csc": csc_matrix(dense), "csr": csr_matrix(dense)}
WeichenXu123 marked this conversation as resolved.
Show resolved Hide resolved


def test_get_sparse_matrix_data_type_and_shape(dict_of_sparse_matrix):
    """Check the inferred dtype and the variable-first-dimension shape for csc/csr inputs."""
    for fmt in dict_of_sparse_matrix:
        matrix = dict_of_sparse_matrix[fmt]
        inferred = _infer_schema(matrix)
        assert inferred.numpy_types() == ["float64"]
        assert _get_tensor_shape(matrix) == (-1, 8)


def test_schema_inference_on_dictionary(dict_of_ndarrays):
# test dictionary
schema = _infer_schema(dict_of_ndarrays)
Expand Down