From 6701cd3d222d3c3dd9ea49c66b07f214e2f413ec Mon Sep 17 00:00:00 2001
From: Weichen Xu
Date: Fri, 9 Sep 2022 22:23:54 +0800
Subject: [PATCH] add debug log

Signed-off-by: Weichen Xu
---
 python-package/xgboost/spark/core.py | 15 +++++++++++++++
 1 file changed, 15 insertions(+)

diff --git a/python-package/xgboost/spark/core.py b/python-package/xgboost/spark/core.py
index 302bd709ff45..02e81a89c314 100644
--- a/python-package/xgboost/spark/core.py
+++ b/python-package/xgboost/spark/core.py
@@ -543,6 +543,9 @@ def _query_plan_contains_valid_repartition(self, dataset):
         query_plan = dataset._sc._jvm.PythonSQLUtils.explainString(
             dataset._jdf.queryExecution(), "extended"
         )
+        get_logger(self.__class__.__name__).warning(
+            f"debug-repartition: \n{query_plan}\n"
+        )
         start = query_plan.index("== Optimized Logical Plan ==")
         start += len("== Optimized Logical Plan ==") + 1
         num_workers = self.getOrDefault(self.num_workers)
@@ -683,11 +686,23 @@ def _fit(self, dataset):
                 num_workers,
             )
 
+        def log_partition_rows(df, msg):
+
+            def count_partition_rows(part_iter):
+                # Yield one value per partition: the number of rows it holds.
+                yield len(list(part_iter))
+
+            result = df.rdd.mapPartitions(count_partition_rows).collect()
+            get_logger(self.__class__.__name__).warning(
+                f"debug-repartition: {msg}: {','.join(str(n) for n in result)}\n"
+            )
+
+        log_partition_rows(dataset, "before-repartition")
         if self._repartition_needed(dataset):
             # Repartition on `monotonically_increasing_id` column to avoid repartition
             # result unbalance. Directly using `.repartition(N)` might result in some
             # empty partitions.
             dataset = dataset.repartition(num_workers, monotonically_increasing_id())
+        log_partition_rows(dataset, "after-repartition")
         train_params = self._get_distributed_train_params(dataset)
         booster_params, train_call_kwargs_params = self._get_xgb_train_call_args(
             train_params
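
For context, a minimal standalone sketch of what the added debug logging measures: per-partition row counts computed with mapPartitions, before and after repartitioning on monotonically_increasing_id(). The local SparkSession setup, the toy spark.range dataset, and the partition_row_counts helper name are assumptions made for illustration; only the mapPartitions/repartition pattern comes from the patch itself.

    from pyspark.sql import SparkSession
    from pyspark.sql.functions import monotonically_increasing_id

    # Assumed local setup for the sketch; not part of the patch.
    spark = SparkSession.builder.master("local[4]").getOrCreate()
    df = spark.range(0, 1000)  # toy DataFrame with a single `id` column

    def partition_row_counts(df):
        # Count rows in each partition; only the per-partition counts reach the driver.
        return df.rdd.mapPartitions(lambda rows: [sum(1 for _ in rows)]).collect()

    print("before-repartition:", partition_row_counts(df))

    # Repartition on monotonically_increasing_id() to spread rows evenly across
    # partitions, mirroring what _fit() does when _repartition_needed() is true.
    df = df.repartition(4, monotonically_increasing_id())
    print("after-repartition:", partition_row_counts(df))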