Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[pyspark] Cleanup data processing. #8088

Merged
merged 3 commits on Jul 26, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
42 changes: 37 additions & 5 deletions python-package/xgboost/compat.py
@@ -1,13 +1,14 @@
# coding: utf-8
# pylint: disable= invalid-name, unused-import
"""For compatibility and optional dependencies."""
from typing import Any, Type, Dict, Optional, List
from typing import Any, Type, Dict, Optional, List, Sequence, cast
import sys
import types
import importlib.util
import logging
import numpy as np

from ._typing import _T

assert (sys.version_info[0] == 3), 'Python 2 is no longer supported.'


Expand All @@ -16,7 +17,7 @@ def py_str(x: bytes) -> str:
return x.decode('utf-8') # type: ignore


def lazy_isinstance(instance: Type[object], module: str, name: str) -> bool:
def lazy_isinstance(instance: Any, module: str, name: str) -> bool:
"""Use string representation to identify a type."""

# Notice, we use .__class__ as opposed to type() in order
Expand Down Expand Up @@ -104,11 +105,42 @@ def from_json(self, doc: Dict) -> None:
try:
import scipy.sparse as scipy_sparse
from scipy.sparse import csr_matrix as scipy_csr
SCIPY_INSTALLED = True
except ImportError:
scipy_sparse = False
scipy_csr = object
SCIPY_INSTALLED = False


def concat(value: Sequence[_T]) -> _T:  # pylint: disable=too-many-return-statements
    """Concatenate a sequence of partitions row-wise.

    Dispatches on the type of the first partition; all partitions are assumed
    to share that type.  Supported types are numpy arrays, scipy sparse
    matrices, pandas DataFrame/Series, cuDF DataFrame/Series and cupy arrays.

    Raises
    ------
    TypeError
        If the partition type is not recognised.
    """
    head = value[0]

    if isinstance(head, np.ndarray):
        arrays = cast(Sequence[np.ndarray], value)
        return np.concatenate(arrays, axis=0)

    # scipy_sparse is falsy when scipy is not installed.
    if scipy_sparse:
        if isinstance(head, scipy_sparse.csr_matrix):
            return scipy_sparse.vstack(value, format="csr")
        if isinstance(head, scipy_sparse.csc_matrix):
            return scipy_sparse.vstack(value, format="csc")
        if isinstance(head, scipy_sparse.spmatrix):
            # other sparse format will be converted to CSR.
            return scipy_sparse.vstack(value, format="csr")

    if PANDAS_INSTALLED and isinstance(head, (DataFrame, Series)):
        return pandas_concat(value, axis=0)

    # cuDF is identified by name to avoid importing it unconditionally.
    is_cudf = lazy_isinstance(
        head, "cudf.core.dataframe", "DataFrame"
    ) or lazy_isinstance(head, "cudf.core.series", "Series")
    if is_cudf:
        from cudf import concat as CUDF_concat  # pylint: disable=import-error

        return CUDF_concat(value, axis=0)

    if lazy_isinstance(head, "cupy._core.core", "ndarray"):
        import cupy  # pylint: disable=import-error

        # pylint: disable=c-extension-no-member,no-member
        device = cupy.cuda.runtime.getDevice()
        for part in value:
            arr = cast(cupy.ndarray, part)
            assert arr.device.id == device, "Concatenating arrays on different devices."
        return cupy.concatenate(value, axis=0)

    raise TypeError("Unknown type.")


# Modified from tensorflow with added caching. There's a `LazyLoader` in
Expand Down
49 changes: 9 additions & 40 deletions python-package/xgboost/dask.py
Expand Up @@ -58,17 +58,9 @@
import numpy

from . import config, rabit
from ._typing import FeatureNames, FeatureTypes
from ._typing import _T, FeatureNames, FeatureTypes
from .callback import TrainingCallback
from .compat import (
PANDAS_INSTALLED,
DataFrame,
LazyLoader,
Series,
lazy_isinstance,
pandas_concat,
scipy_sparse,
)
from .compat import DataFrame, LazyLoader, concat, lazy_isinstance
from .core import (
Booster,
DataIter,
Expand Down Expand Up @@ -234,35 +226,12 @@ def __init__(self, args: List[bytes]) -> None:
)


