[pyspark] Cleanup data processing. (#8088)
- Use numpy stack for handling a list of arrays (see the sketch below).
- Reuse the concat function from dask.
- Prepare for `QuantileDMatrix`.
- Remove unused code.
- Use an iterator for prediction to avoid initializing the xgboost model.
trivialfis committed Jul 26, 2022
1 parent 3970e4e commit 546de5e
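
A minimal sketch (not from the diff) of the numpy stacking pattern the first bullet refers to, using made-up per-partition rows: `np.stack` joins equal-shaped arrays along a new leading axis, turning a list of 1-D rows into a single 2-D matrix.

```python
import numpy as np

# Hypothetical per-row arrays collected from data partitions.
rows = [np.array([1.0, 2.0]), np.array([3.0, 4.0]), np.array([5.0, 6.0])]

# np.stack adds a new leading axis, giving a (3, 2) matrix;
# np.concatenate would instead merge them into a flat (6,) vector.
matrix = np.stack(rows)
assert matrix.shape == (3, 2)
```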
Showing 9 changed files with 433 additions and 489 deletions.
42 changes: 37 additions & 5 deletions python-package/xgboost/compat.py
@@ -1,13 +1,14 @@
# coding: utf-8
# pylint: disable= invalid-name, unused-import
"""For compatibility and optional dependencies."""
-from typing import Any, Type, Dict, Optional, List
+from typing import Any, Type, Dict, Optional, List, Sequence, cast
import sys
import types
import importlib.util
import logging
import numpy as np

from ._typing import _T

assert (sys.version_info[0] == 3), 'Python 2 is no longer supported.'


@@ -16,7 +17,7 @@ def py_str(x: bytes) -> str:
    return x.decode('utf-8')  # type: ignore


-def lazy_isinstance(instance: Type[object], module: str, name: str) -> bool:
+def lazy_isinstance(instance: Any, module: str, name: str) -> bool:
    """Use string representation to identify a type."""

    # Notice, we use .__class__ as opposed to type() in order
@@ -104,11 +105,42 @@ def from_json(self, doc: Dict) -> None:
try:
    import scipy.sparse as scipy_sparse
    from scipy.sparse import csr_matrix as scipy_csr
    SCIPY_INSTALLED = True
except ImportError:
    scipy_sparse = False
    scipy_csr = object
    SCIPY_INSTALLED = False


+def concat(value: Sequence[_T]) -> _T:  # pylint: disable=too-many-return-statements
+    """Concatenate row-wise."""
+    if isinstance(value[0], np.ndarray):
+        value_arr = cast(Sequence[np.ndarray], value)
+        return np.concatenate(value_arr, axis=0)
+    if scipy_sparse and isinstance(value[0], scipy_sparse.csr_matrix):
+        return scipy_sparse.vstack(value, format="csr")
+    if scipy_sparse and isinstance(value[0], scipy_sparse.csc_matrix):
+        return scipy_sparse.vstack(value, format="csc")
+    if scipy_sparse and isinstance(value[0], scipy_sparse.spmatrix):
+        # other sparse format will be converted to CSR.
+        return scipy_sparse.vstack(value, format="csr")
+    if PANDAS_INSTALLED and isinstance(value[0], (DataFrame, Series)):
+        return pandas_concat(value, axis=0)
+    if lazy_isinstance(value[0], "cudf.core.dataframe", "DataFrame") or lazy_isinstance(
+        value[0], "cudf.core.series", "Series"
+    ):
+        from cudf import concat as CUDF_concat  # pylint: disable=import-error
+
+        return CUDF_concat(value, axis=0)
+    if lazy_isinstance(value[0], "cupy._core.core", "ndarray"):
+        import cupy  # pylint: disable=import-error
+
+        # pylint: disable=c-extension-no-member,no-member
+        d = cupy.cuda.runtime.getDevice()
+        for v in value:
+            arr = cast(cupy.ndarray, v)
+            d_v = arr.device.id
+            assert d_v == d, "Concatenating arrays on different devices."
+        return cupy.concatenate(value, axis=0)
+    raise TypeError("Unknown type.")


# Modified from tensorflow with added caching. There's a `LazyLoader` in
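For reference, a short usage sketch of the new shared `concat` helper (assuming a build that includes this commit, with numpy and scipy installed). It dispatches on the type of the first element and concatenates row-wise:

```python
import numpy as np
import scipy.sparse as sp

from xgboost.compat import concat

# Dense partitions are joined with np.concatenate along axis 0.
dense = concat([np.ones((2, 3)), np.zeros((2, 3))])
assert dense.shape == (4, 3)

# CSR partitions are joined with scipy's vstack and stay CSR.
sparse = concat([sp.csr_matrix(np.eye(3)), sp.csr_matrix(np.eye(3))])
assert sparse.shape == (6, 3) and sparse.format == "csr"
```

Unrecognized types raise `TypeError`, which lets callers fall back to their own strategy, as `dask.py` does below.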
49 changes: 9 additions & 40 deletions python-package/xgboost/dask.py
@@ -58,17 +58,9 @@
import numpy

from . import config, rabit
-from ._typing import FeatureNames, FeatureTypes
+from ._typing import _T, FeatureNames, FeatureTypes
from .callback import TrainingCallback
-from .compat import (
-    PANDAS_INSTALLED,
-    DataFrame,
-    LazyLoader,
-    Series,
-    lazy_isinstance,
-    pandas_concat,
-    scipy_sparse,
-)
+from .compat import DataFrame, LazyLoader, concat, lazy_isinstance
from .core import (
    Booster,
    DataIter,
@@ -234,35 +226,12 @@ def __init__(self, args: List[bytes]) -> None:
)


-def concat(value: Any) -> Any:  # pylint: disable=too-many-return-statements
-    """To be replaced with dask builtin."""
-    if isinstance(value[0], numpy.ndarray):
-        return numpy.concatenate(value, axis=0)
-    if scipy_sparse and isinstance(value[0], scipy_sparse.csr_matrix):
-        return scipy_sparse.vstack(value, format="csr")
-    if scipy_sparse and isinstance(value[0], scipy_sparse.csc_matrix):
-        return scipy_sparse.vstack(value, format="csc")
-    if scipy_sparse and isinstance(value[0], scipy_sparse.spmatrix):
-        # other sparse format will be converted to CSR.
-        return scipy_sparse.vstack(value, format="csr")
-    if PANDAS_INSTALLED and isinstance(value[0], (DataFrame, Series)):
-        return pandas_concat(value, axis=0)
-    if lazy_isinstance(value[0], "cudf.core.dataframe", "DataFrame") or lazy_isinstance(
-        value[0], "cudf.core.series", "Series"
-    ):
-        from cudf import concat as CUDF_concat  # pylint: disable=import-error
-
-        return CUDF_concat(value, axis=0)
-    if lazy_isinstance(value[0], "cupy._core.core", "ndarray"):
-        import cupy
-
-        # pylint: disable=c-extension-no-member,no-member
-        d = cupy.cuda.runtime.getDevice()
-        for v in value:
-            d_v = v.device.id
-            assert d_v == d, "Concatenating arrays on different devices."
-        return cupy.concatenate(value, axis=0)
-    return dd.multi.concat(list(value), axis=0)
+def dconcat(value: Sequence[_T]) -> _T:  # pylint: disable=too-many-return-statements
+    """Concatenate sequence of partitions."""
+    try:
+        return concat(value)
+    except TypeError:
+        return dd.multi.concat(list(value), axis=0)


def _xgb_get_client(client: Optional["distributed.Client"]) -> "distributed.Client":
@@ -797,7 +766,7 @@ def _create_dmatrix(
    def concat_or_none(data: Sequence[Optional[T]]) -> Optional[T]:
        if any(part is None for part in data):
            return None
-        return concat(data)
+        return dconcat(data)

    unzipped_dict = _get_worker_parts(list_of_parts)
    concated_dict: Dict[str, Any] = {}
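A sketch of the resulting control flow in `dconcat`, with made-up inputs: known in-memory types take the shared `concat` fast path, while the `TypeError` raised for anything else routes dask collections to `dd.multi.concat`:

```python
import dask.dataframe as dd
import numpy as np
import pandas as pd

from xgboost.dask import dconcat

# numpy partitions are handled by the shared compat.concat.
assert dconcat([np.arange(4), np.arange(4)]).shape == (8,)

# Types compat.concat rejects fall through to dask's own concat.
part = dd.from_pandas(pd.DataFrame({"a": [1, 2]}), npartitions=1)
ddf = dconcat([part, part])
assert ddf.compute().shape == (4, 1)
```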
19 changes: 12 additions & 7 deletions python-package/xgboost/sklearn.py
@@ -17,7 +17,9 @@
    Type,
    cast,
)

import numpy as np
+from scipy.special import softmax

from .core import Booster, DMatrix, XGBoostError
from .core import _deprecate_positional_args, _convert_ntree_limit
@@ -1540,17 +1542,20 @@ def predict_proba(
"""
# custom obj: Do nothing as we don't know what to do.
# softprob: Do nothing, output is proba.
# softmax: Unsupported by predict_proba()
# softmax: Use softmax from scipy
# binary:logistic: Expand the prob vector into 2-class matrix after predict.
# binary:logitraw: Unsupported by predict_proba()
if self.objective == "multi:softmax":
# We need to run a Python implementation of softmax for it. Just ask user to
# use softprob since XGBoost's implementation has mitigation for floating
# point overflow. No need to reinvent the wheel.
raise ValueError(
"multi:softmax doesn't support `predict_proba`. "
"Switch to `multi:softproba` instead"
raw_predt = super().predict(
X=X,
ntree_limit=ntree_limit,
validate_features=validate_features,
base_margin=base_margin,
iteration_range=iteration_range,
output_margin=True
)
class_prob = softmax(raw_predt, axis=1)
return class_prob
class_probs = super().predict(
X=X,
ntree_limit=ntree_limit,
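The new `multi:softmax` branch, in isolation, amounts to a per-row softmax over the raw margin scores returned with `output_margin=True`. A sketch with made-up values:

```python
import numpy as np
from scipy.special import softmax

# Raw margin scores for two rows and three classes (made-up values).
raw_predt = np.array([[2.0, 1.0, 0.1],
                      [0.5, 2.5, 0.2]])

class_prob = softmax(raw_predt, axis=1)
assert np.allclose(class_prob.sum(axis=1), 1.0)  # each row is a distribution
```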
