
[pyspark] support a list of feature column names #8117

Merged · 20 commits · Aug 8, 2022
149 changes: 118 additions & 31 deletions python-package/xgboost/spark/core.py
@@ -2,7 +2,7 @@
"""Xgboost pyspark integration submodule for core code."""
# pylint: disable=fixme, too-many-ancestors, protected-access, no-member, invalid-name
# pylint: disable=too-few-public-methods
from typing import Iterator, Tuple
from typing import Iterator, Optional, Tuple

import numpy as np
import pandas as pd
@@ -26,6 +26,7 @@
    DoubleType,
    FloatType,
    IntegerType,
    IntegralType,
    LongType,
    ShortType,
)
@@ -43,7 +44,7 @@
    SparkXGBReader,
    SparkXGBWriter,
)
from .params import HasArbitraryParamsDict, HasBaseMarginCol
from .params import HasArbitraryParamsDict, HasBaseMarginCol, HasFeaturesCols
from .utils import (
    RabitContext,
    _get_args_from_message_list,
@@ -73,14 +74,10 @@
"num_workers",
"use_gpu",
"feature_names",
"features_cols",
]

_non_booster_params = [
    "missing",
    "n_estimators",
    "feature_types",
    "feature_weights",
]
_non_booster_params = ["missing", "n_estimators", "feature_types", "feature_weights"]

_pyspark_param_alias_map = {
    "features_col": "featuresCol",
@@ -126,6 +123,7 @@ class _SparkXGBParams(
    HasValidationIndicatorCol,
    HasArbitraryParamsDict,
    HasBaseMarginCol,
    HasFeaturesCols,
):
    num_workers = Param(
        Params._dummy(),
@@ -262,6 +260,13 @@ def _validate_params(self):
"Therefore, that parameter will be ignored."
)

if self.getOrDefault(self.features_cols) and not self.getOrDefault(
self.use_gpu
):
raise ValueError(
"XGBoost accepts a list of feature column names only when use_gpu is enabled."
)
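
For context, a minimal sketch of how this validation surfaces to users (SparkXGBClassifier and the snake_case parameters follow this module's public aliases; treat the exact call as illustrative):

    from xgboost.spark import SparkXGBClassifier

    # Accepted: a list of feature column names on the GPU path.
    clf = SparkXGBClassifier(features_cols=["f0", "f1", "f2"], use_gpu=True)

    # Raises the ValueError above once fit() runs _validate_params():
    # features_cols was set but use_gpu was not.
    clf_bad = SparkXGBClassifier(features_cols=["f0", "f1", "f2"])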

        if self.getOrDefault(self.use_gpu):
            tree_method = self.getParam("tree_method")
            if (
@@ -315,6 +320,23 @@ def _validate_params(self):
)


def _validate_and_convert_feature_col_as_float_col(
    dataset, features_col_name: list
) -> list:
    """Feature columns must be integral, float, or double types."""
    feature_cols = []
    for c in features_col_name:
        if isinstance(dataset.schema[c].dataType, DoubleType):
            feature_cols.append(col(c).cast(FloatType()).alias(c))
        elif isinstance(dataset.schema[c].dataType, (FloatType, IntegralType)):
            feature_cols.append(col(c))
        else:
            raise ValueError(
                "Feature column must be integral types or float/double types"
            )
    return feature_cols

[Review thread: the IntegralType check]
Contributor: What's the IntegralType type? Do you mean IntegerType?
Contributor (author): @trivialfis, could you correct me if xgboost supports IntegralType (Byte/Integer/Long/Short)?
Member: Yes, sort of. XGBoost will convert the data to float internally.
Member: BTW, XGBoost can convert it on the fly without creating a copy of the data.
Contributor (author): Thx @trivialfis. Yeah, if the type is short/byte, it will reduce the size of the shuffle write.
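
A quick sketch of what this helper produces (hypothetical column names, assuming an active SparkSession):

    df = spark.createDataFrame([(1.0, 2)], ["d", "i"])  # DoubleType, LongType
    cols = _validate_and_convert_feature_col_as_float_col(df, ["d", "i"])
    df.select(cols).printSchema()
    # "d" is cast from double to float; integral "i" passes through unchanged,
    # since XGBoost converts integral data to float on the fly without a copy.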


def _validate_and_convert_feature_col_as_array_col(dataset, features_col_name):
    features_col_datatype = dataset.schema[features_col_name].dataType
    features_col = col(features_col_name)
@@ -341,6 +363,22 @@ def _validate_and_convert_feature_col_as_array_col(dataset, features_col_name):
    return features_array_col


[Review thread: _validate_and_convert_feature_col]
Contributor: Is the function being used?
Contributor (author): Thx, removed.

def _validate_and_convert_feature_col(
    dataset, feature_names: list, feature_name: str
) -> list:
    """A model trained with the features_cols parameter can also transform
    datasets with a vector or array feature column. Check features_cols first,
    then fall back to featuresCol."""
    if (
        len(feature_names)
        > 0
        >= len([c for c in feature_names if c not in dataset.columns])
    ):
        return _validate_and_convert_feature_col_as_float_col(dataset, feature_names)

    return [_validate_and_convert_feature_col_as_array_col(dataset, feature_name)]
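
The chained comparison is terse; an equivalent, more readable form (a sketch only) shows it holds exactly when feature_names is non-empty and no named column is missing from the dataset:

    missing = [c for c in feature_names if c not in dataset.columns]
    use_float_cols = len(feature_names) > 0 and len(missing) == 0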


class _SparkXGBEstimator(Estimator, _SparkXGBParams, MLReadable, MLWritable):
    def __init__(self):
        super().__init__()
@@ -373,8 +411,14 @@ def setParams(self, **kwargs):  # pylint: disable=invalid-name
f"Please use param name {_inverse_pyspark_param_alias_map[k]} instead."
)
if k in _pyspark_param_alias_map:
real_k = _pyspark_param_alias_map[k]
k = real_k
if k == _inverse_pyspark_param_alias_map[
self.featuresCol.name
] and isinstance(v, list):
real_k = self.features_cols.name
k = real_k
else:
real_k = _pyspark_param_alias_map[k]
k = real_k

if self.hasParam(k):
self._set(**{str(k): v})
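
With this change the features_col alias dispatches on the value's type (a sketch; SparkXGBRegressor is this module's estimator and the mapping follows _pyspark_param_alias_map above):

    from xgboost.spark import SparkXGBRegressor

    # A string keeps the classic alias behavior: features_col -> featuresCol.
    SparkXGBRegressor(features_col="features")

    # A list is rerouted to the new features_cols param (GPU-only at fit time).
    SparkXGBRegressor(features_col=["f0", "f1"], use_gpu=True)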
@@ -497,10 +541,19 @@ def _fit(self, dataset):
        self._validate_params()
        label_col = col(self.getOrDefault(self.labelCol)).alias(alias.label)

        features_array_col = _validate_and_convert_feature_col_as_array_col(
            dataset, self.getOrDefault(self.featuresCol)
        )
        select_cols = [features_array_col, label_col]
        select_cols = [label_col]
        features_cols_names = None
        if len(self.getOrDefault(self.features_cols)):
            features_cols_names = self.getOrDefault(self.features_cols)
            features_cols = _validate_and_convert_feature_col_as_float_col(
                dataset, features_cols_names
            )
            select_cols.extend(features_cols)
        else:
            features_array_col = _validate_and_convert_feature_col_as_array_col(
                dataset, self.getOrDefault(self.featuresCol)
            )
            select_cols.append(features_array_col)

[Review thread: the "if len(...)" condition]
Member (suggested change): if len(self.getOrDefault(self.features_cols)): -> if self.getOrDefault(self.features_cols):
Contributor (author): Done, Thx.
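
The two branches hand the training UDF different input layouts (an illustrative comparison with three hypothetical feature columns):

    # features_cols path: one float/integral column per feature.
    per_column_input = dataset.select(label_col, col("f0"), col("f1"), col("f2"))
    # featuresCol path: a single flattened array column.
    array_input = dataset.select(label_col, features_array_col)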

        if self.isDefined(self.weightCol) and self.getOrDefault(self.weightCol):
            select_cols.append(
@@ -583,9 +636,7 @@ def _train_booster(pandas_df_iter):
            evals_result = {}
            with RabitContext(_rabit_args, context):
                dtrain, dvalid = create_dmatrix_from_partitions(
                    pandas_df_iter,
                    None,
                    dmatrix_kwargs,
                    pandas_df_iter, features_cols_names, dmatrix_kwargs
                )
                if dvalid is not None:
                    dval = [(dtrain, "training"), (dvalid, "validation")]

[Review thread: the create_dmatrix_from_partitions call]
Contributor: What if features_cols_names conflicts with the label / weight / base_margin column name?
Contributor (author): Thx, I tried, it worked, no exception happened.

@@ -685,6 +736,38 @@ def read(cls):
    def _transform(self, dataset):
        raise NotImplementedError()

    def _get_feature_col(self, dataset) -> Tuple[list, Optional[list]]:
        """A model trained with the features_cols parameter can also predict on
        datasets with a vector or array feature column. Check features_cols
        first, then featuresCol.
        """
        feature_col_names = self.getOrDefault(self.features_cols)
        features_col = []
        if (
            len(feature_col_names)
            > 0
            >= len([c for c in feature_col_names if c not in dataset.columns])
        ):
            # The model was trained with features_cols, and the dataset to
            # predict on also contains all the columns specified by features_cols.
            features_col = _validate_and_convert_feature_col_as_float_col(
                dataset, feature_col_names
            )
        else:
            # 1. The model was trained with features_cols, but the dataset doesn't
            #    contain all of those columns, so check whether the dataframe has
            #    the featuresCol instead.
            # 2. The model was trained with featuresCol, and the dataset to
            #    predict on must contain the featuresCol column.
            feature_col_names = None
            features_col.append(
                _validate_and_convert_feature_col_as_array_col(
                    dataset, self.getOrDefault(self.featuresCol)
                )
            )
        return features_col, feature_col_names

[Review thread: the chained comparison]
Member: This condition looks weird: len(feature_col_names) > 0 >= len(...). Is it correct?
Contributor (author): Yes, the pylint required this.
Member: all(c in dataset.columns for c in feature_col_names)
Contributor (author): Done, I switched to a set-based check.
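
In practice the fallback means a model trained with features_cols can still score a vector-assembled dataframe (a sketch with hypothetical dataframes):

    model = SparkXGBRegressor(features_cols=["f0", "f1"], use_gpu=True).fit(train_df)

    model.transform(df_with_f0_f1)     # both columns present: float-column path
    model.transform(df_with_features)  # columns missing: featuresCol array path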


class SparkXGBRegressorModel(_SparkXGBModel):
"""
@@ -712,11 +795,17 @@ def _transform(self, dataset):
                alias.margin
            )

        features_col, feature_col_names = self._get_feature_col(dataset)

        @pandas_udf("double")
        def predict_udf(iterator: Iterator[pd.DataFrame]) -> Iterator[pd.Series]:
            model = xgb_sklearn_model
            for data in iterator:
                X = stack_series(data[alias.data])
                if feature_col_names is not None:
                    X = data[feature_col_names]
                else:
                    X = stack_series(data[alias.data])

                if has_base_margin:
                    base_margin = data[alias.margin].to_numpy()
                else:
@@ -730,14 +819,10 @@ def predict_udf(iterator: Iterator[pd.DataFrame]) -> Iterator[pd.Series]:
                )
                yield pd.Series(preds)

        features_col = _validate_and_convert_feature_col_as_array_col(
            dataset, self.getOrDefault(self.featuresCol)
        )

        if has_base_margin:
            pred_col = predict_udf(struct(features_col, base_margin_col))
            pred_col = predict_udf(struct(*features_col, base_margin_col))
        else:
            pred_col = predict_udf(struct(features_col))
            pred_col = predict_udf(struct(*features_col))

        predictionColName = self.getOrDefault(self.predictionCol)
@@ -783,6 +868,8 @@ def transform_margin(margins: np.ndarray):
            class_probs = softmax(raw_preds, axis=1)
            return raw_preds, class_probs

        features_col, feature_col_names = self._get_feature_col(dataset)

        @pandas_udf(
            "rawPrediction array<double>, prediction double, probability array<double>"
        )
@@ -791,7 +878,11 @@ def predict_udf(
        ) -> Iterator[pd.DataFrame]:
            model = xgb_sklearn_model
            for data in iterator:
                X = stack_series(data[alias.data])
                if feature_col_names is not None:
                    X = data[feature_col_names]
                else:
                    X = stack_series(data[alias.data])

                if has_base_margin:
                    base_margin = stack_series(data[alias.margin])
                else:
@@ -817,14 +908,10 @@
                    }
                )

        features_col = _validate_and_convert_feature_col_as_array_col(
            dataset, self.getOrDefault(self.featuresCol)
        )

        if has_base_margin:
            pred_struct = predict_udf(struct(features_col, base_margin_col))
            pred_struct = predict_udf(struct(*features_col, base_margin_col))
        else:
            pred_struct = predict_udf(struct(features_col))
            pred_struct = predict_udf(struct(*features_col))

        pred_struct_col = "_prediction_struct"

19 changes: 19 additions & 0 deletions python-package/xgboost/spark/params.py
@@ -1,6 +1,7 @@
# type: ignore
"""Xgboost pyspark integration submodule for params."""
# pylint: disable=too-few-public-methods
from pyspark.ml.param import TypeConverters
from pyspark.ml.param.shared import Param, Params


@@ -31,3 +32,21 @@ class HasBaseMarginCol(Params):
"base_margin_col",
"This stores the name for the column of the base margin",
)


class HasFeaturesCols(Params):
    """
    Mixin for param features_cols: a list of feature column names.
    This parameter takes effect only when use_gpu is enabled.
    """

    features_cols = Param(
        Params._dummy(),
        "features_cols",
        "feature column names.",
        typeConverter=TypeConverters.toListString,
    )

    def __init__(self):
        super().__init__()
        self._setDefault(features_cols=[])
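
A small sketch of the default and the type converter in action (assuming the estimators in core.py mix this class in, as shown above):

    from xgboost.spark import SparkXGBRegressor

    est = SparkXGBRegressor()
    assert est.getOrDefault(est.features_cols) == []  # default set in __init__

    est.setParams(features_cols=["f0", "f1"])
    # TypeConverters.toListString normalizes the value to a list of strings.
    assert est.getOrDefault(est.features_cols) == ["f0", "f1"]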