Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[PySpark] change the returning model type to string from binary #8085

Merged
merged 3 commits into from Jul 19, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
33 changes: 22 additions & 11 deletions python-package/xgboost/spark/core.py
Expand Up @@ -2,7 +2,6 @@
"""Xgboost pyspark integration submodule for core code."""
# pylint: disable=fixme, too-many-ancestors, protected-access, no-member, invalid-name
# pylint: disable=too-few-public-methods
import cloudpickle
import numpy as np
import pandas as pd
from scipy.special import expit, softmax # pylint: disable=no-name-in-module
Expand Down Expand Up @@ -410,12 +409,13 @@ def _pyspark_model_cls(cls):
def _create_pyspark_model(self, xgb_model):
return self._pyspark_model_cls()(xgb_model)

def _convert_to_sklearn_model(self, booster):
def _convert_to_sklearn_model(self, booster: bytearray, config: str):
xgb_sklearn_params = self._gen_xgb_params_dict(
gen_xgb_sklearn_estimator_param=True
)
sklearn_model = self._xgb_cls()(**xgb_sklearn_params)
sklearn_model._Booster = booster
sklearn_model.load_model(booster)
sklearn_model._Booster.load_config(config)
return sklearn_model

def _query_plan_contains_valid_repartition(self, dataset):
Expand Down Expand Up @@ -629,16 +629,27 @@ def _train_booster(pandas_df_iter):
context.barrier()

if context.partitionId() == 0:
yield pd.DataFrame(data={"booster_bytes": [cloudpickle.dumps(booster)]})
yield pd.DataFrame(
data={
"config": [booster.save_config()],
"booster": [booster.save_raw("json").decode("utf-8")]
}
)

def _run_job():
ret = (
dataset.mapInPandas(_train_booster, schema="config string, booster string")
.rdd.barrier()
.mapPartitions(lambda x: x)
.collect()[0]
)
return ret[0], ret[1]

(config, booster) = _run_job()

result_ser_booster = (
dataset.mapInPandas(_train_booster, schema="booster_bytes binary")
.rdd.barrier()
.mapPartitions(lambda x: x)
.collect()[0][0]
)
result_xgb_model = self._convert_to_sklearn_model(
cloudpickle.loads(result_ser_booster)
bytearray(booster, "utf-8"),
config
)
return self._copyValues(self._create_pyspark_model(result_xgb_model))

Expand Down
8 changes: 6 additions & 2 deletions tests/python/test_spark/test_spark_local.py
Expand Up @@ -904,15 +904,19 @@ def test_convert_to_sklearn_model(self):

# Check that, regardless of the booster passed in, _convert_to_sklearn_model returns the correct sklearn class type
sklearn_classifier = classifier._convert_to_sklearn_model(
clf_model.get_booster()
clf_model.get_booster().save_raw("json"),
clf_model.get_booster().save_config()
)
assert isinstance(sklearn_classifier, XGBClassifier)
assert sklearn_classifier.n_estimators == 200
assert sklearn_classifier.missing == 2.0
assert sklearn_classifier.max_depth == 3
assert sklearn_classifier.get_params()["sketch_eps"] == 0.5

sklearn_regressor = regressor._convert_to_sklearn_model(reg_model.get_booster())
sklearn_regressor = regressor._convert_to_sklearn_model(
reg_model.get_booster().save_raw("json"),
reg_model.get_booster().save_config()
)
assert isinstance(sklearn_regressor, XGBRegressor)
assert sklearn_regressor.n_estimators == 200
assert sklearn_regressor.missing == 2.0
Expand Down