diff --git a/python-package/xgboost/spark/core.py b/python-package/xgboost/spark/core.py index c38fcbffd48a..bd461d49b5cd 100644 --- a/python-package/xgboost/spark/core.py +++ b/python-package/xgboost/spark/core.py @@ -2,7 +2,6 @@ """Xgboost pyspark integration submodule for core code.""" # pylint: disable=fixme, too-many-ancestors, protected-access, no-member, invalid-name # pylint: disable=too-few-public-methods -import cloudpickle import numpy as np import pandas as pd from scipy.special import expit, softmax # pylint: disable=no-name-in-module @@ -410,12 +409,13 @@ def _pyspark_model_cls(cls): def _create_pyspark_model(self, xgb_model): return self._pyspark_model_cls()(xgb_model) - def _convert_to_sklearn_model(self, booster): + def _convert_to_sklearn_model(self, booster: bytearray, config: str): xgb_sklearn_params = self._gen_xgb_params_dict( gen_xgb_sklearn_estimator_param=True ) sklearn_model = self._xgb_cls()(**xgb_sklearn_params) - sklearn_model._Booster = booster + sklearn_model.load_model(booster) + sklearn_model._Booster.load_config(config) return sklearn_model def _query_plan_contains_valid_repartition(self, dataset): @@ -629,16 +629,27 @@ def _train_booster(pandas_df_iter): context.barrier() if context.partitionId() == 0: - yield pd.DataFrame(data={"booster_bytes": [cloudpickle.dumps(booster)]}) + yield pd.DataFrame( + data={ + "config": [booster.save_config()], + "booster": [booster.save_raw("json").decode("utf-8")] + } + ) + + def _run_job(): + ret = ( + dataset.mapInPandas(_train_booster, schema="config string, booster string") + .rdd.barrier() + .mapPartitions(lambda x: x) + .collect()[0] + ) + return ret[0], ret[1] + + (config, booster) = _run_job() - result_ser_booster = ( - dataset.mapInPandas(_train_booster, schema="booster_bytes binary") - .rdd.barrier() - .mapPartitions(lambda x: x) - .collect()[0][0] - ) result_xgb_model = self._convert_to_sklearn_model( - cloudpickle.loads(result_ser_booster) + bytearray(booster, "utf-8"), + config ) return self._copyValues(self._create_pyspark_model(result_xgb_model)) diff --git a/tests/python/test_spark/test_spark_local.py b/tests/python/test_spark/test_spark_local.py index f57e9b874d8a..3b04a002a1e2 100644 --- a/tests/python/test_spark/test_spark_local.py +++ b/tests/python/test_spark/test_spark_local.py @@ -904,7 +904,8 @@ def test_convert_to_sklearn_model(self): # Check that regardless of what booster, _convert_to_model converts to the correct class type sklearn_classifier = classifier._convert_to_sklearn_model( - clf_model.get_booster() + clf_model.get_booster().save_raw("json"), + clf_model.get_booster().save_config() ) assert isinstance(sklearn_classifier, XGBClassifier) assert sklearn_classifier.n_estimators == 200 @@ -912,7 +913,10 @@ def test_convert_to_sklearn_model(self): assert sklearn_classifier.max_depth == 3 assert sklearn_classifier.get_params()["sketch_eps"] == 0.5 - sklearn_regressor = regressor._convert_to_sklearn_model(reg_model.get_booster()) + sklearn_regressor = regressor._convert_to_sklearn_model( + reg_model.get_booster().save_raw("json"), + reg_model.get_booster().save_config() + ) assert isinstance(sklearn_regressor, XGBRegressor) assert sklearn_regressor.n_estimators == 200 assert sklearn_regressor.missing == 2.0