Skip to content

Commit

Permalink
[pyspark] fix empty data issue when constructing DMatrix (#8245)
Browse files Browse the repository at this point in the history
Co-authored-by: Hyunsu Philip Cho <chohyu01@cs.washington.edu>
  • Loading branch information
wbo4958 and hcho3 committed Sep 20, 2022
1 parent 70df36c commit 520586f
Show file tree
Hide file tree
Showing 5 changed files with 86 additions and 7 deletions.
6 changes: 6 additions & 0 deletions python-package/xgboost/spark/core.py
Expand Up @@ -658,12 +658,17 @@ def _fit(self, dataset):
col(self.getOrDefault(self.weightCol)).alias(alias.weight)
)

has_validation_col = False
if self.isDefined(self.validationIndicatorCol) and self.getOrDefault(
self.validationIndicatorCol
):
select_cols.append(
col(self.getOrDefault(self.validationIndicatorCol)).alias(alias.valid)
)
# In some cases, see https://issues.apache.org/jira/browse/SPARK-40407,
# the df.repartition can result in some reducer partitions without data,
# which will cause exception or hanging issue when creating DMatrix.
has_validation_col = True

if self.isDefined(self.base_margin_col) and self.getOrDefault(
self.base_margin_col
Expand Down Expand Up @@ -765,6 +770,7 @@ def _train_booster(pandas_df_iter):
gpu_id,
dmatrix_kwargs,
enable_sparse_data_optim=enable_sparse_data_optim,
has_validation_col=has_validation_col,
)
if dvalid is not None:
dval = [(dtrain, "training"), (dvalid, "validation")]
Expand Down
18 changes: 14 additions & 4 deletions python-package/xgboost/spark/data.py
Expand Up @@ -147,12 +147,13 @@ def _read_csr_matrix_from_unwrapped_spark_vec(part: pd.DataFrame) -> csr_matrix:
)


