From a74b4727a5429771859552173b787a8af42ec0a7 Mon Sep 17 00:00:00 2001 From: Bobby Wang Date: Mon, 25 Jul 2022 11:22:44 +0800 Subject: [PATCH 01/18] [pyspark] support a list of feature column names --- python-package/xgboost/spark/core.py | 138 ++++++++++++++++++++----- python-package/xgboost/spark/params.py | 21 ++++ 2 files changed, 136 insertions(+), 23 deletions(-) diff --git a/python-package/xgboost/spark/core.py b/python-package/xgboost/spark/core.py index 9cf5abab90d7..16394cfd696d 100644 --- a/python-package/xgboost/spark/core.py +++ b/python-package/xgboost/spark/core.py @@ -2,7 +2,7 @@ """Xgboost pyspark integration submodule for core code.""" # pylint: disable=fixme, too-many-ancestors, protected-access, no-member, invalid-name # pylint: disable=too-few-public-methods -from typing import Iterator, Tuple +from typing import Iterator, Tuple, Optional import numpy as np import pandas as pd @@ -28,6 +28,7 @@ IntegerType, LongType, ShortType, + IntegralType, ) from scipy.special import expit, softmax # pylint: disable=no-name-in-module from xgboost.core import Booster @@ -43,7 +44,7 @@ SparkXGBReader, SparkXGBWriter, ) -from .params import HasArbitraryParamsDict, HasBaseMarginCol +from .params import HasArbitraryParamsDict, HasBaseMarginCol, HasFeaturesCols from .utils import ( RabitContext, _get_args_from_message_list, @@ -73,6 +74,7 @@ "num_workers", "use_gpu", "feature_names", + "features_cols", ] _non_booster_params = [ @@ -126,6 +128,7 @@ class _SparkXGBParams( HasValidationIndicatorCol, HasArbitraryParamsDict, HasBaseMarginCol, + HasFeaturesCols, ): num_workers = Param( Params._dummy(), @@ -262,6 +265,14 @@ def _validate_params(self): "Therefore, that parameter will be ignored." ) + if ( + self.getOrDefault(self.features_cols) + and not self.getOrDefault(self.use_gpu) + ): + raise ValueError( + "XGBoost accepts a list of feature column names only when use_gpu is enabled." + ) + if self.getOrDefault(self.use_gpu): tree_method = self.getParam("tree_method") if ( @@ -315,6 +326,21 @@ def _validate_params(self): ) +def _validate_and_convert_feature_col_as_float_col(dataset, features_col_name: list) -> list: + """feature column names must be IntegralType or float or double types""" + feature_cols = [] + for c in features_col_name: + if isinstance(dataset.schema[c].dataType, DoubleType): + feature_cols.append(col(c).cast(FloatType()).alias(c)) + elif isinstance(dataset.schema[c].dataType, (FloatType, IntegralType)): + feature_cols.append(col(c)) + else: + raise ValueError( + "Feature column must be integral types or float/double types" + ) + return feature_cols + + def _validate_and_convert_feature_col_as_array_col(dataset, features_col_name): features_col_datatype = dataset.schema[features_col_name].dataType features_col = col(features_col_name) @@ -341,6 +367,20 @@ def _validate_and_convert_feature_col_as_array_col(dataset, features_col_name): return features_array_col +def _validate_and_convert_feature_col(dataset, feature_names: list, feature_name: str) -> list: + """XGBoost model trained with features_cols parameter is also can transform + vector or array feature type. 
But we first check features_cols and then check + featuresCol""" + if ( + len(feature_names) > + 0 >= + len([c for c in feature_names if c not in dataset.columns]) + ): + return _validate_and_convert_feature_col_as_float_col(dataset, feature_names) + + return [_validate_and_convert_feature_col_as_array_col(dataset, feature_name)] + + class _SparkXGBEstimator(Estimator, _SparkXGBParams, MLReadable, MLWritable): def __init__(self): super().__init__() @@ -373,8 +413,15 @@ def setParams(self, **kwargs): # pylint: disable=invalid-name f"Please use param name {_inverse_pyspark_param_alias_map[k]} instead." ) if k in _pyspark_param_alias_map: - real_k = _pyspark_param_alias_map[k] - k = real_k + if ( + k == _inverse_pyspark_param_alias_map[self.featuresCol.name] + and isinstance(v, list) + ): + real_k = self.features_cols.name + k = real_k + else: + real_k = _pyspark_param_alias_map[k] + k = real_k if self.hasParam(k): self._set(**{str(k): v}) @@ -497,10 +544,19 @@ def _fit(self, dataset): self._validate_params() label_col = col(self.getOrDefault(self.labelCol)).alias(alias.label) - features_array_col = _validate_and_convert_feature_col_as_array_col( - dataset, self.getOrDefault(self.featuresCol) - ) - select_cols = [features_array_col, label_col] + select_cols = [label_col] + features_cols_names=None + if len(self.getOrDefault(self.features_cols)): + features_cols_names = self.getOrDefault(self.features_cols) + features_cols = _validate_and_convert_feature_col_as_float_col( + dataset, features_cols_names + ) + select_cols.extend(features_cols) + else: + features_array_col = _validate_and_convert_feature_col_as_array_col( + dataset, self.getOrDefault(self.featuresCol) + ) + select_cols.append(features_array_col) if self.isDefined(self.weightCol) and self.getOrDefault(self.weightCol): select_cols.append( @@ -584,7 +640,7 @@ def _train_booster(pandas_df_iter): with RabitContext(_rabit_args, context): dtrain, dvalid = create_dmatrix_from_partitions( pandas_df_iter, - None, + features_cols_names, dmatrix_kwargs, ) if dvalid is not None: @@ -685,6 +741,38 @@ def read(cls): def _transform(self, dataset): raise NotImplementedError() + def _get_feature_col(self, dataset) -> (list, Optional[list]): + """XGBoost model trained with features_cols parameter can also predict + vector or array feature type. But frist we need to check features_cols + and then featuresCol + """ + + feature_col_names = self.getOrDefault(self.features_cols) + features_col = [] + if ( + len(feature_col_names) > + 0 >= + len([c for c in feature_col_names if c not in dataset.columns]) + ): + # The model is trained with features_cols and the predicted dataset + # also contains all the columns specified by features_cols. + features_col = _validate_and_convert_feature_col_as_float_col( + dataset, + feature_col_names) + else: + # 1. The model was trained by features_cols, but the dataset doesn't contain + # all the columns specified by features_cols, so we need to check if + # the dataframe has the featuresCol + # 2. The model was trained by featuresCol, and the predicted dataset must contain + # featuresCol column. 
+ feature_col_names = None + features_col.append( + _validate_and_convert_feature_col_as_array_col( + dataset, + self.getOrDefault(self.featuresCol)) + ) + return features_col, feature_col_names + class SparkXGBRegressorModel(_SparkXGBModel): """ @@ -712,11 +800,17 @@ def _transform(self, dataset): alias.margin ) + features_col, feature_col_names = self._get_feature_col(dataset) + @pandas_udf("double") def predict_udf(iterator: Iterator[pd.DataFrame]) -> Iterator[pd.Series]: model = xgb_sklearn_model for data in iterator: - X = stack_series(data[alias.data]) + if feature_col_names is not None: + X = data[feature_col_names] + else: + X = stack_series(data[alias.data]) + if has_base_margin: base_margin = data[alias.margin].to_numpy() else: @@ -730,14 +824,10 @@ def predict_udf(iterator: Iterator[pd.DataFrame]) -> Iterator[pd.Series]: ) yield pd.Series(preds) - features_col = _validate_and_convert_feature_col_as_array_col( - dataset, self.getOrDefault(self.featuresCol) - ) - if has_base_margin: - pred_col = predict_udf(struct(features_col, base_margin_col)) + pred_col = predict_udf(struct(*features_col, base_margin_col)) else: - pred_col = predict_udf(struct(features_col)) + pred_col = predict_udf(struct(*features_col)) predictionColName = self.getOrDefault(self.predictionCol) @@ -783,6 +873,8 @@ def transform_margin(margins: np.ndarray): class_probs = softmax(raw_preds, axis=1) return raw_preds, class_probs + features_col, feature_col_names = self._get_feature_col(dataset) + @pandas_udf( "rawPrediction array, prediction double, probability array" ) @@ -791,7 +883,11 @@ def predict_udf( ) -> Iterator[pd.DataFrame]: model = xgb_sklearn_model for data in iterator: - X = stack_series(data[alias.data]) + if feature_col_names is not None: + X = data[feature_col_names] + else: + X = stack_series(data[alias.data]) + if has_base_margin: base_margin = stack_series(data[alias.margin]) else: @@ -817,14 +913,10 @@ def predict_udf( } ) - features_col = _validate_and_convert_feature_col_as_array_col( - dataset, self.getOrDefault(self.featuresCol) - ) - if has_base_margin: - pred_struct = predict_udf(struct(features_col, base_margin_col)) + pred_struct = predict_udf(struct(*features_col, base_margin_col)) else: - pred_struct = predict_udf(struct(features_col)) + pred_struct = predict_udf(struct(*features_col)) pred_struct_col = "_prediction_struct" diff --git a/python-package/xgboost/spark/params.py b/python-package/xgboost/spark/params.py index 9528eb69dd70..3b1526fcdd25 100644 --- a/python-package/xgboost/spark/params.py +++ b/python-package/xgboost/spark/params.py @@ -1,6 +1,7 @@ # type: ignore """Xgboost pyspark integration submodule for params.""" # pylint: disable=too-few-public-methods +from pyspark.ml.param import TypeConverters from pyspark.ml.param.shared import Param, Params @@ -31,3 +32,23 @@ class HasBaseMarginCol(Params): "base_margin_col", "This stores the name for the column of the base margin", ) + + +class HasFeaturesCols(Params): + """ + Mixin for param featuresCols: a list of feature column names. + This parameter is taken effect only when use_gpu is enabled. 
+ """ + + features_cols = ( + Param( + Params._dummy(), + "features_cols", + "feature column names.", + typeConverter=TypeConverters.toListString + ) + ) + + def __init__(self): + super().__init__() + self._setDefault(features_cols=[]) From dbfdf04c3b950380ae9d65e196ae2f30cb5c2fb1 Mon Sep 17 00:00:00 2001 From: Bobby Wang Date: Wed, 27 Jul 2022 10:32:46 +0800 Subject: [PATCH 02/18] add tests --- .../test_spark_with_gpu.py | 107 +++++++++++++++--- 1 file changed, 94 insertions(+), 13 deletions(-) diff --git a/tests/python-gpu/test_spark_with_gpu/test_spark_with_gpu.py b/tests/python-gpu/test_spark_with_gpu/test_spark_with_gpu.py index ab6faed2c41b..9bc717133c2e 100644 --- a/tests/python-gpu/test_spark_with_gpu/test_spark_with_gpu.py +++ b/tests/python-gpu/test_spark_with_gpu/test_spark_with_gpu.py @@ -17,23 +17,28 @@ from pyspark.ml.linalg import Vectors from xgboost.spark import SparkXGBRegressor, SparkXGBClassifier +gpu_discovery_script_path = "tests/python-gpu/test_spark_with_gpu/discover_gpu.sh" +executor_gpu_amount = 4 +executor_cores = 4 +num_workers = executor_gpu_amount + @pytest.fixture(scope="module", autouse=True) def spark_session_with_gpu(): spark_config = { - "spark.master": "local-cluster[1, 4, 1024]", + "spark.master": f"local-cluster[1, {executor_gpu_amount}, 1024]", "spark.python.worker.reuse": "false", "spark.driver.host": "127.0.0.1", "spark.task.maxFailures": "1", "spark.sql.execution.pyspark.udf.simplifiedTraceback.enabled": "false", "spark.sql.pyspark.jvmStacktrace.enabled": "true", - "spark.cores.max": "4", + "spark.cores.max": executor_cores, "spark.task.cpus": "1", - "spark.executor.cores": "4", - "spark.worker.resource.gpu.amount": "4", + "spark.executor.cores": executor_cores, + "spark.worker.resource.gpu.amount": executor_gpu_amount, "spark.task.resource.gpu.amount": "1", - "spark.executor.resource.gpu.amount": "4", - "spark.worker.resource.gpu.discoveryScript": "tests/python-gpu/test_spark_with_gpu/discover_gpu.sh", + "spark.executor.resource.gpu.amount": executor_gpu_amount, + "spark.worker.resource.gpu.discoveryScript": gpu_discovery_script_path, } builder = SparkSession.builder.appName("xgboost spark python API Tests with GPU") for k, v in spark_config.items(): @@ -41,7 +46,7 @@ def spark_session_with_gpu(): spark = builder.getOrCreate() logging.getLogger("pyspark").setLevel(logging.INFO) # We run a dummy job so that we block until the workers have connected to the master - spark.sparkContext.parallelize(range(4), 4).barrier().mapPartitions( + spark.sparkContext.parallelize(range(num_workers), num_workers).barrier().mapPartitions( lambda _: [] ).collect() yield spark @@ -57,18 +62,39 @@ def spark_iris_dataset(spark_session_with_gpu): for features, label in zip(data.data[0::2], data.target[0::2]) ] train_df = spark.createDataFrame( - spark.sparkContext.parallelize(train_rows, 4), ["features", "label"] + spark.sparkContext.parallelize(train_rows, num_workers), ["features", "label"] ) test_rows = [ (Vectors.dense(features), float(label)) for features, label in zip(data.data[1::2], data.target[1::2]) ] test_df = spark.createDataFrame( - spark.sparkContext.parallelize(test_rows, 4), ["features", "label"] + spark.sparkContext.parallelize(test_rows, num_workers), ["features", "label"] ) return train_df, test_df +@pytest.fixture +def spark_iris_dataset_feature_cols(spark_session_with_gpu): + spark = spark_session_with_gpu + data = sklearn.datasets.load_iris() + train_rows = [ + (*features.tolist(), float(label)) + for features, label in zip(data.data[0::2], 
data.target[0::2]) + ] + train_df = spark.createDataFrame( + spark.sparkContext.parallelize(train_rows, num_workers), [*data.feature_names, "label"] + ) + test_rows = [ + (*features.tolist(), float(label)) + for features, label in zip(data.data[1::2], data.target[1::2]) + ] + test_df = spark.createDataFrame( + spark.sparkContext.parallelize(test_rows, num_workers), [*data.feature_names, "label"] + ) + return train_df, test_df, data.feature_names + + @pytest.fixture def spark_diabetes_dataset(spark_session_with_gpu): spark = spark_session_with_gpu @@ -78,24 +104,45 @@ def spark_diabetes_dataset(spark_session_with_gpu): for features, label in zip(data.data[0::2], data.target[0::2]) ] train_df = spark.createDataFrame( - spark.sparkContext.parallelize(train_rows, 4), ["features", "label"] + spark.sparkContext.parallelize(train_rows, num_workers), ["features", "label"] ) test_rows = [ (Vectors.dense(features), float(label)) for features, label in zip(data.data[1::2], data.target[1::2]) ] test_df = spark.createDataFrame( - spark.sparkContext.parallelize(test_rows, 4), ["features", "label"] + spark.sparkContext.parallelize(test_rows, num_workers), ["features", "label"] ) return train_df, test_df +@pytest.fixture +def spark_diabetes_dataset_feature_cols(spark_session_with_gpu): + spark = spark_session_with_gpu + data = sklearn.datasets.load_diabetes() + train_rows = [ + (*features.tolist(), float(label)) + for features, label in zip(data.data[0::2], data.target[0::2]) + ] + train_df = spark.createDataFrame( + spark.sparkContext.parallelize(train_rows, num_workers), [*data.feature_names, "label"] + ) + test_rows = [ + (*features.tolist(), float(label)) + for features, label in zip(data.data[1::2], data.target[1::2]) + ] + test_df = spark.createDataFrame( + spark.sparkContext.parallelize(test_rows, num_workers), [*data.feature_names, "label"] + ) + return train_df, test_df, data.feature_names + + def test_sparkxgb_classifier_with_gpu(spark_iris_dataset): from pyspark.ml.evaluation import MulticlassClassificationEvaluator classifier = SparkXGBClassifier( use_gpu=True, - num_workers=4, + num_workers=num_workers, ) train_df, test_df = spark_iris_dataset model = classifier.fit(train_df) @@ -105,12 +152,30 @@ def test_sparkxgb_classifier_with_gpu(spark_iris_dataset): assert f1 >= 0.97 +def test_sparkxgb_classifier_feature_cols_with_gpu(spark_iris_dataset_feature_cols): + from pyspark.ml.evaluation import MulticlassClassificationEvaluator + + train_df, test_df, feature_names = spark_iris_dataset_feature_cols + + classifier = SparkXGBClassifier( + features_col=feature_names, + use_gpu=True, + num_workers=num_workers, + ) + + model = classifier.fit(train_df) + pred_result_df = model.transform(test_df) + evaluator = MulticlassClassificationEvaluator(metricName="f1") + f1 = evaluator.evaluate(pred_result_df) + assert f1 >= 0.97 + + def test_sparkxgb_regressor_with_gpu(spark_diabetes_dataset): from pyspark.ml.evaluation import RegressionEvaluator regressor = SparkXGBRegressor( use_gpu=True, - num_workers=4, + num_workers=num_workers, ) train_df, test_df = spark_diabetes_dataset model = regressor.fit(train_df) @@ -118,3 +183,19 @@ def test_sparkxgb_regressor_with_gpu(spark_diabetes_dataset): evaluator = RegressionEvaluator(metricName="rmse") rmse = evaluator.evaluate(pred_result_df) assert rmse <= 65.0 + + +def test_sparkxgb_regressor_feature_cols_with_gpu(spark_diabetes_dataset_feature_cols): + from pyspark.ml.evaluation import RegressionEvaluator + train_df, test_df, feature_names = 
spark_diabetes_dataset_feature_cols + regressor = SparkXGBRegressor( + features_col=feature_names, + use_gpu=True, + num_workers=num_workers, + ) + + model = regressor.fit(train_df) + pred_result_df = model.transform(test_df) + evaluator = RegressionEvaluator(metricName="rmse") + rmse = evaluator.evaluate(pred_result_df) + assert rmse <= 65.0 From bc58b4b3d9ded7aa3f96946c5f9a0ab94ea39bb0 Mon Sep 17 00:00:00 2001 From: Bobby Wang Date: Wed, 27 Jul 2022 14:37:59 +0800 Subject: [PATCH 03/18] add spark ml pipeline test --- .../test_spark_with_gpu.py | 20 +++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/tests/python-gpu/test_spark_with_gpu/test_spark_with_gpu.py b/tests/python-gpu/test_spark_with_gpu/test_spark_with_gpu.py index 9bc717133c2e..43420596b63a 100644 --- a/tests/python-gpu/test_spark_with_gpu/test_spark_with_gpu.py +++ b/tests/python-gpu/test_spark_with_gpu/test_spark_with_gpu.py @@ -3,6 +3,7 @@ import logging import pytest import sklearn +from pyspark.ml.tuning import CrossValidator, ParamGridBuilder sys.path.append("tests/python") import testing as tm @@ -170,6 +171,25 @@ def test_sparkxgb_classifier_feature_cols_with_gpu(spark_iris_dataset_feature_co assert f1 >= 0.97 +def test_cv_sparkxgb_classifier_feature_cols_with_gpu(spark_iris_dataset_feature_cols): + from pyspark.ml.evaluation import MulticlassClassificationEvaluator + train_df, test_df, feature_names = spark_iris_dataset_feature_cols + + classifier = SparkXGBClassifier( + features_col=feature_names, + use_gpu=True, + num_workers=num_workers, + ) + grid = ParamGridBuilder().addGrid(classifier.max_bin, [6, 8]).build() + evaluator = MulticlassClassificationEvaluator(metricName="f1") + cv = CrossValidator( + estimator=classifier, evaluator=evaluator, estimatorParamMaps=grid, numFolds=3) + cvModel = cv.fit(train_df) + pred_result_df = cvModel.transform(test_df) + f1 = evaluator.evaluate(pred_result_df) + assert f1 >= 0.97 + + def test_sparkxgb_regressor_with_gpu(spark_diabetes_dataset): from pyspark.ml.evaluation import RegressionEvaluator From 4a0d399a340ae13a5326e5a02f993df1bf9e9c62 Mon Sep 17 00:00:00 2001 From: Bobby Wang Date: Wed, 27 Jul 2022 14:41:21 +0800 Subject: [PATCH 04/18] format --- python-package/xgboost/spark/core.py | 57 ++++++++++++-------------- python-package/xgboost/spark/params.py | 12 +++--- 2 files changed, 31 insertions(+), 38 deletions(-) diff --git a/python-package/xgboost/spark/core.py b/python-package/xgboost/spark/core.py index 16394cfd696d..62eaf260a4cf 100644 --- a/python-package/xgboost/spark/core.py +++ b/python-package/xgboost/spark/core.py @@ -2,7 +2,7 @@ """Xgboost pyspark integration submodule for core code.""" # pylint: disable=fixme, too-many-ancestors, protected-access, no-member, invalid-name # pylint: disable=too-few-public-methods -from typing import Iterator, Tuple, Optional +from typing import Iterator, Optional, Tuple import numpy as np import pandas as pd @@ -26,9 +26,9 @@ DoubleType, FloatType, IntegerType, + IntegralType, LongType, ShortType, - IntegralType, ) from scipy.special import expit, softmax # pylint: disable=no-name-in-module from xgboost.core import Booster @@ -77,12 +77,7 @@ "features_cols", ] -_non_booster_params = [ - "missing", - "n_estimators", - "feature_types", - "feature_weights", -] +_non_booster_params = ["missing", "n_estimators", "feature_types", "feature_weights"] _pyspark_param_alias_map = { "features_col": "featuresCol", @@ -265,9 +260,8 @@ def _validate_params(self): "Therefore, that parameter will be ignored." 
) - if ( - self.getOrDefault(self.features_cols) - and not self.getOrDefault(self.use_gpu) + if self.getOrDefault(self.features_cols) and not self.getOrDefault( + self.use_gpu ): raise ValueError( "XGBoost accepts a list of feature column names only when use_gpu is enabled." @@ -326,7 +320,9 @@ def _validate_params(self): ) -def _validate_and_convert_feature_col_as_float_col(dataset, features_col_name: list) -> list: +def _validate_and_convert_feature_col_as_float_col( + dataset, features_col_name: list +) -> list: """feature column names must be IntegralType or float or double types""" feature_cols = [] for c in features_col_name: @@ -367,14 +363,16 @@ def _validate_and_convert_feature_col_as_array_col(dataset, features_col_name): return features_array_col -def _validate_and_convert_feature_col(dataset, feature_names: list, feature_name: str) -> list: +def _validate_and_convert_feature_col( + dataset, feature_names: list, feature_name: str +) -> list: """XGBoost model trained with features_cols parameter is also can transform vector or array feature type. But we first check features_cols and then check featuresCol""" if ( - len(feature_names) > - 0 >= - len([c for c in feature_names if c not in dataset.columns]) + len(feature_names) + > 0 + >= len([c for c in feature_names if c not in dataset.columns]) ): return _validate_and_convert_feature_col_as_float_col(dataset, feature_names) @@ -413,10 +411,9 @@ def setParams(self, **kwargs): # pylint: disable=invalid-name f"Please use param name {_inverse_pyspark_param_alias_map[k]} instead." ) if k in _pyspark_param_alias_map: - if ( - k == _inverse_pyspark_param_alias_map[self.featuresCol.name] - and isinstance(v, list) - ): + if k == _inverse_pyspark_param_alias_map[ + self.featuresCol.name + ] and isinstance(v, list): real_k = self.features_cols.name k = real_k else: @@ -545,7 +542,7 @@ def _fit(self, dataset): label_col = col(self.getOrDefault(self.labelCol)).alias(alias.label) select_cols = [label_col] - features_cols_names=None + features_cols_names = None if len(self.getOrDefault(self.features_cols)): features_cols_names = self.getOrDefault(self.features_cols) features_cols = _validate_and_convert_feature_col_as_float_col( @@ -639,9 +636,7 @@ def _train_booster(pandas_df_iter): evals_result = {} with RabitContext(_rabit_args, context): dtrain, dvalid = create_dmatrix_from_partitions( - pandas_df_iter, - features_cols_names, - dmatrix_kwargs, + pandas_df_iter, features_cols_names, dmatrix_kwargs ) if dvalid is not None: dval = [(dtrain, "training"), (dvalid, "validation")] @@ -750,15 +745,15 @@ def _get_feature_col(self, dataset) -> (list, Optional[list]): feature_col_names = self.getOrDefault(self.features_cols) features_col = [] if ( - len(feature_col_names) > - 0 >= - len([c for c in feature_col_names if c not in dataset.columns]) + len(feature_col_names) + > 0 + >= len([c for c in feature_col_names if c not in dataset.columns]) ): # The model is trained with features_cols and the predicted dataset # also contains all the columns specified by features_cols. features_col = _validate_and_convert_feature_col_as_float_col( - dataset, - feature_col_names) + dataset, feature_col_names + ) else: # 1. 
The model was trained by features_cols, but the dataset doesn't contain # all the columns specified by features_cols, so we need to check if @@ -768,8 +763,8 @@ def _get_feature_col(self, dataset) -> (list, Optional[list]): feature_col_names = None features_col.append( _validate_and_convert_feature_col_as_array_col( - dataset, - self.getOrDefault(self.featuresCol)) + dataset, self.getOrDefault(self.featuresCol) + ) ) return features_col, feature_col_names diff --git a/python-package/xgboost/spark/params.py b/python-package/xgboost/spark/params.py index 3b1526fcdd25..7a9844e532c9 100644 --- a/python-package/xgboost/spark/params.py +++ b/python-package/xgboost/spark/params.py @@ -40,13 +40,11 @@ class HasFeaturesCols(Params): This parameter is taken effect only when use_gpu is enabled. """ - features_cols = ( - Param( - Params._dummy(), - "features_cols", - "feature column names.", - typeConverter=TypeConverters.toListString - ) + features_cols = Param( + Params._dummy(), + "features_cols", + "feature column names.", + typeConverter=TypeConverters.toListString, ) def __init__(self): From 727dadbe44e0c597a10ff311aa895bd64b2127b0 Mon Sep 17 00:00:00 2001 From: Bobby Wang Date: Thu, 28 Jul 2022 12:45:44 +0800 Subject: [PATCH 05/18] resolve comments --- python-package/xgboost/spark/core.py | 54 ++++++++++------------------ 1 file changed, 19 insertions(+), 35 deletions(-) diff --git a/python-package/xgboost/spark/core.py b/python-package/xgboost/spark/core.py index 62eaf260a4cf..88aff9ee1986 100644 --- a/python-package/xgboost/spark/core.py +++ b/python-package/xgboost/spark/core.py @@ -238,12 +238,11 @@ def _gen_predict_params_dict(self): def _validate_params(self): init_model = self.getOrDefault(self.xgb_model) - if init_model is not None: - if init_model is not None and not isinstance(init_model, Booster): - raise ValueError( - "The xgb_model param must be set with a `xgboost.core.Booster` " - "instance." - ) + if init_model is not None and not isinstance(init_model, Booster): + raise ValueError( + "The xgb_model param must be set with a `xgboost.core.Booster` " + "instance." + ) if self.getOrDefault(self.num_workers) < 1: raise ValueError( @@ -260,11 +259,12 @@ def _validate_params(self): "Therefore, that parameter will be ignored." ) - if self.getOrDefault(self.features_cols) and not self.getOrDefault( - self.use_gpu - ): - raise ValueError( - "XGBoost accepts a list of feature column names only when use_gpu is enabled." + if self.getOrDefault(self.features_cols): + if not self.getOrDefault(self.use_gpu): + raise ValueError("features_cols param requires enabling use_gpu.") + + get_logger(self.__class__.__name__).warning( + "If features_cols param set, then features_col param is ignored." 
) if self.getOrDefault(self.use_gpu): @@ -320,19 +320,19 @@ def _validate_params(self): ) -def _validate_and_convert_feature_col_as_float_col( - dataset, features_col_name: list +def _validate_and_convert_feature_col_as_float_col_list( + dataset, features_col_names: list ) -> list: - """feature column names must be IntegralType or float or double types""" + """Values in feature columns must be integral types or float/double types""" feature_cols = [] - for c in features_col_name: + for c in features_col_names: if isinstance(dataset.schema[c].dataType, DoubleType): feature_cols.append(col(c).cast(FloatType()).alias(c)) elif isinstance(dataset.schema[c].dataType, (FloatType, IntegralType)): feature_cols.append(col(c)) else: raise ValueError( - "Feature column must be integral types or float/double types" + "Values in feature columns must be integral types or float/double types." ) return feature_cols @@ -363,22 +363,6 @@ def _validate_and_convert_feature_col_as_array_col(dataset, features_col_name): return features_array_col -def _validate_and_convert_feature_col( - dataset, feature_names: list, feature_name: str -) -> list: - """XGBoost model trained with features_cols parameter is also can transform - vector or array feature type. But we first check features_cols and then check - featuresCol""" - if ( - len(feature_names) - > 0 - >= len([c for c in feature_names if c not in dataset.columns]) - ): - return _validate_and_convert_feature_col_as_float_col(dataset, feature_names) - - return [_validate_and_convert_feature_col_as_array_col(dataset, feature_name)] - - class _SparkXGBEstimator(Estimator, _SparkXGBParams, MLReadable, MLWritable): def __init__(self): super().__init__() @@ -545,7 +529,7 @@ def _fit(self, dataset): features_cols_names = None if len(self.getOrDefault(self.features_cols)): features_cols_names = self.getOrDefault(self.features_cols) - features_cols = _validate_and_convert_feature_col_as_float_col( + features_cols = _validate_and_convert_feature_col_as_float_col_list( dataset, features_cols_names ) select_cols.extend(features_cols) @@ -738,7 +722,7 @@ def _transform(self, dataset): def _get_feature_col(self, dataset) -> (list, Optional[list]): """XGBoost model trained with features_cols parameter can also predict - vector or array feature type. But frist we need to check features_cols + vector or array feature type. But first we need to check features_cols and then featuresCol """ @@ -751,7 +735,7 @@ def _get_feature_col(self, dataset) -> (list, Optional[list]): ): # The model is trained with features_cols and the predicted dataset # also contains all the columns specified by features_cols. 
- features_col = _validate_and_convert_feature_col_as_float_col( + features_col = _validate_and_convert_feature_col_as_float_col_list( dataset, feature_col_names ) else: From 494d80d4a6e49644928b9c7bea74d7899f706c6a Mon Sep 17 00:00:00 2001 From: Bobby Wang Date: Thu, 28 Jul 2022 19:28:43 +0800 Subject: [PATCH 06/18] fix bug --- python-package/xgboost/spark/core.py | 8 ++++---- python-package/xgboost/spark/data.py | 12 ++++++++---- 2 files changed, 12 insertions(+), 8 deletions(-) diff --git a/python-package/xgboost/spark/core.py b/python-package/xgboost/spark/core.py index 88aff9ee1986..80ed5f7b9bf8 100644 --- a/python-package/xgboost/spark/core.py +++ b/python-package/xgboost/spark/core.py @@ -606,10 +606,10 @@ def _train_booster(pandas_df_iter): context = BarrierTaskContext.get() context.barrier() + gpu_id = None if use_gpu: - booster_params["gpu_id"] = ( - context.partitionId() if is_local else _get_gpu_id(context) - ) + gpu_id = context.partitionId() if is_local else _get_gpu_id(context) + booster_params["gpu_id"] = gpu_id _rabit_args = "" if context.partitionId() == 0: @@ -620,7 +620,7 @@ def _train_booster(pandas_df_iter): evals_result = {} with RabitContext(_rabit_args, context): dtrain, dvalid = create_dmatrix_from_partitions( - pandas_df_iter, features_cols_names, dmatrix_kwargs + pandas_df_iter, features_cols_names, gpu_id, dmatrix_kwargs ) if dvalid is not None: dval = [(dtrain, "training"), (dvalid, "validation")] diff --git a/python-package/xgboost/spark/data.py b/python-package/xgboost/spark/data.py index e3fda4c14d03..ba615aaeabab 100644 --- a/python-package/xgboost/spark/data.py +++ b/python-package/xgboost/spark/data.py @@ -63,9 +63,9 @@ def make_blob(part: pd.DataFrame, is_valid: bool) -> None: class PartIter(DataIter): """Iterator for creating Quantile DMatrix from partitions.""" - def __init__(self, data: Dict[str, List], on_device: bool) -> None: + def __init__(self, data: Dict[str, List], device_id: Optional[int]) -> None: self._iter = 0 - self._cuda = on_device + self._device_id = device_id self._data = data super().__init__() @@ -74,9 +74,12 @@ def _fetch(self, data: Optional[Sequence[pd.DataFrame]]) -> Optional[pd.DataFram if not data: return None - if self._cuda: + if self._device_id: import cudf # pylint: disable=import-error + import cupy as cp + cp.cuda.runtime.setDevice(self._device_id) + return cudf.DataFrame(data[self._iter]) return data[self._iter] @@ -100,6 +103,7 @@ def reset(self) -> None: def create_dmatrix_from_partitions( iterator: Iterator[pd.DataFrame], feature_cols: Optional[Sequence[str]], + gpu_id: Optional[int], kwargs: Dict[str, Any], # use dict to make sure this parameter is passed. ) -> Tuple[DMatrix, Optional[DMatrix]]: """Create DMatrix from spark data partitions. 
This is not particularly efficient as @@ -169,7 +173,7 @@ def make(values: Dict[str, List[np.ndarray]], kwargs: Dict[str, Any]) -> DMatrix dtrain = make(train_data, kwargs) else: cache_partitions(iterator, append_dqm) - it = PartIter(train_data, True) + it = PartIter(train_data, True, gpu_id) dtrain = DeviceQuantileDMatrix(it, **kwargs) dvalid = make(valid_data, kwargs) if len(valid_data) != 0 else None From af3dcd1232a9b2b9d52dd83471ab5d547cb0e5b4 Mon Sep 17 00:00:00 2001 From: Bobby Wang Date: Thu, 28 Jul 2022 19:45:52 +0800 Subject: [PATCH 07/18] update --- python-package/xgboost/spark/data.py | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/python-package/xgboost/spark/data.py b/python-package/xgboost/spark/data.py index ba615aaeabab..ef23576d329a 100644 --- a/python-package/xgboost/spark/data.py +++ b/python-package/xgboost/spark/data.py @@ -79,8 +79,16 @@ def _fetch(self, data: Optional[Sequence[pd.DataFrame]]) -> Optional[pd.DataFram import cupy as cp cp.cuda.runtime.setDevice(self._device_id) - - return cudf.DataFrame(data[self._iter]) + import pyspark + context = pyspark.TaskContext.get() + pid = context.partitionId() + f = open(f"/tmp/debug_hanging_{pid}", "w") + gpu_id = cp.cuda.runtime.getDevice() + f.write(f"got gpu id {gpu_id} for partition {pid} \n") + df = cudf.DataFrame(data[self._iter]) + f.write("after cudf ----") + f.close() + return df return data[self._iter] @@ -173,7 +181,7 @@ def make(values: Dict[str, List[np.ndarray]], kwargs: Dict[str, Any]) -> DMatrix dtrain = make(train_data, kwargs) else: cache_partitions(iterator, append_dqm) - it = PartIter(train_data, True, gpu_id) + it = PartIter(train_data, gpu_id) dtrain = DeviceQuantileDMatrix(it, **kwargs) dvalid = make(valid_data, kwargs) if len(valid_data) != 0 else None From 32dad7179883692a0f93ec517731fdcf00b7c5b0 Mon Sep 17 00:00:00 2001 From: Bobby Wang Date: Fri, 29 Jul 2022 09:59:56 +0800 Subject: [PATCH 08/18] fix gpu id --- python-package/xgboost/spark/data.py | 17 +++++------------ 1 file changed, 5 insertions(+), 12 deletions(-) diff --git a/python-package/xgboost/spark/data.py b/python-package/xgboost/spark/data.py index ef23576d329a..5e6a8c1468a7 100644 --- a/python-package/xgboost/spark/data.py +++ b/python-package/xgboost/spark/data.py @@ -74,21 +74,14 @@ def _fetch(self, data: Optional[Sequence[pd.DataFrame]]) -> Optional[pd.DataFram if not data: return None - if self._device_id: + if self._device_id is not None: import cudf # pylint: disable=import-error - import cupy as cp + + # We must set the device after import cudf, which will change the device id to 0 + # See https://github.com/rapidsai/cudf/issues/11386 cp.cuda.runtime.setDevice(self._device_id) - import pyspark - context = pyspark.TaskContext.get() - pid = context.partitionId() - f = open(f"/tmp/debug_hanging_{pid}", "w") - gpu_id = cp.cuda.runtime.getDevice() - f.write(f"got gpu id {gpu_id} for partition {pid} \n") - df = cudf.DataFrame(data[self._iter]) - f.write("after cudf ----") - f.close() - return df + return cudf.DataFrame(data[self._iter]) return data[self._iter] From 1da666ff0f24ec4c5284993d1386310d3f1a1b4d Mon Sep 17 00:00:00 2001 From: Bobby Wang Date: Fri, 29 Jul 2022 10:14:28 +0800 Subject: [PATCH 09/18] fix --- tests/python/test_spark/test_data.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/python/test_spark/test_data.py b/tests/python/test_spark/test_data.py index a3da4764ad4e..e0132a6a5f70 100644 --- a/tests/python/test_spark/test_data.py +++ 
b/tests/python/test_spark/test_data.py @@ -62,9 +62,9 @@ def run_dmatrix_ctor(is_dqm: bool) -> None: kwargs = {"feature_types": feature_types} if is_dqm: cols = [f"feat-{i}" for i in range(n_features)] - train_Xy, valid_Xy = create_dmatrix_from_partitions(iter(dfs), cols, kwargs) + train_Xy, valid_Xy = create_dmatrix_from_partitions(iter(dfs), cols, 0, kwargs) else: - train_Xy, valid_Xy = create_dmatrix_from_partitions(iter(dfs), None, kwargs) + train_Xy, valid_Xy = create_dmatrix_from_partitions(iter(dfs), None, None, kwargs) assert valid_Xy is not None assert valid_Xy.num_row() + train_Xy.num_row() == n_samples_per_batch * n_batches From 5e3382a5f8d4f16f0465ae022ed29ad82a62f1e9 Mon Sep 17 00:00:00 2001 From: Bobby Wang Date: Fri, 29 Jul 2022 10:54:26 +0800 Subject: [PATCH 10/18] format --- tests/python/test_spark/test_data.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/tests/python/test_spark/test_data.py b/tests/python/test_spark/test_data.py index e0132a6a5f70..07896ece618b 100644 --- a/tests/python/test_spark/test_data.py +++ b/tests/python/test_spark/test_data.py @@ -64,7 +64,9 @@ def run_dmatrix_ctor(is_dqm: bool) -> None: cols = [f"feat-{i}" for i in range(n_features)] train_Xy, valid_Xy = create_dmatrix_from_partitions(iter(dfs), cols, 0, kwargs) else: - train_Xy, valid_Xy = create_dmatrix_from_partitions(iter(dfs), None, None, kwargs) + train_Xy, valid_Xy = create_dmatrix_from_partitions( + iter(dfs), None, None, kwargs + ) assert valid_Xy is not None assert valid_Xy.num_row() + train_Xy.num_row() == n_samples_per_batch * n_batches From a1766c086f1b361e34b7b858f0114eaaf35be851 Mon Sep 17 00:00:00 2001 From: Bobby Wang Date: Fri, 29 Jul 2022 11:00:41 +0800 Subject: [PATCH 11/18] format --- .../test_spark_with_gpu.py | 54 +++++++++---------- 1 file changed, 24 insertions(+), 30 deletions(-) diff --git a/tests/python-gpu/test_spark_with_gpu/test_spark_with_gpu.py b/tests/python-gpu/test_spark_with_gpu/test_spark_with_gpu.py index 43420596b63a..14b32b8d3511 100644 --- a/tests/python-gpu/test_spark_with_gpu/test_spark_with_gpu.py +++ b/tests/python-gpu/test_spark_with_gpu/test_spark_with_gpu.py @@ -1,9 +1,8 @@ +import logging import sys -import logging import pytest import sklearn -from pyspark.ml.tuning import CrossValidator, ParamGridBuilder sys.path.append("tests/python") import testing as tm @@ -13,10 +12,10 @@ if sys.platform.startswith("win"): pytest.skip("Skipping PySpark tests on Windows", allow_module_level=True) - -from pyspark.sql import SparkSession from pyspark.ml.linalg import Vectors -from xgboost.spark import SparkXGBRegressor, SparkXGBClassifier +from pyspark.ml.tuning import CrossValidator, ParamGridBuilder +from pyspark.sql import SparkSession +from xgboost.spark import SparkXGBClassifier, SparkXGBRegressor gpu_discovery_script_path = "tests/python-gpu/test_spark_with_gpu/discover_gpu.sh" executor_gpu_amount = 4 @@ -47,9 +46,9 @@ def spark_session_with_gpu(): spark = builder.getOrCreate() logging.getLogger("pyspark").setLevel(logging.INFO) # We run a dummy job so that we block until the workers have connected to the master - spark.sparkContext.parallelize(range(num_workers), num_workers).barrier().mapPartitions( - lambda _: [] - ).collect() + spark.sparkContext.parallelize( + range(num_workers), num_workers + ).barrier().mapPartitions(lambda _: []).collect() yield spark spark.stop() @@ -84,14 +83,16 @@ def spark_iris_dataset_feature_cols(spark_session_with_gpu): for features, label in zip(data.data[0::2], data.target[0::2]) ] train_df = 
spark.createDataFrame( - spark.sparkContext.parallelize(train_rows, num_workers), [*data.feature_names, "label"] + spark.sparkContext.parallelize(train_rows, num_workers), + [*data.feature_names, "label"], ) test_rows = [ (*features.tolist(), float(label)) for features, label in zip(data.data[1::2], data.target[1::2]) ] test_df = spark.createDataFrame( - spark.sparkContext.parallelize(test_rows, num_workers), [*data.feature_names, "label"] + spark.sparkContext.parallelize(test_rows, num_workers), + [*data.feature_names, "label"], ) return train_df, test_df, data.feature_names @@ -126,14 +127,16 @@ def spark_diabetes_dataset_feature_cols(spark_session_with_gpu): for features, label in zip(data.data[0::2], data.target[0::2]) ] train_df = spark.createDataFrame( - spark.sparkContext.parallelize(train_rows, num_workers), [*data.feature_names, "label"] + spark.sparkContext.parallelize(train_rows, num_workers), + [*data.feature_names, "label"], ) test_rows = [ (*features.tolist(), float(label)) for features, label in zip(data.data[1::2], data.target[1::2]) ] test_df = spark.createDataFrame( - spark.sparkContext.parallelize(test_rows, num_workers), [*data.feature_names, "label"] + spark.sparkContext.parallelize(test_rows, num_workers), + [*data.feature_names, "label"], ) return train_df, test_df, data.feature_names @@ -141,10 +144,7 @@ def spark_diabetes_dataset_feature_cols(spark_session_with_gpu): def test_sparkxgb_classifier_with_gpu(spark_iris_dataset): from pyspark.ml.evaluation import MulticlassClassificationEvaluator - classifier = SparkXGBClassifier( - use_gpu=True, - num_workers=num_workers, - ) + classifier = SparkXGBClassifier(use_gpu=True, num_workers=num_workers) train_df, test_df = spark_iris_dataset model = classifier.fit(train_df) pred_result_df = model.transform(test_df) @@ -159,9 +159,7 @@ def test_sparkxgb_classifier_feature_cols_with_gpu(spark_iris_dataset_feature_co train_df, test_df, feature_names = spark_iris_dataset_feature_cols classifier = SparkXGBClassifier( - features_col=feature_names, - use_gpu=True, - num_workers=num_workers, + features_col=feature_names, use_gpu=True, num_workers=num_workers ) model = classifier.fit(train_df) @@ -173,17 +171,17 @@ def test_sparkxgb_classifier_feature_cols_with_gpu(spark_iris_dataset_feature_co def test_cv_sparkxgb_classifier_feature_cols_with_gpu(spark_iris_dataset_feature_cols): from pyspark.ml.evaluation import MulticlassClassificationEvaluator + train_df, test_df, feature_names = spark_iris_dataset_feature_cols classifier = SparkXGBClassifier( - features_col=feature_names, - use_gpu=True, - num_workers=num_workers, + features_col=feature_names, use_gpu=True, num_workers=num_workers ) grid = ParamGridBuilder().addGrid(classifier.max_bin, [6, 8]).build() evaluator = MulticlassClassificationEvaluator(metricName="f1") cv = CrossValidator( - estimator=classifier, evaluator=evaluator, estimatorParamMaps=grid, numFolds=3) + estimator=classifier, evaluator=evaluator, estimatorParamMaps=grid, numFolds=3 + ) cvModel = cv.fit(train_df) pred_result_df = cvModel.transform(test_df) f1 = evaluator.evaluate(pred_result_df) @@ -193,10 +191,7 @@ def test_cv_sparkxgb_classifier_feature_cols_with_gpu(spark_iris_dataset_feature def test_sparkxgb_regressor_with_gpu(spark_diabetes_dataset): from pyspark.ml.evaluation import RegressionEvaluator - regressor = SparkXGBRegressor( - use_gpu=True, - num_workers=num_workers, - ) + regressor = SparkXGBRegressor(use_gpu=True, num_workers=num_workers) train_df, test_df = spark_diabetes_dataset model = 
regressor.fit(train_df) pred_result_df = model.transform(test_df) @@ -207,11 +202,10 @@ def test_sparkxgb_regressor_with_gpu(spark_diabetes_dataset): def test_sparkxgb_regressor_feature_cols_with_gpu(spark_diabetes_dataset_feature_cols): from pyspark.ml.evaluation import RegressionEvaluator + train_df, test_df, feature_names = spark_diabetes_dataset_feature_cols regressor = SparkXGBRegressor( - features_col=feature_names, - use_gpu=True, - num_workers=num_workers, + features_col=feature_names, use_gpu=True, num_workers=num_workers ) model = regressor.fit(train_df) From 209699717b61b1a30ee70d48143d0f558c5e8cb2 Mon Sep 17 00:00:00 2001 From: Bobby Wang Date: Tue, 2 Aug 2022 10:35:30 +0800 Subject: [PATCH 12/18] add max_bin support --- python-package/xgboost/spark/core.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/python-package/xgboost/spark/core.py b/python-package/xgboost/spark/core.py index 80ed5f7b9bf8..3579ef1fbb53 100644 --- a/python-package/xgboost/spark/core.py +++ b/python-package/xgboost/spark/core.py @@ -611,6 +611,13 @@ def _train_booster(pandas_df_iter): gpu_id = context.partitionId() if is_local else _get_gpu_id(context) booster_params["gpu_id"] = gpu_id + # max_bin is needed for qdm + if ( + features_cols_names is not None + and booster_params.get("max_bin", None) is not None + ): + dmatrix_kwargs["max_bin"] = booster_params["max_bin"] + _rabit_args = "" if context.partitionId() == 0: _rabit_args = str(_get_rabit_args(context, num_workers)) From 0f388d971a15c791dc5ba75e1c2c07c361c1e267 Mon Sep 17 00:00:00 2001 From: Bobby Wang Date: Tue, 2 Aug 2022 10:59:06 +0800 Subject: [PATCH 13/18] move the gpu dask tests running behind spark the gpu dask tests may corrupt the whole gpu env --- tests/python-gpu/conftest.py | 4 ++-- .../{test_spark_with_gpu => test_gpu_spark}/discover_gpu.sh | 0 .../{test_spark_with_gpu => test_gpu_spark}/test_data.py | 0 .../test_gpu_spark.py} | 2 +- .../python-gpu/{ => test_gpu_with_dask}/test_gpu_with_dask.py | 0 5 files changed, 3 insertions(+), 3 deletions(-) rename tests/python-gpu/{test_spark_with_gpu => test_gpu_spark}/discover_gpu.sh (100%) rename tests/python-gpu/{test_spark_with_gpu => test_gpu_spark}/test_data.py (100%) rename tests/python-gpu/{test_spark_with_gpu/test_spark_with_gpu.py => test_gpu_spark/test_gpu_spark.py} (98%) rename tests/python-gpu/{ => test_gpu_with_dask}/test_gpu_with_dask.py (100%) diff --git a/tests/python-gpu/conftest.py b/tests/python-gpu/conftest.py index 789f96fc5511..c97b1a1668bf 100644 --- a/tests/python-gpu/conftest.py +++ b/tests/python-gpu/conftest.py @@ -61,8 +61,8 @@ def pytest_collection_modifyitems(config, items): mgpu_mark = pytest.mark.mgpu for item in items: if item.nodeid.startswith( - "python-gpu/test_gpu_with_dask.py" + "python-gpu/test_gpu_with_dask/test_gpu_with_dask.py" ) or item.nodeid.startswith( - "python-gpu/test_spark_with_gpu/test_spark_with_gpu.py" + "python-gpu/test_gpu_spark/test_gpu_spark.py" ): item.add_marker(mgpu_mark) diff --git a/tests/python-gpu/test_spark_with_gpu/discover_gpu.sh b/tests/python-gpu/test_gpu_spark/discover_gpu.sh similarity index 100% rename from tests/python-gpu/test_spark_with_gpu/discover_gpu.sh rename to tests/python-gpu/test_gpu_spark/discover_gpu.sh diff --git a/tests/python-gpu/test_spark_with_gpu/test_data.py b/tests/python-gpu/test_gpu_spark/test_data.py similarity index 100% rename from tests/python-gpu/test_spark_with_gpu/test_data.py rename to tests/python-gpu/test_gpu_spark/test_data.py diff --git 
a/tests/python-gpu/test_spark_with_gpu/test_spark_with_gpu.py b/tests/python-gpu/test_gpu_spark/test_gpu_spark.py similarity index 98% rename from tests/python-gpu/test_spark_with_gpu/test_spark_with_gpu.py rename to tests/python-gpu/test_gpu_spark/test_gpu_spark.py index 14b32b8d3511..ef961ab34bd5 100644 --- a/tests/python-gpu/test_spark_with_gpu/test_spark_with_gpu.py +++ b/tests/python-gpu/test_gpu_spark/test_gpu_spark.py @@ -17,7 +17,7 @@ from pyspark.sql import SparkSession from xgboost.spark import SparkXGBClassifier, SparkXGBRegressor -gpu_discovery_script_path = "tests/python-gpu/test_spark_with_gpu/discover_gpu.sh" +gpu_discovery_script_path = "tests/python-gpu/test_gpu_spark/discover_gpu.sh" executor_gpu_amount = 4 executor_cores = 4 num_workers = executor_gpu_amount diff --git a/tests/python-gpu/test_gpu_with_dask.py b/tests/python-gpu/test_gpu_with_dask/test_gpu_with_dask.py similarity index 100% rename from tests/python-gpu/test_gpu_with_dask.py rename to tests/python-gpu/test_gpu_with_dask/test_gpu_with_dask.py From 6dbfd010424ea4131595a4090c395378849f5489 Mon Sep 17 00:00:00 2001 From: Bobby Wang Date: Tue, 2 Aug 2022 11:27:35 +0800 Subject: [PATCH 14/18] pylint --- tests/ci_build/lint_python.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/ci_build/lint_python.py b/tests/ci_build/lint_python.py index 8008c5774250..cfc4b85988e2 100644 --- a/tests/ci_build/lint_python.py +++ b/tests/ci_build/lint_python.py @@ -115,7 +115,7 @@ def print_summary_map(result_map: Dict[str, Dict[str, int]]) -> int: "python-package/xgboost/dask.py", "python-package/xgboost/spark", "tests/python/test_spark/test_data.py", - "tests/python-gpu/test_spark_with_gpu/test_data.py", + "tests/python-gpu/test_gpu_spark/test_data.py", "tests/ci_build/lint_python.py", ] ): @@ -130,9 +130,9 @@ def print_summary_map(result_map: Dict[str, Dict[str, int]]) -> int: "demo/guide-python/cat_in_the_dat.py", "tests/python/test_data_iterator.py", "tests/python/test_spark/test_data.py", - "tests/python-gpu/test_gpu_with_dask.py", + "tests/python-gpu/test_gpu_with_dask/test_gpu_with_dask.py", "tests/python-gpu/test_gpu_data_iterator.py", - "tests/python-gpu/test_spark_with_gpu/test_data.py", + "tests/python-gpu/test_gpu_spark/test_data.py", "tests/ci_build/lint_python.py", ] ): From b002cbc5b979b13d8c28597c79c672c208364d79 Mon Sep 17 00:00:00 2001 From: Bobby Wang Date: Tue, 2 Aug 2022 12:44:27 +0800 Subject: [PATCH 15/18] update cv parameters --- tests/python-gpu/test_gpu_spark/test_gpu_spark.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/python-gpu/test_gpu_spark/test_gpu_spark.py b/tests/python-gpu/test_gpu_spark/test_gpu_spark.py index ef961ab34bd5..ce5b9d8c8d42 100644 --- a/tests/python-gpu/test_gpu_spark/test_gpu_spark.py +++ b/tests/python-gpu/test_gpu_spark/test_gpu_spark.py @@ -177,7 +177,7 @@ def test_cv_sparkxgb_classifier_feature_cols_with_gpu(spark_iris_dataset_feature classifier = SparkXGBClassifier( features_col=feature_names, use_gpu=True, num_workers=num_workers ) - grid = ParamGridBuilder().addGrid(classifier.max_bin, [6, 8]).build() + grid = ParamGridBuilder().addGrid(classifier.max_depth, [6, 8]).build() evaluator = MulticlassClassificationEvaluator(metricName="f1") cv = CrossValidator( estimator=classifier, evaluator=evaluator, estimatorParamMaps=grid, numFolds=3 From e32a6382ee4e7f7a92beeac840d58de57de7f786 Mon Sep 17 00:00:00 2001 From: Bobby Wang Date: Tue, 2 Aug 2022 14:43:50 +0800 Subject: [PATCH 16/18] pylint issue --- 
python-package/xgboost/spark/data.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python-package/xgboost/spark/data.py b/python-package/xgboost/spark/data.py index 5e6a8c1468a7..c911bc44eb8b 100644 --- a/python-package/xgboost/spark/data.py +++ b/python-package/xgboost/spark/data.py @@ -76,7 +76,7 @@ def _fetch(self, data: Optional[Sequence[pd.DataFrame]]) -> Optional[pd.DataFram if self._device_id is not None: import cudf # pylint: disable=import-error - import cupy as cp + import cupy as cp # pylint: disable=import-error # We must set the device after import cudf, which will change the device id to 0 # See https://github.com/rapidsai/cudf/issues/11386 From 6880928fffcbf5cecdaad9d7197d4f192243cd5d Mon Sep 17 00:00:00 2001 From: Bobby Wang Date: Tue, 2 Aug 2022 15:06:53 +0800 Subject: [PATCH 17/18] format --- python-package/xgboost/spark/data.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python-package/xgboost/spark/data.py b/python-package/xgboost/spark/data.py index c911bc44eb8b..eb825df73827 100644 --- a/python-package/xgboost/spark/data.py +++ b/python-package/xgboost/spark/data.py @@ -76,7 +76,7 @@ def _fetch(self, data: Optional[Sequence[pd.DataFrame]]) -> Optional[pd.DataFram if self._device_id is not None: import cudf # pylint: disable=import-error - import cupy as cp # pylint: disable=import-error + import cupy as cp # pylint: disable=import-error # We must set the device after import cudf, which will change the device id to 0 # See https://github.com/rapidsai/cudf/issues/11386 From 832989f12da063c34fe2cc07eff1f085398270b1 Mon Sep 17 00:00:00 2001 From: Bobby Wang Date: Wed, 3 Aug 2022 16:56:01 +0800 Subject: [PATCH 18/18] resolve comments --- python-package/xgboost/spark/core.py | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/python-package/xgboost/spark/core.py b/python-package/xgboost/spark/core.py index 3579ef1fbb53..add414c8746c 100644 --- a/python-package/xgboost/spark/core.py +++ b/python-package/xgboost/spark/core.py @@ -527,7 +527,7 @@ def _fit(self, dataset): select_cols = [label_col] features_cols_names = None - if len(self.getOrDefault(self.features_cols)): + if self.getOrDefault(self.features_cols): features_cols_names = self.getOrDefault(self.features_cols) features_cols = _validate_and_convert_feature_col_as_float_col_list( dataset, features_cols_names @@ -735,11 +735,7 @@ def _get_feature_col(self, dataset) -> (list, Optional[list]): feature_col_names = self.getOrDefault(self.features_cols) features_col = [] - if ( - len(feature_col_names) - > 0 - >= len([c for c in feature_col_names if c not in dataset.columns]) - ): + if feature_col_names and set(feature_col_names).issubset(set(dataset.columns)): # The model is trained with features_cols and the predicted dataset # also contains all the columns specified by features_cols. features_col = _validate_and_convert_feature_col_as_float_col_list(
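
# A minimal usage sketch (not part of the patch series above), showing how the
# features_cols path introduced in these patches might be exercised from PySpark.
# The column names, the DataFrames `train_df`/`test_df`, and num_workers=4 are
# hypothetical; the list-valued features_col and use_gpu=True mirror the tests
# added in PATCH 02.
from xgboost.spark import SparkXGBClassifier

# Numeric (integral/float/double) columns present in train_df.
feature_names = ["sepal_length", "sepal_width", "petal_length", "petal_width"]

classifier = SparkXGBClassifier(
    features_col=feature_names,  # a list here is routed to the new features_cols param
    use_gpu=True,                # required: features_cols is only accepted when use_gpu is enabled
    num_workers=4,
)

model = classifier.fit(train_df)        # train_df holds the numeric feature columns plus "label"
predictions = model.transform(test_df)  # test_df may carry the same columns, or fall back to a
                                        # vector/array featuresCol as handled by _get_feature_col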