[pyspark] Use quantile dmatrix. #8284

Merged: 21 commits, Oct 12, 2022
2 changes: 1 addition & 1 deletion doc/parameter.rst
@@ -349,7 +349,7 @@ Specify the learning task and the corresponding learning objective. The objective options are below:
- ``reg:squaredlogerror``: regression with squared log loss :math:`\frac{1}{2}[log(pred + 1) - log(label + 1)]^2`. All input labels are required to be greater than -1. Also, see metric ``rmsle`` for possible issue with this objective.
- ``reg:logistic``: logistic regression.
- ``reg:pseudohubererror``: regression with Pseudo Huber loss, a twice differentiable alternative to absolute loss.
-- ``reg:absoluteerror``: Regression with L1 error. When tree model is used, leaf value is refreshed after tree construction.
+- ``reg:absoluteerror``: Regression with L1 error. When tree model is used, leaf value is refreshed after tree construction. If used in distributed training, the leaf value is calculated as the mean value from all workers, which is not guaranteed to be optimal.
- ``binary:logistic``: logistic regression for binary classification, output probability
- ``binary:logitraw``: logistic regression for binary classification, output score before logistic transformation
- ``binary:hinge``: hinge loss for binary classification. This makes predictions of 0 or 1, rather than producing probabilities.
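As a quick illustration of the amended ``reg:absoluteerror`` entry above, a minimal single-process sketch (outside the diff; synthetic data, illustrative parameters):

import numpy as np
import xgboost as xgb

rng = np.random.default_rng(0)
X, y = rng.random((256, 4)), rng.random(256)

dtrain = xgb.DMatrix(X, label=y)
# L1 (absolute) error; leaf values are refreshed after each tree is built.
booster = xgb.train(
    {"objective": "reg:absoluteerror", "tree_method": "hist"},
    dtrain,
    num_boost_round=10,
)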
43 changes: 31 additions & 12 deletions python-package/xgboost/core.py
@@ -105,6 +105,11 @@ def from_cstr_to_pystr(data: CStrPptr, length: c_bst_ulong) -> List[str]:
return res


+def make_jcargs(**kwargs: Any) -> bytes:
+    "Make JSON-based arguments for C functions."
+    return from_pystr_to_cstr(json.dumps(kwargs))


IterRange = TypeVar("IterRange", Optional[Tuple[int, int]], Tuple[int, int])
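For illustration (outside the diff): a sketch of what the new helper packs, assuming from_pystr_to_cstr amounts to UTF-8 encoding of the JSON string; the standalone name below is hypothetical.

import json
from typing import Any

def make_jcargs_sketch(**kwargs: Any) -> bytes:
    # Pack keyword arguments into one UTF-8 JSON document for the C API.
    return json.dumps(kwargs).encode("utf-8")

print(make_jcargs_sketch(nthread=16, missing=0.0, max_bin=256))
# b'{"nthread": 16, "missing": 0.0, "max_bin": 256}'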


@@ -1256,7 +1261,7 @@ def __init__(self) -> None: # pylint: disable=super-init-not-called
def _set_data_from_cuda_interface(self, data: DataType) -> None:
"""Set data from CUDA array interface."""
interface = data.__cuda_array_interface__
-        interface_str = bytes(json.dumps(interface, indent=2), "utf-8")
+        interface_str = bytes(json.dumps(interface), "utf-8")
_check_call(
_LIB.XGProxyDMatrixSetDataCudaArrayInterface(self.handle, interface_str)
)
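For context (outside the diff): dropping indent=2 only changes the formatting of the serialized interface, not its content. A quick illustration with an abbreviated dict:

import json

interface = {"shape": [1024, 16], "typestr": "<f4", "version": 3}
print(json.dumps(interface, indent=2))  # multi-line, pretty-printed
print(json.dumps(interface))            # compact single line, same content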
@@ -1357,6 +1362,26 @@ def __init__( # pylint: disable=super-init-not-called
"Only one of the eval_qid or eval_group for each evaluation "
"dataset should be provided."
)
+        if isinstance(data, DataIter):
+            if any(
+                info is not None
+                for info in (
+                    label,
+                    weight,
+                    base_margin,
+                    feature_names,
+                    feature_types,
+                    group,
+                    qid,
+                    label_lower_bound,
+                    label_upper_bound,
+                    feature_weights,
+                )
+            ):
+                raise ValueError(
+                    "If data iterator is used as input, data like label should be "
+                    "specified as batch argument."
+                )

self._init(
data,
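The new guard enforces that per-batch metadata flows through the iterator rather than the QuantileDMatrix constructor. A minimal sketch of the intended pattern (outside the diff; synthetic batches, using the public DataIter API):

import numpy as np
import xgboost as xgb

class BatchIter(xgb.DataIter):
    """Yields pre-split (X, y) batches to the DMatrix construction callback."""

    def __init__(self, batches):
        self._batches = batches
        self._it = 0
        super().__init__()

    def next(self, input_data):
        if self._it == len(self._batches):
            return 0  # signal end of iteration
        X, y = self._batches[self._it]
        input_data(data=X, label=y)  # label is passed as a *batch* argument
        self._it += 1
        return 1

    def reset(self):
        self._it = 0

rng = np.random.default_rng(0)
batches = [(rng.random((64, 4)), rng.random(64)) for _ in range(3)]
Xy = xgb.QuantileDMatrix(BatchIter(batches))  # passing label= here now raises ValueError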
@@ -1405,12 +1430,9 @@ def _init(
"in iterator to fix this error."
)

-        args = {
-            "nthread": self.nthread,
-            "missing": self.missing,
-            "max_bin": self.max_bin,
-        }
-        config = from_pystr_to_cstr(json.dumps(args))
+        config = make_jcargs(
+            nthread=self.nthread, missing=self.missing, max_bin=self.max_bin
+        )
ret = _LIB.XGQuantileDMatrixCreateFromCallback(
None,
it.proxy.handle,
@@ -2375,7 +2397,7 @@ def save_raw(self, raw_format: str = "deprecated") -> bytearray:
"""
length = c_bst_ulong()
cptr = ctypes.POINTER(ctypes.c_char)()
config = from_pystr_to_cstr(json.dumps({"format": raw_format}))
config = make_jcargs(format=raw_format)
_check_call(
_LIB.XGBoosterSaveModelToBuffer(
self.handle, config, ctypes.byref(length), ctypes.byref(cptr)
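Caller-facing behavior is unchanged; for reference, a hedged usage sketch of the public round trip (outside the diff; format names as documented for this release, assuming a trained booster):

raw = booster.save_raw(raw_format="json")  # returns a bytearray; "ubj" is also documented
restored = xgb.Booster(model_file=raw)     # Booster accepts the raw buffer back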
@@ -2570,9 +2592,6 @@ def get_score(
`n_classes`, otherwise they're scalars.
"""
fmap = os.fspath(os.path.expanduser(fmap))
-        args = from_pystr_to_cstr(
-            json.dumps({"importance_type": importance_type, "feature_map": fmap})
-        )
features = ctypes.POINTER(ctypes.c_char_p)()
scores = ctypes.POINTER(ctypes.c_float)()
n_out_features = c_bst_ulong()
@@ -2582,7 +2601,7 @@
_check_call(
_LIB.XGBoosterFeatureScore(
self.handle,
-                args,
+                make_jcargs(importance_type=importance_type, feature_map=fmap),
ctypes.byref(n_out_features),
ctypes.byref(features),
ctypes.byref(out_dim),
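Again, nothing changes for callers; a small usage sketch of the API this backs (outside the diff; assuming a trained booster):

gain = booster.get_score(importance_type="gain")
# e.g. {"f0": 123.4, "f2": 56.7}; features never used in a split are omitted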
13 changes: 9 additions & 4 deletions python-package/xgboost/dask.py
@@ -573,6 +573,7 @@ def __init__(
label_upper_bound: Optional[List[Any]] = None,
feature_names: Optional[FeatureNames] = None,
feature_types: Optional[Union[Any, List[Any]]] = None,
+        feature_weights: Optional[Any] = None,
) -> None:
self._data = data
self._label = label
@@ -583,6 +584,7 @@ def __init__(
self._label_upper_bound = label_upper_bound
self._feature_names = feature_names
self._feature_types = feature_types
+        self._feature_weights = feature_weights

assert isinstance(self._data, collections.abc.Sequence)

@@ -633,6 +635,7 @@ def next(self, input_data: Callable) -> int:
label_upper_bound=self._get("_label_upper_bound"),
feature_names=feature_names,
feature_types=self._feature_types,
+            feature_weights=self._feature_weights,
)
self._iter += 1
return 1
@@ -731,19 +734,21 @@ def _create_quantile_dmatrix(
return d

unzipped_dict = _get_worker_parts(parts)
-    it = DaskPartitionIter(**unzipped_dict)
+    it = DaskPartitionIter(
+        **unzipped_dict,
+        feature_types=feature_types,
+        feature_names=feature_names,
+        feature_weights=feature_weights,
+    )

dmatrix = QuantileDMatrix(
it,
missing=missing,
-        feature_names=feature_names,
-        feature_types=feature_types,
nthread=nthread,
max_bin=max_bin,
ref=ref,
enable_categorical=enable_categorical,
)
-    dmatrix.set_info(feature_weights=feature_weights)
return dmatrix
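A minimal non-dask sketch of the pattern this refactor moves to (outside the diff; synthetic data): metadata such as feature_weights rides with the data, and ref= ties the validation matrix to the training quantile cuts.

import numpy as np
import xgboost as xgb

rng = np.random.default_rng(0)
X_train, y_train = rng.random((512, 8)), rng.random(512)
X_valid, y_valid = rng.random((128, 8)), rng.random(128)

fw = np.ones(8)  # illustrative feature weights
Xy_train = xgb.QuantileDMatrix(
    X_train, label=y_train, feature_weights=fw, max_bin=64
)
Xy_valid = xgb.QuantileDMatrix(X_valid, label=y_valid, ref=Xy_train)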


15 changes: 7 additions & 8 deletions python-package/xgboost/spark/core.py
@@ -747,6 +747,7 @@ def _fit(self, dataset):
k: v for k, v in train_call_kwargs_params.items() if v is not None
}
dmatrix_kwargs = {k: v for k, v in dmatrix_kwargs.items() if v is not None}
+        use_qdm = booster_params.get("tree_method") in ("hist", "gpu_hist")

def _train_booster(pandas_df_iter):
"""Takes in an RDD partition and outputs a booster for that partition after
@@ -759,20 +760,17 @@ def _train_booster(pandas_df_iter):
context.barrier()

gpu_id = None

+            if use_qdm and (booster_params.get("max_bin", None) is not None):
+                dmatrix_kwargs["max_bin"] = booster_params["max_bin"]

if use_gpu:
gpu_id = context.partitionId() if is_local else _get_gpu_id(context)
booster_params["gpu_id"] = gpu_id

-            # max_bin is needed for qdm
-            if (
-                features_cols_names is not None
-                and booster_params.get("max_bin", None) is not None
-            ):
-                dmatrix_kwargs["max_bin"] = booster_params["max_bin"]

_rabit_args = {}
if context.partitionId() == 0:
get_logger("XGBoostPySpark").info(
get_logger("XGBoostPySpark").debug(
"booster params: %s\n"
"train_call_kwargs_params: %s\n"
"dmatrix_kwargs: %s",
Expand All @@ -791,6 +789,7 @@ def _train_booster(pandas_df_iter):
pandas_df_iter,
features_cols_names,
gpu_id,
+                use_qdm,
dmatrix_kwargs,
enable_sparse_data_optim=enable_sparse_data_optim,
has_validation_col=has_validation_col,
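To make the new gating concrete, a standalone sketch of the same decision (outside the diff; values illustrative):

booster_params = {"tree_method": "hist", "max_bin": 64}
dmatrix_kwargs = {}

# QuantileDMatrix is only used with the histogram-based tree methods.
use_qdm = booster_params.get("tree_method") in ("hist", "gpu_hist")
if use_qdm and booster_params.get("max_bin", None) is not None:
    dmatrix_kwargs["max_bin"] = booster_params["max_bin"]

assert use_qdm and dmatrix_kwargs == {"max_bin": 64}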