[pyspark] Make Xgboost estimator support using sparse matrix as optimization #8145
@@ -4,6 +4,7 @@
import numpy as np
import pandas as pd
from scipy.sparse import csr_matrix
from xgboost.compat import concat

from xgboost import DataIter, DeviceQuantileDMatrix, DMatrix
@@ -101,11 +102,55 @@ def reset(self) -> None:
        self._iter = 0


def _read_csr_matrix_from_unwrapped_spark_vec(part: pd.DataFrame) -> csr_matrix:
    # variables for constructing csr_matrix
    csr_indices_list, csr_indptr_list, csr_values_list = [], [0], []

    n_features = 0

    for vec_type, vec_size_, vec_indices, vec_values in zip(
        part.featureVectorType,
        part.featureVectorSize,
        part.featureVectorIndices,
        part.featureVectorValues,
    ):
        if vec_type == 0:
Review thread on this line:
- Please correct me if I'm wrong: now that missing is 0, do we still really need the sparse vector? Per my understanding, if one instance has a missing value, then the whole instance will be removed.
- @trivialfis Is that true? If so, training on sparse data makes no sense; almost all instances would be removed.
- I think not, @wbo4958. You can check my test case.
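For reference, a minimal standalone check of the behavior discussed in this thread (hypothetical code using the core xgboost API, not part of this PR): entries absent from a CSR matrix are treated as missing when missing=0.0, but the instances themselves are kept, since XGBoost learns default directions for missing values rather than dropping rows.

```python
import numpy as np
import xgboost as xgb
from scipy.sparse import csr_matrix

# Two instances, each with one implicit zero (stored zeros are dropped
# when building a CSR matrix from dense input).
X = csr_matrix(np.array([[1.0, 0.0], [0.0, 2.0]]))
y = np.array([0, 1])

# missing=0.0 matches the assertion enforced by this PR.
dtrain = xgb.DMatrix(X, label=y, missing=0.0)
print(dtrain.num_row())  # 2 -- no instance is removed

booster = xgb.train({"objective": "binary:logistic"}, dtrain, num_boost_round=2)
```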
            # sparse vector
            vec_size = int(vec_size_)
            csr_indices = vec_indices
            csr_values = vec_values
        else:
            # dense vector
            # Note: According to spark ML VectorUDT format,
            # when type field is 1, the size field is also empty.
            # we need to check the values field to get vector length.
            vec_size = len(vec_values)
            csr_indices = np.arange(vec_size, dtype=np.int32)
            csr_values = vec_values

        if n_features == 0:
            n_features = vec_size
        assert n_features == vec_size

        csr_indices_list.append(csr_indices)
        csr_indptr_list.append(csr_indptr_list[-1] + len(csr_indices))
        csr_values_list.append(csr_values)

    csr_indptr_arr = np.array(csr_indptr_list)
    csr_indices_arr = np.concatenate(csr_indices_list)
    csr_values_arr = np.concatenate(csr_values_list)

    return csr_matrix(
        (csr_values_arr, csr_indices_arr, csr_indptr_arr), shape=(len(part), n_features)
    )
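To make the construction above concrete, here is a hypothetical partition and the matrix the helper would produce. The column names follow the unwrapped VectorUDT struct fields read in the loop; the data itself is made up, and the helper is assumed to be in scope.

```python
import numpy as np
import pandas as pd

part = pd.DataFrame(
    {
        "featureVectorType": [0, 1],  # 0 = sparse, 1 = dense
        "featureVectorSize": [3.0, None],  # empty for dense vectors
        "featureVectorIndices": [np.array([0, 2], dtype=np.int32), None],
        "featureVectorValues": [
            np.array([1.0, 3.0]),  # sparse: values at indices 0 and 2
            np.array([4.0, 5.0, 6.0]),  # dense: length gives the vector size
        ],
    }
)

mat = _read_csr_matrix_from_unwrapped_spark_vec(part)
print(mat.toarray())  # [[1. 0. 3.]
                      #  [4. 5. 6.]]
```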

def create_dmatrix_from_partitions(
    iterator: Iterator[pd.DataFrame],
    feature_cols: Optional[Sequence[str]],
    gpu_id: Optional[int],
    kwargs: Dict[str, Any],  # use dict to make sure this parameter is passed.
    enable_sparse_data_optim: bool,
) -> Tuple[DMatrix, Optional[DMatrix]]:
    """Create DMatrix from spark data partitions. This is not particularly efficient as
    we need to convert the pandas series format to numpy then concatenate all the data.
@@ -118,7 +163,7 @@ def create_dmatrix_from_partitions(
        Metainfo for DMatrix.

    """
    # pylint: disable=too-many-locals, too-many-statements
    train_data: Dict[str, List[np.ndarray]] = defaultdict(list)
    valid_data: Dict[str, List[np.ndarray]] = defaultdict(list)
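These two dictionaries accumulate per-partition chunks under each column name, and make() later concatenates each list. In isolation, the pattern looks like this sketch (hypothetical values):

```python
from collections import defaultdict

import numpy as np

train_data = defaultdict(list)  # column name -> list of per-partition chunks

# Each partition appends its chunk under the same key ...
train_data["label"].append(np.array([0.0, 1.0]))
train_data["label"].append(np.array([1.0]))

# ... and a single concatenation yields the full column, which is roughly
# what concat_or_none does for a non-empty list of chunks.
label = np.concatenate(train_data["label"])
print(label)  # [0. 1. 1.]
```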
@@ -139,6 +184,23 @@ def append_m(part: pd.DataFrame, name: str, is_valid: bool) -> None:
        else:
            train_data[name].append(array)

    def append_m_sparse(part: pd.DataFrame, name: str, is_valid: bool) -> None:
        nonlocal n_features

        if name == alias.data or name in part.columns:
            if name == alias.data:
                array = _read_csr_matrix_from_unwrapped_spark_vec(part)
                if n_features == 0:
                    n_features = array.shape[1]
                assert n_features == array.shape[1]
            else:
                array = part[name]

            if is_valid:
                valid_data[name].append(array)
            else:
                train_data[name].append(array)

    def append_dqm(part: pd.DataFrame, name: str, is_valid: bool) -> None:
        """Preprocessing for DeviceQuantileDMatrix"""
        nonlocal n_features
@@ -164,13 +226,19 @@ def make(values: Dict[str, List[np.ndarray]], kwargs: Dict[str, Any]) -> DMatrix
        label = concat_or_none(values.get(alias.label, None))
        weight = concat_or_none(values.get(alias.weight, None))
        margin = concat_or_none(values.get(alias.margin, None))

        return DMatrix(
            data=data, label=label, weight=weight, base_margin=margin, **kwargs
        )
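A note on the sparse path: concat_or_none ends up concatenating the per-partition csr_matrix chunks collected by append_m_sparse. If I read xgboost.compat.concat correctly, it stacks scipy sparse inputs vertically, which behaves like this sketch:

```python
from scipy.sparse import csr_matrix, vstack

a = csr_matrix([[1.0, 0.0], [0.0, 2.0]])  # chunk from partition 1
b = csr_matrix([[0.0, 3.0]])              # chunk from partition 2
merged = vstack([a, b], format="csr")     # one CSR matrix over all rows
print(merged.shape)  # (3, 2)
```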
    is_dmatrix = feature_cols is None
    if is_dmatrix:
        if enable_sparse_data_optim:
            append_fn = append_m_sparse
            assert "missing" in kwargs and kwargs["missing"] == 0.0
        else:
            append_fn = append_m
        cache_partitions(iterator, append_fn)
        dtrain = make(train_data, kwargs)
    else:
        cache_partitions(iterator, append_dqm)
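For context, a sketch of how an end user would exercise this code path through the PySpark estimator. It assumes the estimator exposes the enable_sparse_data_optim flag added by this PR and forwards missing into the DMatrix kwargs checked by the assert above; train_df is a hypothetical Spark DataFrame.

```python
from xgboost.spark import SparkXGBClassifier

# features_col must hold Spark ML vectors; sparse rows stay sparse end to end.
classifier = SparkXGBClassifier(
    features_col="features",
    label_col="label",
    enable_sparse_data_optim=True,  # routes partitions through append_m_sparse
    missing=0.0,  # required: the assert above rejects any other value
)
model = classifier.fit(train_df)
```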
Review comment: The _fit function is almost 200 lines, which is huge. Could we split this function?

Reply: I am working on another Ranker estimator PR; we can do this refactoring after these feature PRs are merged. Otherwise, fixing conflicts is annoying.