Skip to content

Commit

Permalink
Make example input and PyFuncInput support csc csr matrix (#5016)
Browse files Browse the repository at this point in the history
* init

Signed-off-by: Weichen Xu <weichen.xu@databricks.com>

* update

Signed-off-by: Weichen Xu <weichen.xu@databricks.com>

* address comments

Signed-off-by: Weichen Xu <weichen.xu@databricks.com>

* add tests

Signed-off-by: Weichen Xu <weichen.xu@databricks.com>

* fix lint

Signed-off-by: Weichen Xu <weichen.xu@databricks.com>

* address comments

Signed-off-by: Weichen Xu <weichen.xu@databricks.com>

* suppress doc build warning

Signed-off-by: Weichen Xu <weichen.xu@databricks.com>

* update

Signed-off-by: Weichen Xu <weichen.xu@databricks.com>

* update

Signed-off-by: Weichen Xu <weichen.xu@databricks.com>

* update

Signed-off-by: Weichen Xu <weichen.xu@databricks.com>

* update load example

Signed-off-by: Weichen Xu <weichen.xu@databricks.com>

* make pyFuncInput support csc/csr

Signed-off-by: Weichen Xu <weichen.xu@databricks.com>

* fix tests

Signed-off-by: Weichen Xu <weichen.xu@databricks.com>

* update example type

Signed-off-by: Weichen Xu <weichen.xu@databricks.com>

* fix

Signed-off-by: Weichen Xu <weichen.xu@databricks.com>
  • Loading branch information
WeichenXu123 committed Dec 7, 2021
1 parent dfabf6c commit 66820dd
Show file tree
Hide file tree
Showing 8 changed files with 123 additions and 22 deletions.
2 changes: 2 additions & 0 deletions docs/source/conf.py
Expand Up @@ -338,6 +338,8 @@
("py:class", "mlflow.models.model.Model"),
("py:class", "mlflow.models.signature.ModelSignature"),
("py:class", "MlflowInferableDataset"),
("py:class", "scipy.sparse.csr.csr_matrix"),
("py:class", "scipy.sparse.csc.csc_matrix"),
]


Expand Down
64 changes: 52 additions & 12 deletions mlflow/models/utils.py
Expand Up @@ -9,8 +9,9 @@
from mlflow.models import Model
from mlflow.types.utils import TensorsNotSupportedException
from mlflow.utils.proto_json_utils import NumpyEncoder, _dataframe_from_json, parse_tf_serving_input
from scipy.sparse import csr_matrix, csc_matrix

ModelInputExample = Union[pd.DataFrame, np.ndarray, dict, list]
ModelInputExample = Union[pd.DataFrame, np.ndarray, dict, list, csr_matrix, csc_matrix]


