add debug log
Signed-off-by: Weichen Xu <weichen.xu@databricks.com>
WeichenXu123 committed Sep 9, 2022
1 parent 9854131 commit 7bcdda3
Showing 1 changed file with 15 additions and 0 deletions.
python-package/xgboost/spark/core.py
@@ -543,6 +543,9 @@ def _query_plan_contains_valid_repartition(self, dataset):
         query_plan = dataset._sc._jvm.PythonSQLUtils.explainString(
             dataset._jdf.queryExecution(), "extended"
         )
+        get_logger(self.__class__.__name__).warning(
+            f"debug-repartition: \n{query_plan}\n"
+        )
         start = query_plan.index("== Optimized Logical Plan ==")
         start += len("== Optimized Logical Plan ==") + 1
         num_workers = self.getOrDefault(self.num_workers)
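For context, `_query_plan_contains_valid_repartition` pulls the extended explain output through the py4j bridge and then slices out the `== Optimized Logical Plan ==` section; the added warning dumps the full plan string before that parsing runs. Below is a minimal standalone sketch of the same plan-extraction call; the local SparkSession setup and toy DataFrame are illustrative and not part of the commit.

from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[2]").getOrCreate()
df = spark.range(100).repartition(4, "id")

# Same JVM helper the patched code calls: returns the full explain text,
# including the "== Optimized Logical Plan ==" section sliced out below.
plan = df._sc._jvm.PythonSQLUtils.explainString(
    df._jdf.queryExecution(), "extended"
)
start = plan.index("== Optimized Logical Plan ==")
print(plan[start:])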
@@ -683,11 +686,23 @@ def _fit(self, dataset):
             num_workers,
         )
 
+        def log_partition_rows(df, msg):
+
+            def count_partition_rows(row_iter):
+                yield len(list(row_iter))
+
+            result = df.rdd.mapPartitions(count_partition_rows).collect()
+            get_logger(self.__class__.__name__).warning(
+                f"debug-repartition: {msg}: {str(list(result))}\n"
+            )
+
+        log_partition_rows(dataset, "before-repartition")
         if self._repartition_needed(dataset):
             # Repartition on the `monotonically_increasing_id` column to avoid
             # unbalanced repartition results. Directly using `.repartition(N)`
             # might leave some partitions empty.
             dataset = dataset.repartition(num_workers, monotonically_increasing_id())
+        log_partition_rows(dataset, "after-repartition")
         train_params = self._get_distributed_train_params(dataset)
         booster_params, train_call_kwargs_params = self._get_xgb_train_call_args(
             train_params
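The `log_partition_rows` helper counts the rows in each partition with a single `mapPartitions` pass (each partition yields one count), and logging it before and after the repartition shows whether hashing on `monotonically_increasing_id()` actually evened out the data. Here is a self-contained sketch of the same counting-and-rebalancing idea, runnable outside XGBoost; the deliberately skewed toy DataFrame and the printed counts are assumptions for illustration.

from pyspark.sql import SparkSession
from pyspark.sql.functions import monotonically_increasing_id

spark = SparkSession.builder.master("local[4]").getOrCreate()
skewed = spark.range(1000).coalesce(1)  # all rows land in one partition

def partition_row_counts(df):
    # One element per partition, mirroring log_partition_rows above.
    return df.rdd.mapPartitions(lambda rows: [sum(1 for _ in rows)]).collect()

print("before:", partition_row_counts(skewed))   # e.g. [1000]
balanced = skewed.repartition(4, monotonically_increasing_id())
print("after:", partition_row_counts(balanced))  # e.g. four roughly even counts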
