From 0bc92f5516c485aed72ca12b1512451c505685e9 Mon Sep 17 00:00:00 2001 From: Weichen Xu Date: Sun, 14 Aug 2022 12:00:20 +0800 Subject: [PATCH 01/11] init --- python-package/xgboost/spark/core.py | 12 ++++++++++++ python-package/xgboost/spark/estimator.py | 7 +++++++ 2 files changed, 19 insertions(+) diff --git a/python-package/xgboost/spark/core.py b/python-package/xgboost/spark/core.py index add414c8746c..314f501fbae3 100644 --- a/python-package/xgboost/spark/core.py +++ b/python-package/xgboost/spark/core.py @@ -267,6 +267,18 @@ def _validate_params(self): "If features_cols param set, then features_col param is ignored." ) + if self.getOrDefault(self.objective) is not None: + if not isinstance(self.getOrDefault(self.objective), str): + raise ValueError( + "Only string type 'objective' param is allowed." + ) + + if self.getOrDefault(self.eval_metric) is not None: + if not isinstance(self.getOrDefault(self.eval_metric), str): + raise ValueError( + "Only string type 'eval_metric' param is allowed." + ) + if self.getOrDefault(self.use_gpu): tree_method = self.getParam("tree_method") if ( diff --git a/python-package/xgboost/spark/estimator.py b/python-package/xgboost/spark/estimator.py index 3e7c5fdf65a3..ad04b26488de 100644 --- a/python-package/xgboost/spark/estimator.py +++ b/python-package/xgboost/spark/estimator.py @@ -213,5 +213,12 @@ def _xgb_cls(cls): def _pyspark_model_cls(cls): return SparkXGBClassifierModel + def _validate_params(self): + super(SparkXGBClassifier, self)._validate_params() + if self.isDefined(self.objective): + raise ValueError( + "Setting custom 'objective' param is not allowed in 'SparkXGBClassifier'." + ) + _set_pyspark_xgb_cls_param_attrs(SparkXGBClassifier, SparkXGBClassifierModel) From 1cd06ac2e25a2e820b967131ccf5672830d2abe0 Mon Sep 17 00:00:00 2001 From: Weichen Xu Date: Sun, 14 Aug 2022 12:18:17 +0800 Subject: [PATCH 02/11] update --- python-package/xgboost/spark/core.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/python-package/xgboost/spark/core.py b/python-package/xgboost/spark/core.py index 314f501fbae3..2d028d1cb286 100644 --- a/python-package/xgboost/spark/core.py +++ b/python-package/xgboost/spark/core.py @@ -101,10 +101,11 @@ _unsupported_fit_params = { "sample_weight", # Supported by spark param weightCol - # Supported by spark param weightCol # and validationIndicatorCol - "eval_set", - "sample_weight_eval_set", + "eval_set", # Supported by spark param validation_indicator_col + "sample_weight_eval_set", # Supported by spark param weight_col + validation_indicator_col "base_margin", # Supported by spark param base_margin_col + "base_margin_eval_set", # Supported by spark param base_margin_col + validation_indicator_col + "feature_weights", # Use constructor feature_weights param instead } _unsupported_predict_params = { @@ -591,6 +592,8 @@ def _fit(self, dataset): booster_params, train_call_kwargs_params = self._get_xgb_train_call_args( train_params ) + print("DBG: booster param:") + print(booster_params) cpu_per_task = int( _get_spark_session().sparkContext.getConf().get("spark.task.cpus", "1") From 509ad6c69c2b190357e993c3d0a4c2456a4f0e14 Mon Sep 17 00:00:00 2001 From: Weichen Xu Date: Sun, 14 Aug 2022 12:21:56 +0800 Subject: [PATCH 03/11] update --- python-package/xgboost/spark/core.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/python-package/xgboost/spark/core.py b/python-package/xgboost/spark/core.py index 2d028d1cb286..fbedb91ce143 100644 --- a/python-package/xgboost/spark/core.py +++ b/python-package/xgboost/spark/core.py @@ -105,7 +105,6 @@ "sample_weight_eval_set", # Supported by spark param weight_col + validation_indicator_col "base_margin", # Supported by spark param base_margin_col "base_margin_eval_set", # Supported by spark param base_margin_col + validation_indicator_col - "feature_weights", # Use constructor feature_weights param instead } _unsupported_predict_params = { @@ -531,6 +530,11 @@ def _get_xgb_train_call_args(cls, train_params): kwargs_params[key] = value else: booster_params[key] = value + + booster_params = { + k: v + for k, v in booster_params.items() if k not in _non_booster_params + } return booster_params, kwargs_params def _fit(self, dataset): From 08daaa9f4949be9301d91aef1b2322f724bc885d Mon Sep 17 00:00:00 2001 From: Weichen Xu Date: Sun, 14 Aug 2022 12:23:27 +0800 Subject: [PATCH 04/11] clean --- python-package/xgboost/spark/core.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/python-package/xgboost/spark/core.py b/python-package/xgboost/spark/core.py index fbedb91ce143..fb989cac69fd 100644 --- a/python-package/xgboost/spark/core.py +++ b/python-package/xgboost/spark/core.py @@ -596,8 +596,6 @@ def _fit(self, dataset): booster_params, train_call_kwargs_params = self._get_xgb_train_call_args( train_params ) - print("DBG: booster param:") - print(booster_params) cpu_per_task = int( _get_spark_session().sparkContext.getConf().get("spark.task.cpus", "1") From f6aad79bfa59e4349d23ba443e93d23e2389f778 Mon Sep 17 00:00:00 2001 From: Weichen Xu Date: Sun, 14 Aug 2022 12:29:13 +0800 Subject: [PATCH 05/11] update Signed-off-by: Weichen Xu --- python-package/xgboost/spark/estimator.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python-package/xgboost/spark/estimator.py b/python-package/xgboost/spark/estimator.py index ad04b26488de..b05b007e74ba 100644 --- a/python-package/xgboost/spark/estimator.py +++ b/python-package/xgboost/spark/estimator.py @@ -215,7 +215,7 @@ def _pyspark_model_cls(cls): def _validate_params(self): super(SparkXGBClassifier, self)._validate_params() - if self.isDefined(self.objective): + if self.getOrDefault(self.objective) is not None: raise ValueError( "Setting custom 'objective' param is not allowed in 'SparkXGBClassifier'." ) From 7ca49d4483dbb80a21999fa739d5a1a3aa485406 Mon Sep 17 00:00:00 2001 From: Weichen Xu Date: Sun, 14 Aug 2022 12:47:49 +0800 Subject: [PATCH 06/11] update Signed-off-by: Weichen Xu --- python-package/xgboost/spark/core.py | 17 ++++++++++------- python-package/xgboost/spark/estimator.py | 7 ++++++- 2 files changed, 16 insertions(+), 8 deletions(-) diff --git a/python-package/xgboost/spark/core.py b/python-package/xgboost/spark/core.py index fb989cac69fd..d8fea042cf6e 100644 --- a/python-package/xgboost/spark/core.py +++ b/python-package/xgboost/spark/core.py @@ -504,14 +504,17 @@ def _get_distributed_train_params(self, dataset): params.update(fit_params) params["verbose_eval"] = verbose_eval classification = self._xgb_cls() == XGBClassifier - num_classes = int(dataset.select(countDistinct(alias.label)).collect()[0][0]) - if classification and num_classes == 2: - params["objective"] = "binary:logistic" - elif classification and num_classes > 2: - params["objective"] = "multi:softprob" - params["num_class"] = num_classes + if classification: + num_classes = int(dataset.select(countDistinct(alias.label)).collect()[0][0]) + if num_classes <= 2: + params["objective"] = "binary:logistic" + else: + params["objective"] = "multi:softprob" + params["num_class"] = num_classes else: - params["objective"] = "reg:squarederror" + # use user specified objective or default objective. + # e.g., the default objective for Regressor is 'reg:squarederror' + params["objective"] = self.getOrDefault(self.objective) # TODO: support "num_parallel_tree" for random forest params["num_boost_round"] = self.getOrDefault(self.n_estimators) diff --git a/python-package/xgboost/spark/estimator.py b/python-package/xgboost/spark/estimator.py index b05b007e74ba..59564fc08a8d 100644 --- a/python-package/xgboost/spark/estimator.py +++ b/python-package/xgboost/spark/estimator.py @@ -203,6 +203,11 @@ class SparkXGBClassifier(_SparkXGBEstimator, HasProbabilityCol, HasRawPrediction def __init__(self, **kwargs): super().__init__() + # The default 'objective' param value comes from sklearn `XGBClassifier` ctor, + # but in pyspark we will automatically set objective param depending on + # binary or multinomial input dataset, and we need to remove the fixed default + # param value as well to avoid causing ambiguity. + self._setDefault(objective=None) self.setParams(**kwargs) @classmethod @@ -215,7 +220,7 @@ def _pyspark_model_cls(cls): def _validate_params(self): super(SparkXGBClassifier, self)._validate_params() - if self.getOrDefault(self.objective) is not None: + if self.isDefined(self.objective): raise ValueError( "Setting custom 'objective' param is not allowed in 'SparkXGBClassifier'." ) From fa064e3d554263e4042582becb3495b6015eff4a Mon Sep 17 00:00:00 2001 From: Weichen Xu Date: Sun, 14 Aug 2022 12:51:24 +0800 Subject: [PATCH 07/11] update Signed-off-by: Weichen Xu --- python-package/xgboost/spark/estimator.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python-package/xgboost/spark/estimator.py b/python-package/xgboost/spark/estimator.py index 59564fc08a8d..a8fcddfc6db7 100644 --- a/python-package/xgboost/spark/estimator.py +++ b/python-package/xgboost/spark/estimator.py @@ -220,7 +220,7 @@ def _pyspark_model_cls(cls): def _validate_params(self): super(SparkXGBClassifier, self)._validate_params() - if self.isDefined(self.objective): + if self.getOrDefault(self.objective): raise ValueError( "Setting custom 'objective' param is not allowed in 'SparkXGBClassifier'." ) From c6f157a13b73eff4aefe8c23813e25df566c3695 Mon Sep 17 00:00:00 2001 From: Weichen Xu Date: Sun, 14 Aug 2022 13:06:58 +0800 Subject: [PATCH 08/11] update --- tests/python/test_spark/test_spark_local.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tests/python/test_spark/test_spark_local.py b/tests/python/test_spark/test_spark_local.py index c6b612b7aec8..a8d3592ab483 100644 --- a/tests/python/test_spark/test_spark_local.py +++ b/tests/python/test_spark/test_spark_local.py @@ -390,6 +390,7 @@ def test_regressor_params_basic(self): self.assertEqual(py_reg.n_estimators.parent, py_reg.uid) self.assertFalse(hasattr(py_reg, "gpu_id")) self.assertEqual(py_reg.getOrDefault(py_reg.n_estimators), 100) + self.assertEqual(py_reg.getOrDefault(py_reg.objective), "reg:squarederror") py_reg2 = SparkXGBRegressor(n_estimators=200) self.assertEqual(py_reg2.getOrDefault(py_reg2.n_estimators), 200) py_reg3 = py_reg2.copy({py_reg2.max_depth: 10}) @@ -402,6 +403,7 @@ def test_classifier_params_basic(self): self.assertEqual(py_cls.n_estimators.parent, py_cls.uid) self.assertFalse(hasattr(py_cls, "gpu_id")) self.assertEqual(py_cls.getOrDefault(py_cls.n_estimators), 100) + self.assertEqual(py_cls.getOrDefault(py_cls.objective), None) py_cls2 = SparkXGBClassifier(n_estimators=200) self.assertEqual(py_cls2.getOrDefault(py_cls2.n_estimators), 200) py_cls3 = py_cls2.copy({py_cls2.max_depth: 10}) From 68c7494c0f61a6d84e17d8846afd6bd5f43d4212 Mon Sep 17 00:00:00 2001 From: Weichen Xu Date: Tue, 23 Aug 2022 16:49:04 +0800 Subject: [PATCH 09/11] update Signed-off-by: Weichen Xu --- python-package/xgboost/spark/core.py | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/python-package/xgboost/spark/core.py b/python-package/xgboost/spark/core.py index dc80f09a41fd..830704c366b6 100644 --- a/python-package/xgboost/spark/core.py +++ b/python-package/xgboost/spark/core.py @@ -357,7 +357,14 @@ def _validate_params(self): ) else: # checking spark non-local mode. - if not gpu_per_task or int(gpu_per_task) < 1: + if not gpu_per_task: + # For spark cluster with GPU config, the "spark.task.resource.gpu.amount" + # config might not be set. In this case, the default gpu per task is 1. + # If running on no GPU available, in spark task calling `_get_gpu_id` will + # raise gpu not found error. + gpu_per_task = "1" + + if int(gpu_per_task) < 1: raise RuntimeError( "The spark cluster does not have the necessary GPU" + "configuration for the spark task. Therefore, we cannot" From c5106c3041c4a9130820a8cdcd7082a33179ae73 Mon Sep 17 00:00:00 2001 From: Ubuntu Date: Tue, 23 Aug 2022 09:10:59 +0000 Subject: [PATCH 10/11] fix format Signed-off-by: Ubuntu --- python-package/xgboost/spark/core.py | 15 ++++++--------- python-package/xgboost/spark/estimator.py | 3 +-- 2 files changed, 7 insertions(+), 11 deletions(-) diff --git a/python-package/xgboost/spark/core.py b/python-package/xgboost/spark/core.py index 830704c366b6..559c216d770f 100644 --- a/python-package/xgboost/spark/core.py +++ b/python-package/xgboost/spark/core.py @@ -289,15 +289,11 @@ def _validate_params(self): if self.getOrDefault(self.objective) is not None: if not isinstance(self.getOrDefault(self.objective), str): - raise ValueError( - "Only string type 'objective' param is allowed." - ) + raise ValueError("Only string type 'objective' param is allowed.") if self.getOrDefault(self.eval_metric) is not None: if not isinstance(self.getOrDefault(self.eval_metric), str): - raise ValueError( - "Only string type 'eval_metric' param is allowed." - ) + raise ValueError("Only string type 'eval_metric' param is allowed.") if self.getOrDefault(self.enable_sparse_data_optim): if self.getOrDefault(self.missing) != 0.0: @@ -598,7 +594,9 @@ def _get_distributed_train_params(self, dataset): params["verbose_eval"] = verbose_eval classification = self._xgb_cls() == XGBClassifier if classification: - num_classes = int(dataset.select(countDistinct(alias.label)).collect()[0][0]) + num_classes = int( + dataset.select(countDistinct(alias.label)).collect()[0][0] + ) if num_classes <= 2: params["objective"] = "binary:logistic" else: @@ -628,8 +626,7 @@ def _get_xgb_train_call_args(cls, train_params): booster_params[key] = value booster_params = { - k: v - for k, v in booster_params.items() if k not in _non_booster_params + k: v for k, v in booster_params.items() if k not in _non_booster_params } return booster_params, kwargs_params diff --git a/python-package/xgboost/spark/estimator.py b/python-package/xgboost/spark/estimator.py index a485852bf8e1..7336b597167a 100644 --- a/python-package/xgboost/spark/estimator.py +++ b/python-package/xgboost/spark/estimator.py @@ -232,8 +232,7 @@ def _validate_params(self): raise ValueError( "Spark Xgboost classifier estimator does not support `qid_col` param." ) - super(SparkXGBClassifier, self)._validate_params() - if self.getOrDefault(self.objective): + if self.getOrDefault(self.objective): # pylint: disable=no-member raise ValueError( "Setting custom 'objective' param is not allowed in 'SparkXGBClassifier'." ) From 13ef73da6d085cdcf2ec26cb2bc02c94096bac24 Mon Sep 17 00:00:00 2001 From: Weichen Xu Date: Tue, 23 Aug 2022 21:59:58 +0800 Subject: [PATCH 11/11] update Signed-off-by: Weichen Xu --- python-package/xgboost/spark/core.py | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) diff --git a/python-package/xgboost/spark/core.py b/python-package/xgboost/spark/core.py index 559c216d770f..ca1acdeaebb6 100644 --- a/python-package/xgboost/spark/core.py +++ b/python-package/xgboost/spark/core.py @@ -353,14 +353,7 @@ def _validate_params(self): ) else: # checking spark non-local mode. - if not gpu_per_task: - # For spark cluster with GPU config, the "spark.task.resource.gpu.amount" - # config might not be set. In this case, the default gpu per task is 1. - # If running on no GPU available, in spark task calling `_get_gpu_id` will - # raise gpu not found error. - gpu_per_task = "1" - - if int(gpu_per_task) < 1: + if not gpu_per_task or int(gpu_per_task) < 1: raise RuntimeError( "The spark cluster does not have the necessary GPU" + "configuration for the spark task. Therefore, we cannot"