From 93f05c28a599970d26866829601ca3026e50934c Mon Sep 17 00:00:00 2001 From: Bobby Wang Date: Fri, 2 Sep 2022 19:05:14 +0800 Subject: [PATCH 1/5] [pyspark] make the model saved by pyspark be compatible --- python-package/xgboost/spark/model.py | 20 +++++++++----------- 1 file changed, 9 insertions(+), 11 deletions(-) diff --git a/python-package/xgboost/spark/model.py b/python-package/xgboost/spark/model.py index 95a96051a4e6..cd9f1b9fb482 100644 --- a/python-package/xgboost/spark/model.py +++ b/python-package/xgboost/spark/model.py @@ -21,9 +21,9 @@ def _get_or_create_tmp_dir(): return xgb_tmp_dir -def serialize_xgb_model(model): +def dump_model_to_json_file(model) -> str: """ - Serialize the input model to a string. + Dump the input model to a local file in driver and return the path. Parameters ---------- @@ -31,12 +31,10 @@ def serialize_xgb_model(model): an xgboost.XGBModel instance, such as xgboost.XGBClassifier or xgboost.XGBRegressor instance """ - # TODO: change to use string io + # Dump the model to json format tmp_file_name = os.path.join(_get_or_create_tmp_dir(), f"{uuid.uuid4()}.json") model.save_model(tmp_file_name) - with open(tmp_file_name, "r", encoding="utf-8") as f: - ser_model_string = f.read() - return ser_model_string + return tmp_file_name def deserialize_xgb_model(ser_model_string, xgb_model_creator): @@ -222,11 +220,11 @@ def saveImpl(self, path): """ xgb_model = self.instance._xgb_sklearn_model _SparkXGBSharedReadWrite.saveMetadata(self.instance, path, self.sc, self.logger) - model_save_path = os.path.join(path, "model.json") - ser_xgb_model = serialize_xgb_model(xgb_model) - _get_spark_session().createDataFrame( - [(ser_xgb_model,)], ["xgb_sklearn_model"] - ).write.parquet(model_save_path) + model_save_path = os.path.join(path, "model") + xgb_model_file = dump_model_to_json_file(xgb_model) + # The json file written by Spark based on model string generated by + # booster.save_raw("json").decode("utf-8") can't be loaded by XGBoost directly. + _get_spark_session().read.json(xgb_model_file).write.json(model_save_path) class SparkXGBModelReader(MLReader): From 768235a73ef322c5199eed33bc2a546b1c7dcc07 Mon Sep 17 00:00:00 2001 From: Bobby Wang Date: Mon, 5 Sep 2022 10:48:56 +0800 Subject: [PATCH 2/5] Add load/write and some tests --- python-package/xgboost/spark/model.py | 23 ++++++-------- tests/python/test_spark/test_spark_local.py | 34 +++++++++++++++++---- 2 files changed, 37 insertions(+), 20 deletions(-) diff --git a/python-package/xgboost/spark/model.py b/python-package/xgboost/spark/model.py index cd9f1b9fb482..338793ad870a 100644 --- a/python-package/xgboost/spark/model.py +++ b/python-package/xgboost/spark/model.py @@ -37,16 +37,12 @@ def dump_model_to_json_file(model) -> str: return tmp_file_name -def deserialize_xgb_model(ser_model_string, xgb_model_creator): +def deserialize_xgb_model(model_string, xgb_model_creator): """ - Deserialize an xgboost.XGBModel instance from the input ser_model_string. + Deserialize an xgboost.XGBModel instance from the input model_string. """ xgb_model = xgb_model_creator() - # TODO: change to use string io - tmp_file_name = os.path.join(_get_or_create_tmp_dir(), f"{uuid.uuid4()}.json") - with open(tmp_file_name, "w", encoding="utf-8") as f: - f.write(ser_model_string) - xgb_model.load_model(tmp_file_name) + xgb_model.load_model(bytearray(model_string.encode("utf-8"))) return xgb_model @@ -222,9 +218,9 @@ def saveImpl(self, path): _SparkXGBSharedReadWrite.saveMetadata(self.instance, path, self.sc, self.logger) model_save_path = os.path.join(path, "model") xgb_model_file = dump_model_to_json_file(xgb_model) - # The json file written by Spark based on model string generated by - # booster.save_raw("json").decode("utf-8") can't be loaded by XGBoost directly. - _get_spark_session().read.json(xgb_model_file).write.json(model_save_path) + # The json file written by Spark base on `booster.save_raw("json").decode("utf-8")` + # can't be loaded by XGBoost directly. + _get_spark_session().read.text(xgb_model_file).write.text(model_save_path) class SparkXGBModelReader(MLReader): @@ -250,13 +246,12 @@ def load(self, path): xgb_sklearn_params = py_model._gen_xgb_params_dict( gen_xgb_sklearn_estimator_param=True ) - model_load_path = os.path.join(path, "model.json") + model_load_path = os.path.join(path, "model") ser_xgb_model = ( _get_spark_session() - .read.parquet(model_load_path) - .collect()[0] - .xgb_sklearn_model + .read.text(model_load_path) + .collect()[0][0] ) def create_xgb_model(): diff --git a/tests/python/test_spark/test_spark_local.py b/tests/python/test_spark/test_spark_local.py index 58c313ea043f..8161ae552cf5 100644 --- a/tests/python/test_spark/test_spark_local.py +++ b/tests/python/test_spark/test_spark_local.py @@ -2,7 +2,9 @@ import random import sys import uuid +import glob +import xgboost as xgb import numpy as np import pytest import testing as tm @@ -30,7 +32,7 @@ ) from xgboost.spark.core import _non_booster_params -from xgboost import XGBClassifier, XGBRegressor +from xgboost import XGBClassifier, XGBRegressor, XGBModel from .utils import SparkTestCase @@ -62,7 +64,7 @@ def setUp(self): # >>> reg2.fit(X, y) # >>> reg2.predict(X, ntree_limit=5) # array([0.22185266, 0.77814734], dtype=float32) - self.reg_params = {"max_depth": 5, "n_estimators": 10, "ntree_limit": 5} + self.reg_params = {"max_depth": 5, "n_estimators": 10, "ntree_limit": 5, "max_bin": 9} self.reg_df_train = self.session.createDataFrame( [ (Vectors.dense(1.0, 2.0, 3.0), 0), @@ -427,6 +429,15 @@ def setUp(self): def get_local_tmp_dir(self): return self.tempdir + str(uuid.uuid4()) + def assert_model_compatible(self, model: XGBModel, model_path: str): + bst = xgb.Booster() + path = glob.glob(f"{model_path}/**/*.txt", recursive=True)[0] + bst.load_model(path) + # The model is saved by XGBModel which will add an extra scikit_learn attribute + bst.set_attr(scikit_learn=None) + self.assertEqual(model.get_booster().save_raw("json"), bst.save_raw("json")) + + def test_regressor_params_basic(self): py_reg = SparkXGBRegressor() self.assertTrue(hasattr(py_reg, "n_estimators")) @@ -591,7 +602,8 @@ def test_classifier_with_params(self): ) def test_regressor_model_save_load(self): - path = "file:" + self.get_local_tmp_dir() + tmp_dir = self.get_local_tmp_dir() + path = "file:" + tmp_dir regressor = SparkXGBRegressor(**self.reg_params) model = regressor.fit(self.reg_df_train) model.save(path) @@ -611,8 +623,11 @@ def test_regressor_model_save_load(self): with self.assertRaisesRegex(AssertionError, "Expected class name"): SparkXGBClassifierModel.load(path) + self.assert_model_compatible(model, tmp_dir) + def test_classifier_model_save_load(self): - path = "file:" + self.get_local_tmp_dir() + tmp_dir = self.get_local_tmp_dir() + path = "file:" + tmp_dir regressor = SparkXGBClassifier(**self.cls_params) model = regressor.fit(self.cls_df_train) model.save(path) @@ -632,12 +647,15 @@ def test_classifier_model_save_load(self): with self.assertRaisesRegex(AssertionError, "Expected class name"): SparkXGBRegressorModel.load(path) + self.assert_model_compatible(model, tmp_dir) + @staticmethod def _get_params_map(params_kv, estimator): return {getattr(estimator, k): v for k, v in params_kv.items()} def test_regressor_model_pipeline_save_load(self): - path = "file:" + self.get_local_tmp_dir() + tmp_dir = self.get_local_tmp_dir() + path = "file:" + tmp_dir regressor = SparkXGBRegressor() pipeline = Pipeline(stages=[regressor]) pipeline = pipeline.copy(extra=self._get_params_map(self.reg_params, regressor)) @@ -655,9 +673,11 @@ def test_regressor_model_pipeline_save_load(self): row.prediction, row.expected_prediction_with_params, atol=1e-3 ) ) + self.assert_model_compatible(model.stages[0], tmp_dir) def test_classifier_model_pipeline_save_load(self): - path = "file:" + self.get_local_tmp_dir() + tmp_dir = self.get_local_tmp_dir() + path = "file:" + tmp_dir classifier = SparkXGBClassifier() pipeline = Pipeline(stages=[classifier]) pipeline = pipeline.copy( @@ -677,6 +697,8 @@ def test_classifier_model_pipeline_save_load(self): row.probability, row.expected_probability_with_params, atol=1e-3 ) ) + self.assert_model_compatible(model.stages[0], tmp_dir) + def test_classifier_with_cross_validator(self): xgb_classifer = SparkXGBClassifier() From a1e1bc7f13b742bafc0990a8461a6a8108c951d5 Mon Sep 17 00:00:00 2001 From: Bobby Wang Date: Mon, 5 Sep 2022 20:08:35 +0800 Subject: [PATCH 3/5] format --- python-package/xgboost/spark/model.py | 6 +----- tests/python/test_spark/test_spark_local.py | 16 ++++++++++------ 2 files changed, 11 insertions(+), 11 deletions(-) diff --git a/python-package/xgboost/spark/model.py b/python-package/xgboost/spark/model.py index 338793ad870a..aded379ba2fd 100644 --- a/python-package/xgboost/spark/model.py +++ b/python-package/xgboost/spark/model.py @@ -248,11 +248,7 @@ def load(self, path): ) model_load_path = os.path.join(path, "model") - ser_xgb_model = ( - _get_spark_session() - .read.text(model_load_path) - .collect()[0][0] - ) + ser_xgb_model = _get_spark_session().read.text(model_load_path).collect()[0][0] def create_xgb_model(): return self.cls._xgb_cls()(**xgb_sklearn_params) diff --git a/tests/python/test_spark/test_spark_local.py b/tests/python/test_spark/test_spark_local.py index 8161ae552cf5..bd7c1043c0ea 100644 --- a/tests/python/test_spark/test_spark_local.py +++ b/tests/python/test_spark/test_spark_local.py @@ -1,14 +1,15 @@ +import glob import logging import random import sys import uuid -import glob -import xgboost as xgb import numpy as np import pytest import testing as tm +import xgboost as xgb + if tm.no_spark()["condition"]: pytest.skip(msg=tm.no_spark()["reason"], allow_module_level=True) if sys.platform.startswith("win") or sys.platform.startswith("darwin"): @@ -32,7 +33,7 @@ ) from xgboost.spark.core import _non_booster_params -from xgboost import XGBClassifier, XGBRegressor, XGBModel +from xgboost import XGBClassifier, XGBModel, XGBRegressor from .utils import SparkTestCase @@ -64,7 +65,12 @@ def setUp(self): # >>> reg2.fit(X, y) # >>> reg2.predict(X, ntree_limit=5) # array([0.22185266, 0.77814734], dtype=float32) - self.reg_params = {"max_depth": 5, "n_estimators": 10, "ntree_limit": 5, "max_bin": 9} + self.reg_params = { + "max_depth": 5, + "n_estimators": 10, + "ntree_limit": 5, + "max_bin": 9, + } self.reg_df_train = self.session.createDataFrame( [ (Vectors.dense(1.0, 2.0, 3.0), 0), @@ -437,7 +443,6 @@ def assert_model_compatible(self, model: XGBModel, model_path: str): bst.set_attr(scikit_learn=None) self.assertEqual(model.get_booster().save_raw("json"), bst.save_raw("json")) - def test_regressor_params_basic(self): py_reg = SparkXGBRegressor() self.assertTrue(hasattr(py_reg, "n_estimators")) @@ -699,7 +704,6 @@ def test_classifier_model_pipeline_save_load(self): ) self.assert_model_compatible(model.stages[0], tmp_dir) - def test_classifier_with_cross_validator(self): xgb_classifer = SparkXGBClassifier() paramMaps = ParamGridBuilder().addGrid(xgb_classifer.max_depth, [1, 2]).build() From 5a37c546539903f8b32cfab5ac43c7df2105da5c Mon Sep 17 00:00:00 2001 From: Bobby Wang Date: Thu, 15 Sep 2022 10:07:33 +0800 Subject: [PATCH 4/5] resolve comments --- python-package/xgboost/spark/model.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/python-package/xgboost/spark/model.py b/python-package/xgboost/spark/model.py index aded379ba2fd..49aca6196c31 100644 --- a/python-package/xgboost/spark/model.py +++ b/python-package/xgboost/spark/model.py @@ -21,9 +21,9 @@ def _get_or_create_tmp_dir(): return xgb_tmp_dir -def dump_model_to_json_file(model) -> str: +def save_model_to_json_file(model) -> str: """ - Dump the input model to a local file in driver and return the path. + Save the input model to a local file in driver side and return the path. Parameters ---------- @@ -31,7 +31,7 @@ def dump_model_to_json_file(model) -> str: an xgboost.XGBModel instance, such as xgboost.XGBClassifier or xgboost.XGBRegressor instance """ - # Dump the model to json format + # Save the model to json format tmp_file_name = os.path.join(_get_or_create_tmp_dir(), f"{uuid.uuid4()}.json") model.save_model(tmp_file_name) return tmp_file_name @@ -217,7 +217,7 @@ def saveImpl(self, path): xgb_model = self.instance._xgb_sklearn_model _SparkXGBSharedReadWrite.saveMetadata(self.instance, path, self.sc, self.logger) model_save_path = os.path.join(path, "model") - xgb_model_file = dump_model_to_json_file(xgb_model) + xgb_model_file = save_model_to_json_file(xgb_model) # The json file written by Spark base on `booster.save_raw("json").decode("utf-8")` # can't be loaded by XGBoost directly. _get_spark_session().read.text(xgb_model_file).write.text(model_save_path) From fdbfa22dcc1031075a639104e4c77f387827ab21 Mon Sep 17 00:00:00 2001 From: Bobby Wang Date: Fri, 16 Sep 2022 06:08:56 +0800 Subject: [PATCH 5/5] use RDD to save json model --- python-package/xgboost/spark/model.py | 28 ++++++--------------- tests/python/test_spark/test_spark_local.py | 4 +-- 2 files changed, 8 insertions(+), 24 deletions(-) diff --git a/python-package/xgboost/spark/model.py b/python-package/xgboost/spark/model.py index 49aca6196c31..6b050a468357 100644 --- a/python-package/xgboost/spark/model.py +++ b/python-package/xgboost/spark/model.py @@ -21,22 +21,6 @@ def _get_or_create_tmp_dir(): return xgb_tmp_dir -def save_model_to_json_file(model) -> str: - """ - Save the input model to a local file in driver side and return the path. - - Parameters - ---------- - model: - an xgboost.XGBModel instance, such as - xgboost.XGBClassifier or xgboost.XGBRegressor instance - """ - # Save the model to json format - tmp_file_name = os.path.join(_get_or_create_tmp_dir(), f"{uuid.uuid4()}.json") - model.save_model(tmp_file_name) - return tmp_file_name - - def deserialize_xgb_model(model_string, xgb_model_creator): """ Deserialize an xgboost.XGBModel instance from the input model_string. @@ -217,10 +201,10 @@ def saveImpl(self, path): xgb_model = self.instance._xgb_sklearn_model _SparkXGBSharedReadWrite.saveMetadata(self.instance, path, self.sc, self.logger) model_save_path = os.path.join(path, "model") - xgb_model_file = save_model_to_json_file(xgb_model) - # The json file written by Spark base on `booster.save_raw("json").decode("utf-8")` - # can't be loaded by XGBoost directly. - _get_spark_session().read.text(xgb_model_file).write.text(model_save_path) + booster = xgb_model.get_booster().save_raw("json").decode("utf-8") + _get_spark_session().sparkContext.parallelize([booster], 1).saveAsTextFile( + model_save_path + ) class SparkXGBModelReader(MLReader): @@ -248,7 +232,9 @@ def load(self, path): ) model_load_path = os.path.join(path, "model") - ser_xgb_model = _get_spark_session().read.text(model_load_path).collect()[0][0] + ser_xgb_model = ( + _get_spark_session().sparkContext.textFile(model_load_path).collect()[0] + ) def create_xgb_model(): return self.cls._xgb_cls()(**xgb_sklearn_params) diff --git a/tests/python/test_spark/test_spark_local.py b/tests/python/test_spark/test_spark_local.py index bd7c1043c0ea..0e5bded06c96 100644 --- a/tests/python/test_spark/test_spark_local.py +++ b/tests/python/test_spark/test_spark_local.py @@ -437,10 +437,8 @@ def get_local_tmp_dir(self): def assert_model_compatible(self, model: XGBModel, model_path: str): bst = xgb.Booster() - path = glob.glob(f"{model_path}/**/*.txt", recursive=True)[0] + path = glob.glob(f"{model_path}/**/model/part-00000", recursive=True)[0] bst.load_model(path) - # The model is saved by XGBModel which will add an extra scikit_learn attribute - bst.set_attr(scikit_learn=None) self.assertEqual(model.get_booster().save_raw("json"), bst.save_raw("json")) def test_regressor_params_basic(self):