def create_dmatrix_from_partitions(
def create_dmatrix_from_partitions( # pylint: disable=too-many-arguments
iterator: Iterator[pd.DataFrame],
feature_cols: Optional[Sequence[str]],
gpu_id: Optional[int],
kwargs: Dict[str, Any], # use dict to make sure this parameter is passed.
enable_sparse_data_optim: bool,
has_validation_col: bool,
) -> Tuple[DMatrix, Optional[DMatrix]]:
"""Create DMatrix from spark data partitions. This is not particularly efficient as
we need to convert the pandas series format to numpy then concatenate all the data.
Expand All @@ -173,7 +174,7 @@ def create_dmatrix_from_partitions(

def append_m(part: pd.DataFrame, name: str, is_valid: bool) -> None:
nonlocal n_features
if name in part.columns:
if name in part.columns and part[name].shape[0] > 0:
array = part[name]
if name == alias.data:
array = stack_series(array)
Expand Down Expand Up @@ -224,6 +225,10 @@ def append_dqm(part: pd.DataFrame, name: str, is_valid: bool) -> None:
train_data[name].append(array)

def make(values: Dict[str, List[np.ndarray]], kwargs: Dict[str, Any]) -> DMatrix:
if len(values) == 0:
# We must construct an empty DMatrix to bypass the AllReduce
return DMatrix(data=np.empty((0, 0)), **kwargs)

data = concat_or_none(values[alias.data])
label = concat_or_none(values.get(alias.label, None))
weight = concat_or_none(values.get(alias.weight, None))
Expand All @@ -247,9 +252,14 @@ def make(values: Dict[str, List[np.ndarray]], kwargs: Dict[str, Any]) -> DMatrix
it = PartIter(train_data, gpu_id)
dtrain = DeviceQuantileDMatrix(it, **kwargs)

dvalid = make(valid_data, kwargs) if len(valid_data) != 0 else None
# Using has_validation_col here to indicate if there is validation col
# instead of getting it from iterator, since the iterator may be empty
# in some special case. That is to say, we must ensure every worker
# construct DMatrix even there is no any data since we need to ensure every
# worker do the AllReduce when constructing DMatrix, or else it may hang
# forever.
dvalid = make(valid_data, kwargs) if has_validation_col else None

assert dtrain.num_col() == n_features
if dvalid is not None:
assert dvalid.num_col() == dtrain.num_col()

Expand Down
4 changes: 2 additions & 2 deletions tests/python/test_spark/test_data.py
Expand Up @@ -68,11 +68,11 @@ def run_dmatrix_ctor(is_dqm: bool) -> None:
if is_dqm:
cols = [f"feat-{i}" for i in range(n_features)]
train_Xy, valid_Xy = create_dmatrix_from_partitions(
iter(dfs), cols, 0, kwargs, False
iter(dfs), cols, 0, kwargs, False, True
)
else:
train_Xy, valid_Xy = create_dmatrix_from_partitions(
iter(dfs), None, None, kwargs, False
iter(dfs), None, None, kwargs, False, True
)

assert valid_Xy is not None
Expand Down
63 changes: 63 additions & 0 deletions tests/python/test_spark/test_spark_local.py
Expand Up @@ -17,6 +17,7 @@
BinaryClassificationEvaluator,
MulticlassClassificationEvaluator,
)
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.functions import vector_to_array
from pyspark.ml.linalg import Vectors
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
Expand Down Expand Up @@ -1058,3 +1059,65 @@ def test_ranker(self):

for row in pred_result:
assert np.isclose(row.prediction, row.expected_prediction, rtol=1e-3)

def test_empty_validation_data(self):
df_train = self.session.createDataFrame(
[
(Vectors.dense(10.1, 11.2, 11.3), 0, False),
(Vectors.dense(1, 1.2, 1.3), 1, False),
(Vectors.dense(14.0, 15.0, 16.0), 0, False),
(Vectors.dense(1.1, 1.2, 1.3), 1, True),
],
["features", "label", "val_col"],
)
classifier = SparkXGBClassifier(
num_workers=2,
min_child_weight=0.0,
reg_alpha=0,
reg_lambda=0,
validation_indicator_col="val_col",
)
model = classifier.fit(df_train)
pred_result = model.transform(df_train).collect()
for row in pred_result:
self.assertEqual(row.prediction, row.label)

def test_empty_train_data(self):
df_train = self.session.createDataFrame(
[
(Vectors.dense(10.1, 11.2, 11.3), 0, True),
(Vectors.dense(1, 1.2, 1.3), 1, True),
(Vectors.dense(14.0, 15.0, 16.0), 0, True),
(Vectors.dense(1.1, 1.2, 1.3), 1, False),
],
["features", "label", "val_col"],
)
classifier = SparkXGBClassifier(
num_workers=2,
min_child_weight=0.0,
reg_alpha=0,
reg_lambda=0,
validation_indicator_col="val_col",
)
model = classifier.fit(df_train)
pred_result = model.transform(df_train).collect()
for row in pred_result:
self.assertEqual(row.prediction, 1.0)

def test_empty_partition(self):
# raw_df.repartition(4) will result int severe data skew, actually,
# there is no any data in reducer partition 1, reducer partition 2
# see https://github.com/dmlc/xgboost/issues/8221
raw_df = self.session.range(0, 100, 1, 50).withColumn(
"label", spark_sql_func.when(spark_sql_func.rand(1) > 0.5, 1).otherwise(0)
)
vector_assembler = (
VectorAssembler().setInputCols(["id"]).setOutputCol("features")
)
data_trans = vector_assembler.setHandleInvalid("keep").transform(raw_df)
data_trans.show(100)

classifier = SparkXGBClassifier(
num_workers=4,
)
classifier.fit(data_trans)
2 changes: 1 addition & 1 deletion tests/python/test_spark/utils.py
Expand Up @@ -102,7 +102,7 @@ class SparkTestCase(TestSparkContext, TestTempDir, unittest.TestCase):
def setUpClass(cls):
cls.setup_env(
{
"spark.master": "local[2]",
"spark.master": "local[4]",
"spark.python.worker.reuse": "false",
"spark.driver.host": "127.0.0.1",
"spark.task.maxFailures": "1",
Expand Down

0 comments on commit 520586f

Please sign in to comment.