class _Example(object):
Expand Down Expand Up @@ -50,31 +51,44 @@ class _Example(object):
encoded strings.
- numpy types: Numpy types are converted to the corresponding python types or their closest
equivalent.
- csc/csr matrix: similar to 2-dimensional numpy arrays, csc/csr matrices are converted to
corresponding python types or their closest equivalent.
"""

def __init__(self, input_example: ModelInputExample):
def _is_scalar(x):
return np.isscalar(x) or x is None

def _is_tensor(x):
def _is_ndarray(x):
return isinstance(x, np.ndarray) or (
isinstance(x, dict) and all([isinstance(ary, np.ndarray) for ary in x.values()])
isinstance(x, dict) and all(isinstance(ary, np.ndarray) for ary in x.values())
)

def _handle_tensor_nans(x: np.ndarray):
def _is_sparse_matrix(x):
return isinstance(x, (csc_matrix, csr_matrix))

def _handle_ndarray_nans(x: np.ndarray):
if np.issubdtype(x.dtype, np.number):
return np.where(np.isnan(x), None, x)
else:
return x

def _handle_tensor_input(input_tensor: Union[np.ndarray, dict]):
if isinstance(input_tensor, dict):
def _handle_ndarray_input(input_array: Union[np.ndarray, dict]):
if isinstance(input_array, dict):
result = {}
for name in input_tensor.keys():
result[name] = _handle_tensor_nans(input_tensor[name]).tolist()
for name in input_array.keys():
result[name] = _handle_ndarray_nans(input_array[name]).tolist()
return {"inputs": result}
else:
return {"inputs": _handle_tensor_nans(input_tensor).tolist()}
return {"inputs": _handle_ndarray_nans(input_array).tolist()}

def _handle_sparse_matrix(x: Union[csr_matrix, csc_matrix]):
return {
"data": _handle_ndarray_nans(x.data).tolist(),
"indices": x.indices.tolist(),
"indptr": x.indptr.tolist(),
"shape": list(x.shape),
}

def _handle_dataframe_nans(df: pd.DataFrame):
return df.where(df.notnull(), None)
Expand Down Expand Up @@ -123,13 +137,23 @@ def _handle_dataframe_input(input_ex):
return result

example_filename = "input_example.json"
if _is_tensor(input_example):
self.data = _handle_tensor_input(input_example)
if _is_ndarray(input_example):
self.data = _handle_ndarray_input(input_example)
self.info = {
"artifact_path": example_filename,
"type": "ndarray",
"format": "tf-serving",
}
elif _is_sparse_matrix(input_example):
self.data = _handle_sparse_matrix(input_example)
if isinstance(input_example, csc_matrix):
example_type = "sparse_matrix_csc"
else:
example_type = "sparse_matrix_csr"
self.info = {
"artifact_path": example_filename,
"type": example_type,
}
else:
self.data = _handle_dataframe_input(input_example)
self.info = {
Expand Down Expand Up @@ -178,14 +202,16 @@ def _read_example(mlflow_model: Model, path: str):
if mlflow_model.saved_input_example_info is None:
return None
example_type = mlflow_model.saved_input_example_info["type"]
if example_type not in ["dataframe", "ndarray"]:
if example_type not in ["dataframe", "ndarray", "sparse_matrix_csc", "sparse_matrix_csr"]:
raise MlflowException(
"This version of mlflow can not load example of type {}".format(example_type)
)
input_schema = mlflow_model.signature.inputs if mlflow_model.signature is not None else None
path = os.path.join(path, mlflow_model.saved_input_example_info["artifact_path"])
if example_type == "ndarray":
return _read_tensor_input_from_json(path, schema=input_schema)
elif example_type in ["sparse_matrix_csc", "sparse_matrix_csr"]:
return _read_sparse_matrix_from_json(path, example_type)
else:
return _dataframe_from_json(path, schema=input_schema, precise_float=True)

Expand All @@ -194,3 +220,17 @@ def _read_tensor_input_from_json(path, schema=None):
with open(path, "r") as handle:
inp_dict = json.load(handle)
return parse_tf_serving_input(inp_dict, schema)


def _read_sparse_matrix_from_json(path, example_type):
    """
    Load a CSC/CSR sparse matrix input example from a JSON file.

    The file must contain a JSON object with the keys ``data``, ``indices``, ``indptr`` and
    ``shape``.

    :param path: Path of the JSON file to read.
    :param example_type: ``"sparse_matrix_csc"`` builds a ``csc_matrix``; any other value
                         builds a ``csr_matrix``.
    :return: The reconstructed ``scipy.sparse`` matrix.
    """
    with open(path, "r") as handle:
        matrix_data = json.load(handle)

    data = matrix_data["data"]
    # JSON ``null`` decodes to ``None``. Handing ``None`` to scipy would produce an
    # object-dtype array, so map those entries to NaN and keep the values numeric.
    if any(value is None for value in data):
        data = np.array([np.nan if value is None else value for value in data])
    indices = matrix_data["indices"]
    indptr = matrix_data["indptr"]
    shape = tuple(matrix_data["shape"])

    matrix_class = csc_matrix if example_type == "sparse_matrix_csc" else csr_matrix
    return matrix_class((data, indices, indptr), shape=shape)
14 changes: 10 additions & 4 deletions mlflow/pyfunc/__init__.py
Expand Up @@ -250,6 +250,7 @@
RESOURCE_ALREADY_EXISTS,
RESOURCE_DOES_NOT_EXIST,
)
from scipy.sparse import csc_matrix, csr_matrix

FLAVOR_NAME = "python_function"
MAIN = "loader_module"
Expand All @@ -259,7 +260,7 @@
PY_VERSION = "python_version"

_logger = logging.getLogger(__name__)
PyFuncInput = Union[pandas.DataFrame, np.ndarray, List[Any], Dict[str, Any]]
PyFuncInput = Union[pandas.DataFrame, np.ndarray, csc_matrix, csr_matrix, List[Any], Dict[str, Any]]
PyFuncOutput = Union[pandas.DataFrame, pandas.Series, np.ndarray, list]


Expand Down Expand Up @@ -403,12 +404,17 @@ def all_ints(xs):
)


def _enforce_tensor_spec(values: np.ndarray, tensor_spec: TensorSpec):
def _enforce_tensor_spec(
values: Union[np.ndarray, csc_matrix, csr_matrix], tensor_spec: TensorSpec
):
"""
Enforce the input tensor shape and type matches the provided tensor spec.
"""
expected_shape = tensor_spec.shape
actual_shape = values.shape

actual_type = values.dtype if isinstance(values, np.ndarray) else values.data.dtype

if len(expected_shape) != len(actual_shape):
raise MlflowException(
"Shape of input {0} does not match expected shape {1}.".format(
Expand All @@ -424,7 +430,7 @@ def _enforce_tensor_spec(values: np.ndarray, tensor_spec: TensorSpec):
actual_shape, expected_shape
)
)
if clean_tensor_type(values.dtype) != tensor_spec.type:
if clean_tensor_type(actual_type) != tensor_spec.type:
raise MlflowException(
"dtype of input {0} does not match expected dtype {1}".format(
values.dtype, tensor_spec.type
Expand Down Expand Up @@ -476,7 +482,7 @@ def _enforce_tensor_schema(pfInput: PyFuncInput, input_schema: Schema):
else:
if isinstance(pfInput, pandas.DataFrame):
new_pfInput = _enforce_tensor_spec(pfInput.to_numpy(), input_schema.inputs[0])
elif isinstance(pfInput, np.ndarray):
elif isinstance(pfInput, (np.ndarray, csc_matrix, csr_matrix)):
new_pfInput = _enforce_tensor_spec(pfInput, input_schema.inputs[0])
else:
raise MlflowException(
Expand Down
15 changes: 12 additions & 3 deletions mlflow/types/utils.py
Expand Up @@ -17,7 +17,7 @@ def __init__(self, msg):
)


def _get_tensor_shape(data: np.ndarray, variable_dimension: Optional[int] = 0) -> tuple:
def _get_tensor_shape(data, variable_dimension: Optional[int] = 0) -> tuple:
"""
Infer the shape of the inputted data.
Expand All @@ -30,8 +30,10 @@ def _get_tensor_shape(data: np.ndarray, variable_dimension: Optional[int] = 0) -
:param variable_dimension: An optional integer representing a variable dimension.
:return: tuple : Shape of the inputted data (including a variable dimension)
"""
if not isinstance(data, np.ndarray):
raise TypeError("Expected numpy.ndarray, got '{}'.".format(type(data)))
from scipy.sparse import csr_matrix, csc_matrix

if not isinstance(data, (np.ndarray, csr_matrix, csc_matrix)):
raise TypeError("Expected numpy.ndarray or csc/csr matrix, got '{}'.".format(type(data)))
variable_input_data_shape = data.shape
if variable_dimension is not None:
try:
Expand Down Expand Up @@ -90,6 +92,7 @@ def _infer_schema(data: Any) -> Schema:
- dictionary of { name -> numpy.ndarray}
- numpy.ndarray
- pyspark.sql.DataFrame
- csc/csr matrix
The element types should be mappable to one of :py:class:`mlflow.models.signature.DataType` for
dataframes and to one of numpy types for tensors.
Expand All @@ -98,6 +101,8 @@ def _infer_schema(data: Any) -> Schema:
:return: Schema
"""
from scipy.sparse import csr_matrix, csc_matrix

