From 98541310569a8a54c1671faa76df4bce066b488d Mon Sep 17 00:00:00 2001
From: Weichen Xu
Date: Thu, 8 Sep 2022 08:26:11 +0800
Subject: [PATCH 1/7] update

Signed-off-by: Weichen Xu
---
 python-package/xgboost/spark/core.py | 18 +++++++-----------
 1 file changed, 7 insertions(+), 11 deletions(-)

diff --git a/python-package/xgboost/spark/core.py b/python-package/xgboost/spark/core.py
index edff40349676..302bd709ff45 100644
--- a/python-package/xgboost/spark/core.py
+++ b/python-package/xgboost/spark/core.py
@@ -20,7 +20,9 @@
     HasWeightCol,
 )
 from pyspark.ml.util import MLReadable, MLWritable
-from pyspark.sql.functions import col, countDistinct, pandas_udf, struct
+from pyspark.sql.functions import (
+    col, countDistinct, pandas_udf, struct, monotonically_increasing_id
+)
 from pyspark.sql.types import (
     ArrayType,
     DoubleType,
@@ -270,15 +272,6 @@ def _validate_params(self):
                 f"It cannot be less than 1 [Default is 1]"
             )
 
-        if (
-            self.getOrDefault(self.force_repartition)
-            and self.getOrDefault(self.num_workers) == 1
-        ):
-            get_logger(self.__class__.__name__).warning(
-                "You set force_repartition to true when there is no need for a repartition."
-                "Therefore, that parameter will be ignored."
-            )
-
         if self.getOrDefault(self.features_cols):
             if not self.getOrDefault(self.use_gpu):
                 raise ValueError("features_cols param requires enabling use_gpu.")
@@ -691,7 +684,10 @@ def _fit(self, dataset):
         )
 
         if self._repartition_needed(dataset):
-            dataset = dataset.repartition(num_workers)
+            # Repartition on the `monotonically_increasing_id` column to avoid an unbalanced
+            # repartition result. Directly using `.repartition(N)` might result in some
+            # empty partitions.
+            dataset = dataset.repartition(num_workers, monotonically_increasing_id())
         train_params = self._get_distributed_train_params(dataset)
         booster_params, train_call_kwargs_params = self._get_xgb_train_call_args(
             train_params

From 7bcdda3aaf63d13a7833e475fc0a3a0660a1878a Mon Sep 17 00:00:00 2001
From: Weichen Xu
Date: Fri, 9 Sep 2022 22:23:54 +0800
Subject: [PATCH 2/7] add debug log

Signed-off-by: Weichen Xu
---
 python-package/xgboost/spark/core.py | 15 +++++++++++++++
 1 file changed, 15 insertions(+)

diff --git a/python-package/xgboost/spark/core.py b/python-package/xgboost/spark/core.py
index 302bd709ff45..ffa61cc3556d 100644
--- a/python-package/xgboost/spark/core.py
+++ b/python-package/xgboost/spark/core.py
@@ -543,6 +543,9 @@ def _query_plan_contains_valid_repartition(self, dataset):
         query_plan = dataset._sc._jvm.PythonSQLUtils.explainString(
             dataset._jdf.queryExecution(), "extended"
         )
+        get_logger(self.__class__.__name__).warning(
+            f"debug-repartition: \n{query_plan}\n"
+        )
         start = query_plan.index("== Optimized Logical Plan ==")
         start += len("== Optimized Logical Plan ==") + 1
         num_workers = self.getOrDefault(self.num_workers)
@@ -683,11 +686,23 @@ def _fit(self, dataset):
             num_workers,
         )
 
+        def log_partition_rows(df, msg):
+
+            def count_partition_rows(iter):
+                yield len(list(iter))
+
+            result = df.rdd.mapPartitions(count_partition_rows).collect()
+            get_logger(self.__class__.__name__).warning(
+                f"debug-repartition: {msg}: {str(list(result))}\n"
+            )
+
+        log_partition_rows(dataset, "before-repartition")
         if self._repartition_needed(dataset):
             # Repartition on the `monotonically_increasing_id` column to avoid an unbalanced
             # repartition result. Directly using `.repartition(N)` might result in some
             # empty partitions.
             dataset = dataset.repartition(num_workers, monotonically_increasing_id())
+            log_partition_rows(dataset, "after-repartition")
         train_params = self._get_distributed_train_params(dataset)
         booster_params, train_call_kwargs_params = self._get_xgb_train_call_args(
             train_params

From ec05b261067723cc83d8e7e29d3713447b066d31 Mon Sep 17 00:00:00 2001
From: Weichen Xu
Date: Sat, 10 Sep 2022 21:24:04 +0800
Subject: [PATCH 3/7] update

Signed-off-by: Weichen Xu
---
 python-package/xgboost/spark/core.py | 18 +++++++++++-------
 1 file changed, 11 insertions(+), 7 deletions(-)

diff --git a/python-package/xgboost/spark/core.py b/python-package/xgboost/spark/core.py
index ffa61cc3556d..ef46fe2e72d9 100644
--- a/python-package/xgboost/spark/core.py
+++ b/python-package/xgboost/spark/core.py
@@ -20,9 +20,7 @@
     HasWeightCol,
 )
 from pyspark.ml.util import MLReadable, MLWritable
-from pyspark.sql.functions import (
-    col, countDistinct, pandas_udf, struct, monotonically_increasing_id
-)
+from pyspark.sql.functions import col, countDistinct, pandas_udf, struct, rand
 from pyspark.sql.types import (
     ArrayType,
     DoubleType,
@@ -687,7 +685,6 @@ def _fit(self, dataset):
         )
 
         def log_partition_rows(df, msg):
-
             def count_partition_rows(iter):
                 yield len(list(iter))
 
@@ -697,11 +694,18 @@ def count_partition_rows(iter):
             )
 
         log_partition_rows(dataset, "before-repartition")
-        if self._repartition_needed(dataset):
-            # Repartition on the `monotonically_increasing_id` column to avoid an unbalanced
+        if self._repartition_needed(dataset) or (
+            self.isDefined(self.validationIndicatorCol)
+            and self.getOrDefault(self.validationIndicatorCol)
+        ):
+            # If validationIndicatorCol is defined, we always repartition the dataset
+            # to balance the data, because the user might have unioned the train and
+            # validation datasets; without shuffling, some partitions might contain
+            # only train or only validation rows.
+            # Repartition on the `rand` column to avoid an unbalanced
             # repartition result. Directly using `.repartition(N)` might result in some
             # empty partitions.
-            dataset = dataset.repartition(num_workers, monotonically_increasing_id())
+            dataset = dataset.repartition(num_workers, rand(1))
             log_partition_rows(dataset, "after-repartition")
         train_params = self._get_distributed_train_params(dataset)
         booster_params, train_call_kwargs_params = self._get_xgb_train_call_args(
             train_params

From 8596b89106691ea4145072455b701376219870dd Mon Sep 17 00:00:00 2001
From: Weichen Xu
Date: Tue, 13 Sep 2022 10:47:40 +0800
Subject: [PATCH 4/7] remove debug logging

Signed-off-by: Weichen Xu
---
 python-package/xgboost/spark/core.py | 12 +-----------
 1 file changed, 1 insertion(+), 11 deletions(-)

diff --git a/python-package/xgboost/spark/core.py b/python-package/xgboost/spark/core.py
index ef46fe2e72d9..35c3f3da0558 100644
--- a/python-package/xgboost/spark/core.py
+++ b/python-package/xgboost/spark/core.py
@@ -684,29 +684,19 @@ def _fit(self, dataset):
             num_workers,
         )
 
-        def log_partition_rows(df, msg):
-            def count_partition_rows(iter):
-                yield len(list(iter))
-
-            result = df.rdd.mapPartitions(count_partition_rows).collect()
-            get_logger(self.__class__.__name__).warning(
-                f"debug-repartition: {msg}: {str(list(result))}\n"
-            )
-
-        log_partition_rows(dataset, "before-repartition")
         if self._repartition_needed(dataset) or (
             self.isDefined(self.validationIndicatorCol)
             and self.getOrDefault(self.validationIndicatorCol)
         ):
             # If validationIndicatorCol is defined, we always repartition the dataset
             # to balance the data, because the user might have unioned the train and
             # validation datasets; without shuffling, some partitions might contain
             # only train or only validation rows.
             # Repartition on the `rand` column to avoid an unbalanced
             # repartition result. Directly using `.repartition(N)` might result in some
             # empty partitions.
             dataset = dataset.repartition(num_workers, rand(1))
-            log_partition_rows(dataset, "after-repartition")
+
         train_params = self._get_distributed_train_params(dataset)
         booster_params, train_call_kwargs_params = self._get_xgb_train_call_args(
             train_params

From 5f89620b9ac77a9327c7c91b21aed646290d0dda Mon Sep 17 00:00:00 2001
From: Weichen Xu
Date: Wed, 14 Sep 2022 08:29:54 +0800
Subject: [PATCH 5/7] remove debug log

Signed-off-by: Weichen Xu
---
 python-package/xgboost/spark/core.py | 3 ---
 1 file changed, 3 deletions(-)

diff --git a/python-package/xgboost/spark/core.py b/python-package/xgboost/spark/core.py
index 35c3f3da0558..4121d7610bd7 100644
--- a/python-package/xgboost/spark/core.py
+++ b/python-package/xgboost/spark/core.py
@@ -541,9 +541,6 @@ def _query_plan_contains_valid_repartition(self, dataset):
         query_plan = dataset._sc._jvm.PythonSQLUtils.explainString(
             dataset._jdf.queryExecution(), "extended"
         )
-        get_logger(self.__class__.__name__).warning(
-            f"debug-repartition: \n{query_plan}\n"
-        )
         start = query_plan.index("== Optimized Logical Plan ==")
         start += len("== Optimized Logical Plan ==") + 1
         num_workers = self.getOrDefault(self.num_workers)

From 9410a898a231ba99fc4e3d38ce775737c1657204 Mon Sep 17 00:00:00 2001
From: Weichen Xu
Date: Thu, 15 Sep 2022 18:38:54 +0800
Subject: [PATCH 6/7] update

Signed-off-by: Weichen Xu
---
 python-package/xgboost/spark/core.py | 17 +++++++++++++----
 1 file changed, 13 insertions(+), 4 deletions(-)

diff --git a/python-package/xgboost/spark/core.py b/python-package/xgboost/spark/core.py
index 4121d7610bd7..6ec8dfb57aaa 100644
--- a/python-package/xgboost/spark/core.py
+++ b/python-package/xgboost/spark/core.py
@@ -164,6 +164,12 @@ class _SparkXGBParams(
         + "Note: The auto repartitioning judgement is not fully accurate, so it is recommended"
         + "to have force_repartition be True.",
     )
+    repartition_random_shuffle = Param(
+        Params._dummy(),
+        "repartition_random_shuffle",
+        "A boolean variable. Set repartition_random_shuffle=true if you want to randomly "
+        "shuffle the dataset when repartitioning is required. Default is True."
+    )
     feature_names = Param(
         Params._dummy(), "feature_names", "A list of str to specify feature names."
     )
@@ -461,6 +467,7 @@ def __init__(self):
             num_workers=1,
             use_gpu=False,
             force_repartition=False,
+            repartition_random_shuffle=True,
             feature_names=None,
             feature_types=None,
             arbitrary_params_dict={},
@@ -689,10 +696,12 @@ def _fit(self, dataset):
             # to balance the data, because the user might have unioned the train and
             # validation datasets; without shuffling, some partitions might contain
             # only train or only validation rows.
-            # Repartition on the `rand` column to avoid an unbalanced
-            # repartition result. Directly using `.repartition(N)` might result in some
-            # empty partitions.
-            dataset = dataset.repartition(num_workers, rand(1))
+            if self.getOrDefault(self.repartition_random_shuffle):
+                # In some cases, Spark round-robin repartitioning might cause data
+                # skew; a random shuffle can address it.
+                dataset = dataset.repartition(num_workers, rand(1))
+            else:
+                dataset = dataset.repartition(num_workers)
 
         train_params = self._get_distributed_train_params(dataset)
         booster_params, train_call_kwargs_params = self._get_xgb_train_call_args(

From ba45f0784d06dc93240b998c65b9e666aabeab53 Mon Sep 17 00:00:00 2001
From: Ubuntu
Date: Tue, 20 Sep 2022 08:28:50 +0000
Subject: [PATCH 7/7] fix lint

---
 python-package/xgboost/spark/core.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/python-package/xgboost/spark/core.py b/python-package/xgboost/spark/core.py
index 6ec8dfb57aaa..8f58fe0b6c84 100644
--- a/python-package/xgboost/spark/core.py
+++ b/python-package/xgboost/spark/core.py
@@ -20,7 +20,7 @@
     HasWeightCol,
 )
 from pyspark.ml.util import MLReadable, MLWritable
-from pyspark.sql.functions import col, countDistinct, pandas_udf, struct, rand
+from pyspark.sql.functions import col, countDistinct, pandas_udf, rand, struct
 from pyspark.sql.types import (
     ArrayType,
     DoubleType,
@@ -168,7 +168,7 @@ class _SparkXGBParams(
         Params._dummy(),
         "repartition_random_shuffle",
        "A boolean variable. Set repartition_random_shuffle=true if you want to randomly "
-        "shuffle the dataset when repartitioning is required. Default is True."
+        "shuffle the dataset when repartitioning is required. Default is True.",
     )
     feature_names = Param(
         Params._dummy(), "feature_names", "A list of str to specify feature names."
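
For context, a minimal standalone sketch of the behaviour the series addresses. It is not part of the patches: it assumes only a local Spark session, and the partition count, row count, and seed are arbitrary. It counts rows per partition the same way the temporary log_partition_rows helper from patches 2-4 does, and contrasts plain round-robin `.repartition(N)` with repartitioning on a random column.

from pyspark.sql import SparkSession
from pyspark.sql.functions import rand

spark = SparkSession.builder.master("local[4]").getOrCreate()


def rows_per_partition(df):
    # Count the rows that land in each partition, in the spirit of the
    # log_partition_rows debug helper from patches 2-4.
    return df.rdd.mapPartitions(lambda it: [sum(1 for _ in it)]).collect()


df = spark.range(1000)

# Round-robin repartition: depending on the upstream plan, some partitions
# may come out skewed or even empty.
print(rows_per_partition(df.repartition(4)))

# Repartitioning on a random column hash-partitions rows by rand(42) values,
# spreading them across partitions regardless of the input layout.
print(rows_per_partition(df.repartition(4, rand(42))))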
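
The scenario motivating the validationIndicatorCol condition in patch 3 can be sketched the same way; the frame sizes, the "isVal" column name, and the seed are illustrative assumptions.

from pyspark.sql import SparkSession
from pyspark.sql.functions import lit, rand

spark = SparkSession.builder.master("local[4]").getOrCreate()

# A user often builds the input by unioning separate train and validation frames.
train = spark.range(800).withColumn("isVal", lit(False))
valid = spark.range(200).withColumn("isVal", lit(True))
data = train.union(valid)

# union() only concatenates the two partition lists, so whole partitions hold
# only train rows or only validation rows. Repartitioning on a random column,
# as patch 3 does, mixes both kinds of rows into every partition.
data = data.repartition(4, rand(1))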
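
Finally, a hedged usage sketch for the repartition_random_shuffle parameter added in patch 6. It assumes the patched xgboost.spark package is installed and that a training DataFrame train_df with "features" and "label" columns already exists; the column names and worker count are illustrative, not prescribed by the series.

from xgboost.spark import SparkXGBClassifier

classifier = SparkXGBClassifier(
    features_col="features",
    label_col="label",
    num_workers=4,
    # Added in patch 6: when a repartition is needed, shuffle rows randomly
    # (repartition on rand(1)) instead of using a plain round-robin repartition.
    repartition_random_shuffle=True,
)
model = classifier.fit(train_df)  # train_df is an assumed, user-provided DataFrame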