From 49c9ce05808ee0a41a394128abbb615ab57b7fcc Mon Sep 17 00:00:00 2001 From: Bobby Wang Date: Thu, 14 Jul 2022 18:34:05 +0800 Subject: [PATCH 1/3] [PySpark] change the returning model type to string from binary XGBoost pyspark can be accelerated by RAPIDS Accelerator seamlessly by changing the returning model type from binary to string. --- python-package/xgboost/spark/core.py | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/python-package/xgboost/spark/core.py b/python-package/xgboost/spark/core.py index c38fcbffd48a..200e3cd81ab2 100644 --- a/python-package/xgboost/spark/core.py +++ b/python-package/xgboost/spark/core.py @@ -2,7 +2,6 @@ """Xgboost pyspark integration submodule for core code.""" # pylint: disable=fixme, too-many-ancestors, protected-access, no-member, invalid-name # pylint: disable=too-few-public-methods -import cloudpickle import numpy as np import pandas as pd from scipy.special import expit, softmax # pylint: disable=no-name-in-module @@ -415,7 +414,7 @@ def _convert_to_sklearn_model(self, booster): gen_xgb_sklearn_estimator_param=True ) sklearn_model = self._xgb_cls()(**xgb_sklearn_params) - sklearn_model._Booster = booster + sklearn_model.load_model(booster) return sklearn_model def _query_plan_contains_valid_repartition(self, dataset): @@ -629,16 +628,18 @@ def _train_booster(pandas_df_iter): context.barrier() if context.partitionId() == 0: - yield pd.DataFrame(data={"booster_bytes": [cloudpickle.dumps(booster)]}) + yield pd.DataFrame( + data={"booster_string": [booster.save_raw("json").decode("utf-8")]} + ) result_ser_booster = ( - dataset.mapInPandas(_train_booster, schema="booster_bytes binary") + dataset.mapInPandas(_train_booster, schema="booster_string string") .rdd.barrier() .mapPartitions(lambda x: x) .collect()[0][0] ) result_xgb_model = self._convert_to_sklearn_model( - cloudpickle.loads(result_ser_booster) + bytearray(result_ser_booster, "utf-8") ) return 
self._copyValues(self._create_pyspark_model(result_xgb_model)) From 19d406fbc84245589e2ebf29c19a6b56eb230d96 Mon Sep 17 00:00:00 2001 From: Bobby Wang Date: Sun, 17 Jul 2022 12:53:00 +0800 Subject: [PATCH 2/3] fix test --- tests/python/test_spark/test_spark_local.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/python/test_spark/test_spark_local.py b/tests/python/test_spark/test_spark_local.py index f57e9b874d8a..9ddcaefd8081 100644 --- a/tests/python/test_spark/test_spark_local.py +++ b/tests/python/test_spark/test_spark_local.py @@ -904,7 +904,7 @@ def test_convert_to_sklearn_model(self): # Check that regardless of what booster, _convert_to_model converts to the correct class type sklearn_classifier = classifier._convert_to_sklearn_model( - clf_model.get_booster() + clf_model.get_booster().save_raw("json") ) assert isinstance(sklearn_classifier, XGBClassifier) assert sklearn_classifier.n_estimators == 200 @@ -912,7 +912,7 @@ def test_convert_to_sklearn_model(self): assert sklearn_classifier.max_depth == 3 assert sklearn_classifier.get_params()["sketch_eps"] == 0.5 - sklearn_regressor = regressor._convert_to_sklearn_model(reg_model.get_booster()) + sklearn_regressor = regressor._convert_to_sklearn_model(reg_model.get_booster().save_raw("json")) assert isinstance(sklearn_regressor, XGBRegressor) assert sklearn_regressor.n_estimators == 200 assert sklearn_regressor.missing == 2.0 From c8af30ea0a602f7e40311e51ecdbfd98065fb187 Mon Sep 17 00:00:00 2001 From: Bobby Wang Date: Tue, 19 Jul 2022 12:10:10 +0800 Subject: [PATCH 3/3] return config --- python-package/xgboost/spark/core.py | 28 ++++++++++++++------- tests/python/test_spark/test_spark_local.py | 8 ++++-- 2 files changed, 25 insertions(+), 11 deletions(-) diff --git a/python-package/xgboost/spark/core.py b/python-package/xgboost/spark/core.py index 200e3cd81ab2..bd461d49b5cd 100644 --- a/python-package/xgboost/spark/core.py +++ b/python-package/xgboost/spark/core.py @@ -409,12 
+409,13 @@ def _pyspark_model_cls(cls): def _create_pyspark_model(self, xgb_model): return self._pyspark_model_cls()(xgb_model) - def _convert_to_sklearn_model(self, booster): + def _convert_to_sklearn_model(self, booster: bytearray, config: str): xgb_sklearn_params = self._gen_xgb_params_dict( gen_xgb_sklearn_estimator_param=True ) sklearn_model = self._xgb_cls()(**xgb_sklearn_params) sklearn_model.load_model(booster) + sklearn_model._Booster.load_config(config) return sklearn_model def _query_plan_contains_valid_repartition(self, dataset): @@ -629,17 +630,26 @@ def _train_booster(pandas_df_iter): if context.partitionId() == 0: yield pd.DataFrame( - data={"booster_string": [booster.save_raw("json").decode("utf-8")]} + data={ + "config": [booster.save_config()], + "booster": [booster.save_raw("json").decode("utf-8")] + } ) - result_ser_booster = ( - dataset.mapInPandas(_train_booster, schema="booster_string string") - .rdd.barrier() - .mapPartitions(lambda x: x) - .collect()[0][0] - ) + def _run_job(): + ret = ( + dataset.mapInPandas(_train_booster, schema="config string, booster string") + .rdd.barrier() + .mapPartitions(lambda x: x) + .collect()[0] + ) + return ret[0], ret[1] + + (config, booster) = _run_job() + result_xgb_model = self._convert_to_sklearn_model( - bytearray(result_ser_booster, "utf-8") + bytearray(booster, "utf-8"), + config ) return self._copyValues(self._create_pyspark_model(result_xgb_model)) diff --git a/tests/python/test_spark/test_spark_local.py b/tests/python/test_spark/test_spark_local.py index 9ddcaefd8081..3b04a002a1e2 100644 --- a/tests/python/test_spark/test_spark_local.py +++ b/tests/python/test_spark/test_spark_local.py @@ -904,7 +904,8 @@ def test_convert_to_sklearn_model(self): # Check that regardless of what booster, _convert_to_model converts to the correct class type sklearn_classifier = classifier._convert_to_sklearn_model( - clf_model.get_booster().save_raw("json") + clf_model.get_booster().save_raw("json"), + 
clf_model.get_booster().save_config() ) assert isinstance(sklearn_classifier, XGBClassifier) assert sklearn_classifier.n_estimators == 200 @@ -912,7 +913,10 @@ def test_convert_to_sklearn_model(self): assert sklearn_classifier.max_depth == 3 assert sklearn_classifier.get_params()["sketch_eps"] == 0.5 - sklearn_regressor = regressor._convert_to_sklearn_model(reg_model.get_booster().save_raw("json")) + sklearn_regressor = regressor._convert_to_sklearn_model( + reg_model.get_booster().save_raw("json"), + reg_model.get_booster().save_config() + ) assert isinstance(sklearn_regressor, XGBRegressor) assert sklearn_regressor.n_estimators == 200 assert sklearn_regressor.missing == 2.0