[pyspark] Use quantile dmatrix. (#8284)
trivialfis committed Oct 12, 2022
1 parent ce0382d commit 97a5b08
Showing 9 changed files with 226 additions and 121 deletions.
doc/parameter.rst (2 changes: 1 addition & 1 deletion)
@@ -349,7 +349,7 @@ Specify the learning task and the corresponding learning objective.
 - ``reg:squaredlogerror``: regression with squared log loss :math:`\frac{1}{2}[\log(pred + 1) - \log(label + 1)]^2`. All input labels are required to be greater than -1. Also, see metric ``rmsle`` for a possible issue with this objective.
 - ``reg:logistic``: logistic regression.
 - ``reg:pseudohubererror``: regression with Pseudo Huber loss, a twice differentiable alternative to absolute loss.
-- ``reg:absoluteerror``: Regression with L1 error. When a tree model is used, the leaf value is refreshed after tree construction.
+- ``reg:absoluteerror``: Regression with L1 error. When a tree model is used, the leaf value is refreshed after tree construction. If used in distributed training, the leaf value is calculated as the mean value from all workers, which is not guaranteed to be optimal.
 - ``binary:logistic``: logistic regression for binary classification, outputs probability
 - ``binary:logitraw``: logistic regression for binary classification, outputs score before logistic transformation
 - ``binary:hinge``: hinge loss for binary classification. This makes predictions of 0 or 1, rather than producing probabilities.
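For reference, a hedged single-machine sketch of the objective documented above (synthetic data; names are illustrative):

import numpy as np
import xgboost

rng = np.random.default_rng(0)
X = rng.normal(size=(256, 4))
y = X[:, 0] + rng.standard_t(df=3, size=256)  # heavy-tailed noise, where L1 loss helps

# Single-node training; in distributed training each tree's leaf values
# are averaged across workers, as the doc change above notes.
booster = xgboost.train(
    {"objective": "reg:absoluteerror", "tree_method": "hist"},
    xgboost.DMatrix(X, label=y),
    num_boost_round=20,
)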
python-package/xgboost/core.py (43 changes: 31 additions & 12 deletions)
@@ -105,6 +105,11 @@ def from_cstr_to_pystr(data: CStrPptr, length: c_bst_ulong) -> List[str]:
     return res
 
 
+def make_jcargs(**kwargs: Any) -> bytes:
+    "Make JSON-based arguments for C functions."
+    return from_pystr_to_cstr(json.dumps(kwargs))
+
+
 IterRange = TypeVar("IterRange", Optional[Tuple[int, int]], Tuple[int, int])


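A minimal sketch of what the new helper does, with a simplified stand-in for from_pystr_to_cstr (the real one also handles lists of strings):

import json

def from_pystr_to_cstr(data: str) -> bytes:
    # Simplified stand-in: encode a Python str as a UTF-8 C string.
    return data.encode("utf-8")

def make_jcargs(**kwargs) -> bytes:
    "Make JSON-based arguments for C functions."
    return from_pystr_to_cstr(json.dumps(kwargs))

# Keyword arguments collapse into one JSON config for the C API.
print(make_jcargs(nthread=4, max_bin=256))  # b'{"nthread": 4, "max_bin": 256}'

This replaces the repeated json.dumps boilerplate at each C-API call site below.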
@@ -1256,7 +1261,7 @@ def __init__(self) -> None:  # pylint: disable=super-init-not-called
     def _set_data_from_cuda_interface(self, data: DataType) -> None:
         """Set data from CUDA array interface."""
         interface = data.__cuda_array_interface__
-        interface_str = bytes(json.dumps(interface, indent=2), "utf-8")
+        interface_str = bytes(json.dumps(interface), "utf-8")
         _check_call(
             _LIB.XGProxyDMatrixSetDataCudaArrayInterface(self.handle, interface_str)
         )
@@ -1357,6 +1362,26 @@ def __init__(  # pylint: disable=super-init-not-called
                 "Only one of the eval_qid or eval_group for each evaluation "
                 "dataset should be provided."
             )
+        if isinstance(data, DataIter):
+            if any(
+                info is not None
+                for info in (
+                    label,
+                    weight,
+                    base_margin,
+                    feature_names,
+                    feature_types,
+                    group,
+                    qid,
+                    label_lower_bound,
+                    label_upper_bound,
+                    feature_weights,
+                )
+            ):
+                raise ValueError(
+                    "If data iterator is used as input, data like label should be "
+                    "specified as batch argument."
+                )
 
         self._init(
             data,
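What the new check enforces, as a minimal sketch (BatchIter is illustrative, not part of the library): metadata such as label must be supplied through input_data() inside the iterator, not to the DMatrix constructor.

import numpy as np
import xgboost

class BatchIter(xgboost.DataIter):
    def __init__(self, batches):
        self._batches = batches  # list of (X, y) pairs
        self._pos = 0
        super().__init__()

    def next(self, input_data):
        if self._pos == len(self._batches):
            return 0  # no more batches
        X, y = self._batches[self._pos]
        input_data(data=X, label=y)  # label rides along with the batch
        self._pos += 1
        return 1

    def reset(self):
        self._pos = 0

rng = np.random.default_rng(0)
batches = [(rng.normal(size=(64, 4)), rng.normal(size=64)) for _ in range(3)]
Xy = xgboost.QuantileDMatrix(BatchIter(batches))  # passing label= here now raises ValueError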
@@ -1405,12 +1430,9 @@ def _init(
                 "in iterator to fix this error."
             )
 
-        args = {
-            "nthread": self.nthread,
-            "missing": self.missing,
-            "max_bin": self.max_bin,
-        }
-        config = from_pystr_to_cstr(json.dumps(args))
+        config = make_jcargs(
+            nthread=self.nthread, missing=self.missing, max_bin=self.max_bin
+        )
         ret = _LIB.XGQuantileDMatrixCreateFromCallback(
             None,
             it.proxy.handle,
@@ -2375,7 +2397,7 @@ def save_raw(self, raw_format: str = "deprecated") -> bytearray:
         """
         length = c_bst_ulong()
         cptr = ctypes.POINTER(ctypes.c_char)()
-        config = from_pystr_to_cstr(json.dumps({"format": raw_format}))
+        config = make_jcargs(format=raw_format)
         _check_call(
             _LIB.XGBoosterSaveModelToBuffer(
                 self.handle, config, ctypes.byref(length), ctypes.byref(cptr)
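Usage sketch for the call site above (synthetic data; "ubj" and "json" are the documented raw formats):

import numpy as np
import xgboost

rng = np.random.default_rng(0)
X, y = rng.normal(size=(128, 4)), rng.normal(size=128)
booster = xgboost.train(
    {"objective": "reg:squarederror"}, xgboost.DMatrix(X, label=y), num_boost_round=10
)

raw = booster.save_raw(raw_format="ubj")    # in-memory bytearray, no file I/O
restored = xgboost.Booster(model_file=raw)  # Booster accepts the raw buffer back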
@@ -2570,9 +2592,6 @@ def get_score(
         `n_classes`, otherwise they're scalars.
         """
         fmap = os.fspath(os.path.expanduser(fmap))
-        args = from_pystr_to_cstr(
-            json.dumps({"importance_type": importance_type, "feature_map": fmap})
-        )
         features = ctypes.POINTER(ctypes.c_char_p)()
         scores = ctypes.POINTER(ctypes.c_float)()
         n_out_features = c_bst_ulong()
@@ -2582,7 +2601,7 @@
         _check_call(
             _LIB.XGBoosterFeatureScore(
                 self.handle,
-                args,
+                make_jcargs(importance_type=importance_type, feature_map=fmap),
                 ctypes.byref(n_out_features),
                 ctypes.byref(features),
                 ctypes.byref(out_dim),
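And the corresponding Python-level call, as a small self-contained sketch:

import numpy as np
import xgboost

rng = np.random.default_rng(0)
X = rng.normal(size=(256, 3))
y = 2.0 * X[:, 0] + rng.normal(scale=0.1, size=256)
booster = xgboost.train(
    {"objective": "reg:squarederror"}, xgboost.DMatrix(X, label=y), num_boost_round=20
)

# importance_type can be "weight", "gain", "cover", "total_gain" or "total_cover";
# features that never appear in a split are omitted from the result.
print(booster.get_score(importance_type="gain"))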
python-package/xgboost/dask.py (13 changes: 9 additions & 4 deletions)
@@ -573,6 +573,7 @@ def __init__(
         label_upper_bound: Optional[List[Any]] = None,
         feature_names: Optional[FeatureNames] = None,
         feature_types: Optional[Union[Any, List[Any]]] = None,
+        feature_weights: Optional[Any] = None,
     ) -> None:
         self._data = data
         self._label = label
@@ -583,6 +584,7 @@ def __init__(
         self._label_upper_bound = label_upper_bound
         self._feature_names = feature_names
         self._feature_types = feature_types
+        self._feature_weights = feature_weights
 
         assert isinstance(self._data, collections.abc.Sequence)
 
@@ -633,6 +635,7 @@ def next(self, input_data: Callable) -> int:
             label_upper_bound=self._get("_label_upper_bound"),
             feature_names=feature_names,
             feature_types=self._feature_types,
+            feature_weights=self._feature_weights,
         )
         self._iter += 1
         return 1
@@ -731,19 +734,21 @@ def _create_quantile_dmatrix(
         return d
 
     unzipped_dict = _get_worker_parts(parts)
-    it = DaskPartitionIter(**unzipped_dict)
+    it = DaskPartitionIter(
+        **unzipped_dict,
+        feature_types=feature_types,
+        feature_names=feature_names,
+        feature_weights=feature_weights,
+    )
 
     dmatrix = QuantileDMatrix(
         it,
         missing=missing,
-        feature_names=feature_names,
-        feature_types=feature_types,
         nthread=nthread,
         max_bin=max_bin,
         ref=ref,
         enable_categorical=enable_categorical,
     )
-    dmatrix.set_info(feature_weights=feature_weights)
     return dmatrix
 
 
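The public-API shape this refactor serves, as a hedged sketch: feature_weights now travels with the iterator batches rather than being set after construction, and a validation QuantileDMatrix can reuse the training cuts via ref=.

import numpy as np
import xgboost

rng = np.random.default_rng(0)
X_train, y_train = rng.normal(size=(512, 8)), rng.normal(size=512)
X_valid, y_valid = rng.normal(size=(128, 8)), rng.normal(size=128)

fw = np.arange(1, 9, dtype=np.float64)  # per-feature weights for column sampling
Xy_train = xgboost.QuantileDMatrix(X_train, label=y_train, feature_weights=fw)
# ref= shares the training histogram cuts instead of re-sketching the data.
Xy_valid = xgboost.QuantileDMatrix(X_valid, label=y_valid, ref=Xy_train)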
python-package/xgboost/spark/core.py (15 changes: 7 additions & 8 deletions)
@@ -747,6 +747,7 @@ def _fit(self, dataset):
             k: v for k, v in train_call_kwargs_params.items() if v is not None
         }
         dmatrix_kwargs = {k: v for k, v in dmatrix_kwargs.items() if v is not None}
+        use_qdm = booster_params.get("tree_method") in ("hist", "gpu_hist")
 
         def _train_booster(pandas_df_iter):
            """Takes in an RDD partition and outputs a booster for that partition after
@@ -759,20 +760,17 @@ def _train_booster(pandas_df_iter):
             context.barrier()
 
             gpu_id = None
+
+            if use_qdm and (booster_params.get("max_bin", None) is not None):
+                dmatrix_kwargs["max_bin"] = booster_params["max_bin"]
+
             if use_gpu:
                 gpu_id = context.partitionId() if is_local else _get_gpu_id(context)
                 booster_params["gpu_id"] = gpu_id
 
-            # max_bin is needed for qdm
-            if (
-                features_cols_names is not None
-                and booster_params.get("max_bin", None) is not None
-            ):
-                dmatrix_kwargs["max_bin"] = booster_params["max_bin"]
-
             _rabit_args = {}
             if context.partitionId() == 0:
-                get_logger("XGBoostPySpark").info(
+                get_logger("XGBoostPySpark").debug(
                     "booster params: %s\n"
                     "train_call_kwargs_params: %s\n"
                     "dmatrix_kwargs: %s",
@@ -791,6 +789,7 @@
                 pandas_df_iter,
                 features_cols_names,
                 gpu_id,
+                use_qdm,
                 dmatrix_kwargs,
                 enable_sparse_data_optim=enable_sparse_data_optim,
                 has_validation_col=has_validation_col,
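From the user's side the change is transparent; a hedged end-to-end sketch (Spark session setup and train_df are assumed, not shown):

from xgboost.spark import SparkXGBRegressor

regressor = SparkXGBRegressor(
    features_col="features",
    label_col="label",
    tree_method="hist",  # hist/gpu_hist now trains from a QuantileDMatrix
    max_bin=256,         # forwarded into dmatrix_kwargs for the QDM build
)
model = regressor.fit(train_df)  # train_df: an assumed Spark DataFrame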
