
[pyspark] fix empty data issue when constructing DMatrix #8245

Merged · 8 commits · Sep 20, 2022

Changes from 4 commits
6 changes: 6 additions & 0 deletions python-package/xgboost/spark/core.py
@@ -658,12 +658,17 @@ def _fit(self, dataset):
                col(self.getOrDefault(self.weightCol)).alias(alias.weight)
            )

        has_validation_col = False
        if self.isDefined(self.validationIndicatorCol) and self.getOrDefault(
            self.validationIndicatorCol
        ):
            select_cols.append(
                col(self.getOrDefault(self.validationIndicatorCol)).alias(alias.valid)
            )
            # In some cases (see https://issues.apache.org/jira/browse/SPARK-40407),
            # df.repartition can leave some reducer partitions without any data,
            # which causes an exception or a hang when constructing the DMatrix.
            has_validation_col = True

        if self.isDefined(self.base_margin_col) and self.getOrDefault(
            self.base_margin_col
@@ -765,6 +770,7 @@ def _train_booster(pandas_df_iter):
                gpu_id,
                dmatrix_kwargs,
                enable_sparse_data_optim=enable_sparse_data_optim,
                has_validation_col=has_validation_col,
            )
            if dvalid is not None:
                dval = [(dtrain, "training"), (dvalid, "validation")]
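To make the SPARK-40407 reference above concrete, here is a minimal standalone sketch (not part of the diff; the session setup and the printed counts are illustrative and depend on the Spark version) showing how a round-robin repartition can leave some reducer partitions empty:

```python
# Sketch: count rows per partition after repartition, mirroring the setup used
# in test_empty_partition below.
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[4]").getOrCreate()

# 100 rows spread over 50 narrow input partitions.
raw_df = spark.range(0, 100, 1, 50)

# Round-robin repartition (see SPARK-40407) can skew all rows onto a few
# partitions, leaving others with no data at all.
counts = raw_df.repartition(4).rdd.glom().map(len).collect()
print(counts)  # some entries may be 0, i.e. empty reducer partitions
```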
18 changes: 14 additions & 4 deletions python-package/xgboost/spark/data.py
@@ -147,12 +147,13 @@ def _read_csr_matrix_from_unwrapped_spark_vec(part: pd.DataFrame) -> csr_matrix:
    )


def create_dmatrix_from_partitions(
def create_dmatrix_from_partitions(  # pylint: disable=too-many-arguments
    iterator: Iterator[pd.DataFrame],
    feature_cols: Optional[Sequence[str]],
    gpu_id: Optional[int],
    kwargs: Dict[str, Any],  # use dict to make sure this parameter is passed.
    enable_sparse_data_optim: bool,
    has_validation_col: bool,
) -> Tuple[DMatrix, Optional[DMatrix]]:
    """Create DMatrix from spark data partitions. This is not particularly efficient as
    we need to convert the pandas series format to numpy then concatenate all the data.
@@ -173,7 +174,7 @@ def create_dmatrix_from_partitions(

    def append_m(part: pd.DataFrame, name: str, is_valid: bool) -> None:
        nonlocal n_features
        if name in part.columns:
        if name in part.columns and part[name].shape[0] > 0:
            array = part[name]
            if name == alias.data:
                array = stack_series(array)
@@ -224,6 +225,10 @@ def append_dqm(part: pd.DataFrame, name: str, is_valid: bool) -> None:
        train_data[name].append(array)

    def make(values: Dict[str, List[np.ndarray]], kwargs: Dict[str, Any]) -> DMatrix:
        if len(values) == 0:
            # Construct an empty DMatrix so that this worker still takes part
            # in the AllReduce.
            return DMatrix(data=np.empty((0, 0)), **kwargs)

        data = concat_or_none(values[alias.data])
        label = concat_or_none(values.get(alias.label, None))
        weight = concat_or_none(values.get(alias.weight, None))
@@ -247,9 +252,14 @@ def make(values: Dict[str, List[np.ndarray]], kwargs: Dict[str, Any]) -> DMatrix
        it = PartIter(train_data, gpu_id)
        dtrain = DeviceQuantileDMatrix(it, **kwargs)

    dvalid = make(valid_data, kwargs) if len(valid_data) != 0 else None
    # Use has_validation_col to decide whether a validation DMatrix is needed,
    # instead of inspecting the iterator, since the iterator may be empty in
    # some edge cases. Every worker must construct a DMatrix even when it holds
    # no data, because DMatrix construction performs an AllReduce; a worker
    # that skipped it would leave the others hanging forever.
    dvalid = make(valid_data, kwargs) if has_validation_col else None
Contributor

Good fix!

    assert dtrain.num_col() == n_features
    if dvalid is not None:
        assert dvalid.num_col() == dtrain.num_col()

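As a quick illustration of the empty-DMatrix path added in `make` above, a standalone sketch (assuming only numpy and the xgboost Python package):

```python
# Sketch: the DMatrix a data-less worker constructs. It carries zero rows and
# zero columns, but in distributed training constructing it still lets the
# worker join the AllReduce that DMatrix construction performs.
import numpy as np
from xgboost import DMatrix

empty = DMatrix(data=np.empty((0, 0)))
print(empty.num_row(), empty.num_col())  # 0 0
```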
4 changes: 2 additions & 2 deletions tests/python/test_spark/test_data.py
@@ -68,11 +68,11 @@ def run_dmatrix_ctor(is_dqm: bool) -> None:
    if is_dqm:
        cols = [f"feat-{i}" for i in range(n_features)]
        train_Xy, valid_Xy = create_dmatrix_from_partitions(
            iter(dfs), cols, 0, kwargs, False
            iter(dfs), cols, 0, kwargs, False, False
        )
    else:
        train_Xy, valid_Xy = create_dmatrix_from_partitions(
            iter(dfs), None, None, kwargs, False
            iter(dfs), None, None, kwargs, False, False
        )

    assert valid_Xy is not None
64 changes: 64 additions & 0 deletions tests/python/test_spark/test_spark_local.py
@@ -6,6 +6,8 @@
import numpy as np
import pytest
import testing as tm
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.functions import rand, when

if tm.no_spark()["condition"]:
    pytest.skip(msg=tm.no_spark()["reason"], allow_module_level=True)
@@ -1058,3 +1060,65 @@ def test_ranker(self):

        for row in pred_result:
            assert np.isclose(row.prediction, row.expected_prediction, rtol=1e-3)

    def test_empty_validation_data(self):
        df_train = self.session.createDataFrame(
            [
                (Vectors.dense(10.1, 11.2, 11.3), 0, False),
                (Vectors.dense(1, 1.2, 1.3), 1, False),
                (Vectors.dense(14.0, 15.0, 16.0), 0, False),
                (Vectors.dense(1.1, 1.2, 1.3), 1, True),
            ],
            ["features", "label", "val_col"],
        )
        classifier = SparkXGBClassifier(
            num_workers=2,
            min_child_weight=0.0,
            reg_alpha=0,
            reg_lambda=0,
            validation_indicator_col="val_col",
        )
        model = classifier.fit(df_train)
        pred_result = model.transform(df_train).collect()
        for row in pred_result:
            self.assertEqual(row.prediction, row.label)

    def test_empty_train_data(self):
        df_train = self.session.createDataFrame(
            [
                (Vectors.dense(10.1, 11.2, 11.3), 0, True),
                (Vectors.dense(1, 1.2, 1.3), 1, True),
                (Vectors.dense(14.0, 15.0, 16.0), 0, True),
                (Vectors.dense(1.1, 1.2, 1.3), 1, False),
            ],
            ["features", "label", "val_col"],
        )
        classifier = SparkXGBClassifier(
            num_workers=2,
            min_child_weight=0.0,
            reg_alpha=0,
            reg_lambda=0,
            validation_indicator_col="val_col",
        )
        model = classifier.fit(df_train)
        pred_result = model.transform(df_train).collect()
        for row in pred_result:
            self.assertEqual(row.prediction, 1.0)

    def test_empty_partition(self):
        # raw_df.repartition(4) results in severe data skew; in fact,
        # reducer partitions 1 and 2 receive no data at all.
        # See https://github.com/dmlc/xgboost/issues/8221
        raw_df = self.session.range(0, 100, 1, 50).withColumn(
            "label", when(rand(1) > 0.5, 1).otherwise(0)
        )
        vector_assembler = (
            VectorAssembler().setInputCols(["id"]).setOutputCol("features")
        )
        data_trans = vector_assembler.setHandleInvalid("keep").transform(raw_df)
        data_trans.show(100)

        classifier = SparkXGBClassifier(
            num_workers=4,
        )
        classifier.fit(data_trans)
2 changes: 1 addition & 1 deletion tests/python/test_spark/utils.py
@@ -102,7 +102,7 @@ class SparkTestCase(TestSparkContext, TestTempDir, unittest.TestCase):
    def setUpClass(cls):
        cls.setup_env(
            {
                "spark.master": "local[2]",
                "spark.master": "local[4]",
                "spark.python.worker.reuse": "false",
                "spark.driver.host": "127.0.0.1",
                "spark.task.maxFailures": "1",
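A note on the `local[2]` → `local[4]` change: the new test_empty_partition uses `num_workers=4`, and xgboost's pyspark training runs all worker tasks in a barrier stage, so the local master must offer at least four concurrent task slots. A hedged sketch of an equivalent session setup, using only the config values visible in the diff:

```python
# Sketch: why the test session needs four local cores. Barrier execution
# requires every task in the stage to be scheduled at once; with fewer slots
# than num_workers, Spark cannot even start the training stage.
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .master("local[4]")                        # at least num_workers slots
    .config("spark.task.maxFailures", "1")     # fail fast instead of retrying
    .config("spark.python.worker.reuse", "false")
    .getOrCreate()
)
```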