if isinstance(data, dict):
res = []
for name in data.keys():
Expand All @@ -122,6 +127,10 @@ def _infer_schema(data: Any) -> Schema:
schema = Schema(
[TensorSpec(type=clean_tensor_type(data.dtype), shape=_get_tensor_shape(data))]
)
elif isinstance(data, (csc_matrix, csr_matrix)):
schema = Schema(
[TensorSpec(type=clean_tensor_type(data.data.dtype), shape=_get_tensor_shape(data))]
)
elif _is_spark_df(data):
schema = Schema(
[
Expand Down
1 change: 1 addition & 0 deletions setup.py
Expand Up @@ -71,6 +71,7 @@ def package_files(directory):
"Flask",
"gunicorn; platform_system != 'Windows'",
"numpy",
"scipy",
"pandas",
"prometheus-flask-exporter",
"querystring_parser",
Expand Down
25 changes: 24 additions & 1 deletion tests/models/test_model_input_examples.py
Expand Up @@ -3,9 +3,14 @@
import numpy as np
import pandas as pd
import pytest
from scipy.sparse import csr_matrix, csc_matrix

from mlflow.models.signature import infer_signature
from mlflow.models.utils import _Example, _read_tensor_input_from_json
from mlflow.models.utils import (
_Example,
_read_tensor_input_from_json,
_read_sparse_matrix_from_json,
)
from mlflow.types.utils import TensorsNotSupportedException
from mlflow.utils.file_utils import TempDir
from mlflow.utils.proto_json_utils import _dataframe_from_json
Expand Down Expand Up @@ -68,6 +73,14 @@ def dict_of_ndarrays_with_nans():
}


@pytest.fixture
def dict_of_sparse_matrix():
    """Provide one CSC and one CSR matrix, each keyed by its saved example type name."""
    dense = np.arange(0, 12, 0.5).reshape(3, 8)
    matrices = {}
    matrices["sparse_matrix_csc"] = csc_matrix(dense)
    matrices["sparse_matrix_csr"] = csr_matrix(dense)
    return matrices


def test_input_examples(pandas_df_with_all_types, dict_of_ndarrays):
sig = infer_signature(pandas_df_with_all_types)
# test setting example with data frame with all supported data types
Expand Down Expand Up @@ -143,6 +156,16 @@ def test_input_examples(pandas_df_with_all_types, dict_of_ndarrays):
assert example == parsed_df.to_dict(orient="records")[0]


def test_sparse_matrix_input_examples(dict_of_sparse_matrix):
    """Check that CSC/CSR input examples survive a save / load round trip unchanged."""
    for example_type in dict_of_sparse_matrix:
        original = dict_of_sparse_matrix[example_type]
        with TempDir() as tmp:
            saved = _Example(original)
            saved.save(tmp.path())
            json_path = tmp.path(saved.info["artifact_path"])
            restored = _read_sparse_matrix_from_json(json_path, example_type)
            assert np.array_equal(restored.toarray(), original.toarray())


def test_input_examples_with_nan(df_with_nan, dict_of_ndarrays_with_nans):
# test setting example with data frame with NaN values in it
sig = infer_signature(df_with_nan)
Expand Down
8 changes: 6 additions & 2 deletions tests/sklearn/test_sklearn_autolog.py
Expand Up @@ -16,6 +16,7 @@
import sklearn.pipeline
import sklearn.model_selection
from scipy.stats import uniform
from scipy.sparse import csr_matrix, csc_matrix

from mlflow.exceptions import MlflowException
from mlflow.models import Model
Expand Down Expand Up @@ -857,13 +858,16 @@ def test_parameter_search_handles_large_volume_of_metric_outputs():
assert len(child_run.data.metrics) >= metrics_size


@pytest.mark.parametrize("data_type", [pd.DataFrame, np.array])
@pytest.mark.parametrize("data_type", [pd.DataFrame, np.array, csr_matrix, csc_matrix])
def test_autolog_logs_signature_and_input_example(data_type):
mlflow.sklearn.autolog(log_input_examples=True, log_model_signatures=True)

X, y = get_iris()
X = data_type(X)
y = data_type(y)
if data_type in [csr_matrix, csc_matrix]:
y = np.array(y)
else:
y = data_type(y)
model = sklearn.linear_model.LinearRegression()

with mlflow.start_run() as run:
Expand Down
16 changes: 16 additions & 0 deletions tests/types/test_schema.py
Expand Up @@ -3,6 +3,7 @@
import numpy as np
import pandas as pd
import pytest
from scipy.sparse import csr_matrix, csc_matrix

from mlflow.exceptions import MlflowException
from mlflow.pyfunc import _enforce_tensor_spec
Expand Down Expand Up @@ -253,6 +254,21 @@ def test_get_tensor_shape(dict_of_ndarrays):
_infer_schema({"x": 1})


@pytest.fixture
def dict_of_sparse_matrix():
    """Provide the same 3x8 float matrix in both CSC and CSR form."""
    dense = np.arange(0, 12, 0.5).reshape(3, 8)
    matrices = {}
    matrices["csc"] = csc_matrix(dense)
    matrices["csr"] = csr_matrix(dense)
    return matrices


def test_get_sparse_matrix_data_type_and_shape(dict_of_sparse_matrix):
    """Check that CSC/CSR input yields a float64 schema and a (-1, 8) tensor shape."""
    for matrix in dict_of_sparse_matrix.values():
        inferred = _infer_schema(matrix)
        assert inferred.numpy_types() == ["float64"]
        inferred_shape = _get_tensor_shape(matrix)
        assert inferred_shape == (-1, 8)


def test_schema_inference_on_dictionary(dict_of_ndarrays):
# test dictionary
schema = _infer_schema(dict_of_ndarrays)
Expand Down

0 comments on commit 66820dd

Please sign in to comment.