Skip to content

Commit

Permalink
[PySpark] change the returning model type to string from binary (#8085)
Browse files Browse the repository at this point in the history
* [PySpark] change the returning model type to string from binary

XGBoost PySpark can be accelerated by the RAPIDS Accelerator seamlessly by
changing the returning model type from binary to string.
  • Loading branch information
wbo4958 committed Jul 19, 2022
1 parent 2365f82 commit f801d3c
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 13 deletions.
33 changes: 22 additions & 11 deletions python-package/xgboost/spark/core.py
Expand Up @@ -2,7 +2,6 @@
"""Xgboost pyspark integration submodule for core code."""
# pylint: disable=fixme, too-many-ancestors, protected-access, no-member, invalid-name
# pylint: disable=too-few-public-methods
import cloudpickle
import numpy as np
import pandas as pd
from scipy.special import expit, softmax # pylint: disable=no-name-in-module
Expand Down Expand Up @@ -410,12 +409,13 @@ def _pyspark_model_cls(cls):
def _create_pyspark_model(self, xgb_model):
return self._pyspark_model_cls()(xgb_model)

def _convert_to_sklearn_model(self, booster):
def _convert_to_sklearn_model(self, booster: bytearray, config: str):
xgb_sklearn_params = self._gen_xgb_params_dict(
gen_xgb_sklearn_estimator_param=True
)
sklearn_model = self._xgb_cls()(**xgb_sklearn_params)
sklearn_model._Booster = booster
sklearn_model.load_model(booster)
sklearn_model._Booster.load_config(config)
return sklearn_model

def _query_plan_contains_valid_repartition(self, dataset):
Expand Down Expand Up @@ -629,16 +629,27 @@ def _train_booster(pandas_df_iter):
context.barrier()

if context.partitionId() == 0:
yield pd.DataFrame(data={"booster_bytes": [cloudpickle.dumps(booster)]})
yield pd.DataFrame(
data={
"config": [booster.save_config()],
"booster": [booster.save_raw("json").decode("utf-8")]
}
)

def _run_job():
ret = (
dataset.mapInPandas(_train_booster, schema="config string, booster string")
.rdd.barrier()
.mapPartitions(lambda x: x)
.collect()[0]
)
return ret[0], ret[1]

(config, booster) = _run_job()

result_ser_booster = (
dataset.mapInPandas(_train_booster, schema="booster_bytes binary")
.rdd.barrier()
.mapPartitions(lambda x: x)
.collect()[0][0]
)
result_xgb_model = self._convert_to_sklearn_model(
cloudpickle.loads(result_ser_booster)
bytearray(booster, "utf-8"),
config
)
return self._copyValues(self._create_pyspark_model(result_xgb_model))

Expand Down
8 changes: 6 additions & 2 deletions tests/python/test_spark/test_spark_local.py
Expand Up @@ -904,15 +904,19 @@ def test_convert_to_sklearn_model(self):

# Check that regardless of what booster, _convert_to_model converts to the correct class type
sklearn_classifier = classifier._convert_to_sklearn_model(
clf_model.get_booster()
clf_model.get_booster().save_raw("json"),
clf_model.get_booster().save_config()
)
assert isinstance(sklearn_classifier, XGBClassifier)
assert sklearn_classifier.n_estimators == 200
assert sklearn_classifier.missing == 2.0
assert sklearn_classifier.max_depth == 3
assert sklearn_classifier.get_params()["sketch_eps"] == 0.5

sklearn_regressor = regressor._convert_to_sklearn_model(reg_model.get_booster())
sklearn_regressor = regressor._convert_to_sklearn_model(
reg_model.get_booster().save_raw("json"),
reg_model.get_booster().save_config()
)
assert isinstance(sklearn_regressor, XGBRegressor)
assert sklearn_regressor.n_estimators == 200
assert sklearn_regressor.missing == 2.0
Expand Down

0 comments on commit f801d3c

Please sign in to comment.