def concat(value: Any) -> Any:  # pylint: disable=too-many-return-statements
    """Concatenate partitions row-wise, dispatching on the type of the first
    element (all elements are assumed to share that type).

    To be replaced with dask builtin.
    """
    if isinstance(value[0], numpy.ndarray):
        return numpy.concatenate(value, axis=0)
    # scipy_sparse is falsy when scipy is not installed.
    if scipy_sparse and isinstance(value[0], scipy_sparse.csr_matrix):
        return scipy_sparse.vstack(value, format="csr")
    if scipy_sparse and isinstance(value[0], scipy_sparse.csc_matrix):
        return scipy_sparse.vstack(value, format="csc")
    if scipy_sparse and isinstance(value[0], scipy_sparse.spmatrix):
        # other sparse format will be converted to CSR.
        return scipy_sparse.vstack(value, format="csr")
    if PANDAS_INSTALLED and isinstance(value[0], (DataFrame, Series)):
        return pandas_concat(value, axis=0)
    # cuDF/cupy are identified by name to avoid importing them unconditionally.
    if lazy_isinstance(value[0], "cudf.core.dataframe", "DataFrame") or lazy_isinstance(
        value[0], "cudf.core.series", "Series"
    ):
        from cudf import concat as CUDF_concat  # pylint: disable=import-error

        return CUDF_concat(value, axis=0)
    if lazy_isinstance(value[0], "cupy._core.core", "ndarray"):
        import cupy

        # pylint: disable=c-extension-no-member,no-member
        # All partitions must live on the current device before concatenation.
        d = cupy.cuda.runtime.getDevice()
        for v in value:
            d_v = v.device.id
            assert d_v == d, "Concatenating arrays on different devices."
        return cupy.concatenate(value, axis=0)
    # Unrecognised types are assumed to be dask collections.
    return dd.multi.concat(list(value), axis=0)
def dconcat(value: Sequence[_T]) -> _T:
    """Concatenate a sequence of partitions row-wise.

    Delegates to :py:func:`concat` for recognised in-memory types and falls
    back to dask's concatenation for everything else.
    """
    try:
        result = concat(value)
    except TypeError:
        # `concat` raises TypeError for types it does not recognise
        # (e.g. dask collections); let dask handle those.
        result = dd.multi.concat(list(value), axis=0)
    return result


def _xgb_get_client(client: Optional["distributed.Client"]) -> "distributed.Client":
Expand Down Expand Up @@ -797,7 +766,7 @@ def _create_dmatrix(
def concat_or_none(data: Sequence[Optional[T]]) -> Optional[T]:
if any(part is None for part in data):
return None
return concat(data)
return dconcat(data)

unzipped_dict = _get_worker_parts(list_of_parts)
concated_dict: Dict[str, Any] = {}
Expand Down
19 changes: 12 additions & 7 deletions python-package/xgboost/sklearn.py
Expand Up @@ -17,7 +17,9 @@
Type,
cast,
)

import numpy as np
from scipy.special import softmax

from .core import Booster, DMatrix, XGBoostError
from .core import _deprecate_positional_args, _convert_ntree_limit
Expand Down Expand Up @@ -1540,17 +1542,20 @@ def predict_proba(
"""
# custom obj: Do nothing as we don't know what to do.
# softprob: Do nothing, output is proba.
# softmax: Unsupported by predict_proba()
# softmax: Use softmax from scipy
# binary:logistic: Expand the prob vector into 2-class matrix after predict.
# binary:logitraw: Unsupported by predict_proba()
if self.objective == "multi:softmax":
# We need to run a Python implementation of softmax for it. Just ask user to
# use softprob since XGBoost's implementation has mitigation for floating
# point overflow. No need to reinvent the wheel.
raise ValueError(
"multi:softmax doesn't support `predict_proba`. "
"Switch to `multi:softproba` instead"
raw_predt = super().predict(
X=X,
ntree_limit=ntree_limit,
validate_features=validate_features,
base_margin=base_margin,
iteration_range=iteration_range,
output_margin=True
)
class_prob = softmax(raw_predt, axis=1)
return class_prob
class_probs = super().predict(
X=X,
ntree_limit=ntree_limit,
Expand Down