Commit d41893c

Merge branch 'master' of https://github.com/dmlc/xgboost into optimization_part_applysplit

ShvetsKS committed May 17, 2022
2 parents efb4f50 + 806c92c
Showing 27 changed files with 856 additions and 705 deletions.
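The change repeated across the Spark pipeline below is a single signature cut: the builder returned by buildDatasetToRDD no longer carries a Boolean telling the caller whether the DMatrix must be built inside the Rabit context. A minimal before/after sketch (the type aliases are illustrative, not part of the commit; XGBoostExecutionParams and Watches are the existing xgboost4j-spark types):

    import org.apache.spark.rdd.RDD

    // Before: the Boolean meant "build the DMatrix inside the Rabit context".
    type OldBuilder = XGBoostExecutionParams => (Boolean, RDD[() => Watches], Option[RDD[_]])
    // After: the flag is gone; the DMatrix is now always built inside Rabit.
    type NewBuilder = XGBoostExecutionParams => (RDD[() => Watches], Option[RDD[_]])
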
@@ -61,15 +61,14 @@ class GpuPreXGBoost extends PreXGBoostProvider {
* @param estimator [[XGBoostClassifier]] or [[XGBoostRegressor]]
* @param dataset the training data
* @param params all user defined and defaulted params
* @return [[XGBoostExecutionParams]] => (Boolean, RDD[[() => Watches]], Option[ RDD[_] ])
* Boolean if building DMatrix in rabit context
* @return [[XGBoostExecutionParams]] => (RDD[[() => Watches]], Option[ RDD[_] ])
* RDD[() => Watches] will be used as the training input
* Option[ RDD[_] ] is the optional cached RDD
*/
override def buildDatasetToRDD(estimator: Estimator[_],
dataset: Dataset[_],
params: Map[String, Any]):
XGBoostExecutionParams => (Boolean, RDD[() => Watches], Option[RDD[_]]) = {
XGBoostExecutionParams => (RDD[() => Watches], Option[RDD[_]]) = {
GpuPreXGBoost.buildDatasetToRDD(estimator, dataset, params)
}

@@ -123,16 +122,15 @@ object GpuPreXGBoost extends PreXGBoostProvider {
* @param estimator supports XGBoostClassifier and XGBoostRegressor
* @param dataset the training data
* @param params all user defined and defaulted params
* @return [[XGBoostExecutionParams]] => (Boolean, RDD[[() => Watches]], Option[ RDD[_] ])
* Boolean if building DMatrix in rabit context
* @return [[XGBoostExecutionParams]] => (RDD[[() => Watches]], Option[ RDD[_] ])
* RDD[() => Watches] will be used as the training input to build DMatrix
* Option[ RDD[_] ] is the optional cached RDD
*/
override def buildDatasetToRDD(
estimator: Estimator[_],
dataset: Dataset[_],
params: Map[String, Any]):
XGBoostExecutionParams => (Boolean, RDD[() => Watches], Option[RDD[_]]) = {
XGBoostExecutionParams => (RDD[() => Watches], Option[RDD[_]]) = {

val (Seq(labelName, weightName, marginName), feturesCols, groupName, evalSets) =
estimator match {
@@ -170,7 +168,7 @@ object GpuPreXGBoost extends PreXGBoostProvider {
xgbExecParams: XGBoostExecutionParams =>
val dataMap = prepareInputData(trainingData, evalDataMap, xgbExecParams.numWorkers,
xgbExecParams.cacheTrainingSet)
(true, buildRDDWatches(dataMap, xgbExecParams, evalDataMap.isEmpty), None)
(buildRDDWatches(dataMap, xgbExecParams, evalDataMap.isEmpty), None)
}

/**
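The GPU provider used to return true here to request DMatrix construction inside the Rabit context; with the flag gone it returns only the Watches RDD and the optional cached RDD. A caller sketch under the new shape, assuming estimator, dataset, params, and xgbExecParams are in scope:

    // Sketch: consuming the two-element result of the GPU builder.
    val buildTrainingData = GpuPreXGBoost.buildDatasetToRDD(estimator, dataset, params)
    val (trainingRDD, optionalCachedRDD) = buildTrainingData(xgbExecParams)
    // trainingRDD: RDD[() => Watches], one Watches factory per worker
    // optionalCachedRDD: Option[RDD[_]], present when the training set was cached
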
@@ -101,16 +101,15 @@ object PreXGBoost extends PreXGBoostProvider {
* @param estimator supports XGBoostClassifier and XGBoostRegressor
* @param dataset the training data
* @param params all user defined and defaulted params
* @return [[XGBoostExecutionParams]] => (Boolean, RDD[[() => Watches]], Option[ RDD[_] ])
* Boolean if building DMatrix in rabit context
* @return [[XGBoostExecutionParams]] => (RDD[[() => Watches]], Option[ RDD[_] ])
* RDD[() => Watches] will be used as the training input
* Option[RDD[_]\] is the optional cached RDD
*/
override def buildDatasetToRDD(
estimator: Estimator[_],
dataset: Dataset[_],
params: Map[String, Any]): XGBoostExecutionParams =>
(Boolean, RDD[() => Watches], Option[RDD[_]]) = {
(RDD[() => Watches], Option[RDD[_]]) = {

if (optionProvider.isDefined && optionProvider.get.providerEnabled(Some(dataset))) {
return optionProvider.get.buildDatasetToRDD(estimator, dataset, params)
@@ -172,12 +171,12 @@ object PreXGBoost extends PreXGBoostProvider {
val cachedRDD = if (xgbExecParams.cacheTrainingSet) {
Some(trainingData.persist(StorageLevel.MEMORY_AND_DISK))
} else None
(false, trainForRanking(trainingData, xgbExecParams, evalRDDMap), cachedRDD)
(trainForRanking(trainingData, xgbExecParams, evalRDDMap), cachedRDD)
case Right(trainingData) =>
val cachedRDD = if (xgbExecParams.cacheTrainingSet) {
Some(trainingData.persist(StorageLevel.MEMORY_AND_DISK))
} else None
(false, trainForNonRanking(trainingData, xgbExecParams, evalRDDMap), cachedRDD)
(trainForNonRanking(trainingData, xgbExecParams, evalRDDMap), cachedRDD)
}

}
@@ -324,20 +323,20 @@ object PreXGBoost extends PreXGBoostProvider {
trainingSet: RDD[XGBLabeledPoint],
evalRDDMap: Map[String, RDD[XGBLabeledPoint]] = Map(),
hasGroup: Boolean = false):
XGBoostExecutionParams => (Boolean, RDD[() => Watches], Option[RDD[_]]) = {
XGBoostExecutionParams => (RDD[() => Watches], Option[RDD[_]]) = {

xgbExecParams: XGBoostExecutionParams =>
composeInputData(trainingSet, hasGroup, xgbExecParams.numWorkers) match {
case Left(trainingData) =>
val cachedRDD = if (xgbExecParams.cacheTrainingSet) {
Some(trainingData.persist(StorageLevel.MEMORY_AND_DISK))
} else None
(false, trainForRanking(trainingData, xgbExecParams, evalRDDMap), cachedRDD)
(trainForRanking(trainingData, xgbExecParams, evalRDDMap), cachedRDD)
case Right(trainingData) =>
val cachedRDD = if (xgbExecParams.cacheTrainingSet) {
Some(trainingData.persist(StorageLevel.MEMORY_AND_DISK))
} else None
(false, trainForNonRanking(trainingData, xgbExecParams, evalRDDMap), cachedRDD)
(trainForNonRanking(trainingData, xgbExecParams, evalRDDMap), cachedRDD)
}
}

@@ -50,16 +50,15 @@ private[scala] trait PreXGBoostProvider {
* @param estimator supports XGBoostClassifier and XGBoostRegressor
* @param dataset the training data
* @param params all user defined and defaulted params
* @return [[XGBoostExecutionParams]] => (Boolean, RDD[[() => Watches]], Option[ RDD[_] ])
* Boolean if building DMatrix in rabit context
* @return [[XGBoostExecutionParams]] => (RDD[[() => Watches]], Option[ RDD[_] ])
* RDD[() => Watches] will be used as the training input to build DMatrix
* Option[ RDD[_] ] is the optional cached RDD
*/
def buildDatasetToRDD(
estimator: Estimator[_],
dataset: Dataset[_],
params: Map[String, Any]):
XGBoostExecutionParams => (Boolean, RDD[() => Watches], Option[RDD[_]])
XGBoostExecutionParams => (RDD[() => Watches], Option[RDD[_]])

/**
* Transform Dataset
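Custom providers must adopt the two-element contract as well. A hypothetical implementation sketch (the class name and the makeWatchesRDD helper are illustrative; the trait's transformDataset and transformSchema members are elided):

    // Hypothetical provider against the new buildDatasetToRDD contract.
    class MyProvider extends PreXGBoostProvider {
      override def providerEnabled(dataset: Option[Dataset[_]]): Boolean = true

      override def buildDatasetToRDD(
          estimator: Estimator[_],
          dataset: Dataset[_],
          params: Map[String, Any]):
          XGBoostExecutionParams => (RDD[() => Watches], Option[RDD[_]]) = {
        xgbExecParams: XGBoostExecutionParams =>
          // No Boolean flag any more: where the DMatrix gets built is now
          // decided by XGBoost.buildDistributedBooster, not by the provider.
          (makeWatchesRDD(dataset, xgbExecParams), None)
      }
      // transformDataset and transformSchema omitted for brevity.
    }
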
@@ -286,7 +286,6 @@ object XGBoost extends Serializable {
}

private def buildDistributedBooster(
buildDMatrixInRabit: Boolean,
buildWatches: () => Watches,
xgbExecutionParam: XGBoostExecutionParams,
rabitEnv: java.util.Map[String, String],
@@ -295,11 +294,6 @@
prevBooster: Booster): Iterator[(Booster, Map[String, Array[Float]])] = {

var watches: Watches = null
if (!buildDMatrixInRabit) {
// for CPU pipeline, we need to build DMatrix out of rabit context
watches = buildWatchesAndCheck(buildWatches)
}

val taskId = TaskContext.getPartitionId().toString
val attempt = TaskContext.get().attemptNumber.toString
rabitEnv.put("DMLC_TASK_ID", taskId)
@@ -310,10 +304,7 @@
try {
Rabit.init(rabitEnv)

if (buildDMatrixInRabit) {
// for GPU pipeline, we need to move dmatrix building into rabit context
watches = buildWatchesAndCheck(buildWatches)
}
watches = buildWatchesAndCheck(buildWatches)

val numEarlyStoppingRounds = xgbExecutionParam.earlyStoppingParams.numEarlyStoppingRounds
val metrics = Array.tabulate(watches.size)(_ => Array.ofDim[Float](numRounds))
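The CPU pipeline used to build its Watches before Rabit.init and the GPU pipeline inside it, selected by the removed flag; the two branches collapse into one unconditional call. A simplified sketch of the resulting flow in buildDistributedBooster (boosting loop, metrics, and error handling elided; the cleanup shown is a sketch, not copied from the commit):

    var watches: Watches = null
    try {
      Rabit.init(rabitEnv)
      // DMatrix construction now always happens inside the Rabit context,
      // for the CPU and GPU pipelines alike.
      watches = buildWatchesAndCheck(buildWatches)
      // ... run the boosting rounds against `watches` ...
    } finally {
      Rabit.shutdown()
      if (watches != null) watches.delete()
    }
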
@@ -377,7 +368,7 @@ object XGBoost extends Serializable {
@throws(classOf[XGBoostError])
private[spark] def trainDistributed(
sc: SparkContext,
buildTrainingData: XGBoostExecutionParams => (Boolean, RDD[() => Watches], Option[RDD[_]]),
buildTrainingData: XGBoostExecutionParams => (RDD[() => Watches], Option[RDD[_]]),
params: Map[String, Any]):
(Booster, Map[String, Array[Float]]) = {

Expand All @@ -396,7 +387,7 @@ object XGBoost extends Serializable {
}.orNull

// Get the training data RDD and the cachedRDD
val (buildDMatrixInRabit, trainingRDD, optionalCachedRDD) = buildTrainingData(xgbExecParams)
val (trainingRDD, optionalCachedRDD) = buildTrainingData(xgbExecParams)

try {
// Train for every ${savingRound} rounds and save the partially completed booster
@@ -413,9 +404,8 @@
optionWatches = Some(iter.next())
}

optionWatches.map { buildWatches => buildDistributedBooster(buildDMatrixInRabit,
buildWatches, xgbExecParams, rabitEnv, xgbExecParams.obj,
xgbExecParams.eval, prevBooster)}
optionWatches.map { buildWatches => buildDistributedBooster(buildWatches,
xgbExecParams, rabitEnv, xgbExecParams.obj, xgbExecParams.eval, prevBooster)}
.getOrElse(throw new RuntimeException("No Watches to train"))

}}
@@ -65,8 +65,6 @@ class FeatureSizeValidatingSuite extends FunSuite with PerTest {
(id, lp.label, lp.features)
}.toDF("id", "label", "features")
val xgb = new XGBoostClassifier(paramMap)
intercept[Exception] {
xgb.fit(repartitioned)
}
xgb.fit(repartitioned)
}
}
18 changes: 15 additions & 3 deletions python-package/xgboost/_typing.py
@@ -1,21 +1,32 @@
"""Shared typing definition."""
import ctypes
import os
from typing import Optional, Any, TypeVar, Union, Sequence
from typing import Any, TypeVar, Union, Type, Sequence, Callable, List, Dict

# os.PathLike/string/numpy.array/scipy.sparse/pd.DataFrame/dt.Frame/
# cudf.DataFrame/cupy.array/dlpack
import numpy as np

DataType = Any

# xgboost accepts some other possible types in practice due to historical reason, which is
# lesser tested. For now we encourage users to pass a simple list of string.
FeatureNames = Optional[Sequence[str]]
FeatureTypes = Optional[Sequence[str]]
FeatureInfo = Sequence[str]
FeatureNames = FeatureInfo
FeatureTypes = FeatureInfo
BoosterParam = Union[List, Dict] # better be sequence

ArrayLike = Any
PathLike = Union[str, os.PathLike]
CupyT = ArrayLike # maybe need a stub for cupy arrays
NumpyOrCupy = Any
NumpyDType = Union[str, Type[np.number]]
PandasDType = Any # real type is pandas.core.dtypes.base.ExtensionDtype

FloatCompatible = Union[float, np.float32, np.float64]

# callables
FPreProcCallable = Callable

# ctypes
# c_bst_ulong corresponds to bst_ulong defined in xgboost/c_api.h
@@ -59,3 +70,4 @@

# template parameter
_T = TypeVar("_T")
_F = TypeVar("_F", bound=Callable[..., Any])
38 changes: 20 additions & 18 deletions python-package/xgboost/callback.py
@@ -10,8 +10,7 @@
import collections
import os
import pickle
from typing import Callable, List, Optional, Union, Dict, Tuple, TypeVar, cast
from typing import Sequence
from typing import Callable, List, Optional, Union, Dict, Tuple, TypeVar, cast, Sequence, Any
import numpy

from . import rabit
@@ -24,11 +23,14 @@
"EarlyStopping",
"EvaluationMonitor",
"TrainingCheckPoint",
"CallbackContainer"
]

_Score = Union[float, Tuple[float, float]]
_ScoreList = Union[List[float], List[Tuple[float, float]]]

_Model = Any # real type is Union[Booster, CVPack]; need more work


# pylint: disable=unused-argument
class TrainingCallback(ABC):
@@ -43,19 +45,19 @@ class TrainingCallback(ABC):
def __init__(self) -> None:
pass

def before_training(self, model):
def before_training(self, model: _Model) -> _Model:
'''Run before training starts.'''
return model

def after_training(self, model):
def after_training(self, model: _Model) -> _Model:
'''Run after training is finished.'''
return model

def before_iteration(self, model, epoch: int, evals_log: EvalsLog) -> bool:
def before_iteration(self, model: _Model, epoch: int, evals_log: EvalsLog) -> bool:
'''Run before each iteration. Return True when training should stop.'''
return False

def after_iteration(self, model, epoch: int, evals_log: EvalsLog) -> bool:
def after_iteration(self, model: _Model, epoch: int, evals_log: EvalsLog) -> bool:
'''Run after each iteration. Return True when training should stop.'''
return False

@@ -140,7 +142,7 @@ def __init__(
if self.is_cv:
self.aggregated_cv = None

def before_training(self, model):
def before_training(self, model: _Model) -> _Model:
'''Function called before training.'''
for c in self.callbacks:
model = c.before_training(model=model)
@@ -151,7 +153,7 @@ def before_training(self, model):
assert isinstance(model, Booster), msg
return model

def after_training(self, model):
def after_training(self, model: _Model) -> _Model:
'''Function called after training.'''
for c in self.callbacks:
model = c.after_training(model=model)
@@ -182,7 +184,7 @@ def after_training(self, model):
return model

def before_iteration(
self, model, epoch: int, dtrain: DMatrix, evals: List[Tuple[DMatrix, str]]
self, model: _Model, epoch: int, dtrain: DMatrix, evals: Optional[List[Tuple[DMatrix, str]]]
) -> bool:
'''Function called before training iteration.'''
return any(c.before_iteration(model, epoch, self.history)
@@ -220,7 +222,7 @@ def _update_history(

def after_iteration(
self,
model,
model: _Model,
epoch: int,
dtrain: DMatrix,
evals: Optional[List[Tuple[DMatrix, str]]],
@@ -276,7 +278,7 @@ def __init__(
super().__init__()

def after_iteration(
self, model, epoch: int, evals_log: TrainingCallback.EvalsLog
self, model: _Model, epoch: int, evals_log: TrainingCallback.EvalsLog
) -> bool:
model.set_param("learning_rate", self.learning_rates(epoch))
return False
@@ -344,12 +346,12 @@ def __init__(
self.starting_round: int = 0
super().__init__()

def before_training(self, model):
def before_training(self, model: _Model) -> _Model:
self.starting_round = model.num_boosted_rounds()
return model

def _update_rounds(
self, score: _Score, name: str, metric: str, model, epoch: int
self, score: _Score, name: str, metric: str, model: _Model, epoch: int
) -> bool:
def get_s(x: _Score) -> float:
"""get score if it's cross validation history."""
@@ -403,7 +405,7 @@ def minimize(new: _Score, best: _Score) -> bool:
return True
return False

def after_iteration(self, model, epoch: int,
def after_iteration(self, model: _Model, epoch: int,
evals_log: TrainingCallback.EvalsLog) -> bool:
epoch += self.starting_round # training continuation
msg = 'Must have at least 1 validation dataset for early stopping.'
@@ -431,7 +433,7 @@ def after_iteration(self, model, epoch: int,
score = data_log[metric_name][-1]
return self._update_rounds(score, data_name, metric_name, model, epoch)

def after_training(self, model):
def after_training(self, model: _Model) -> _Model:
try:
if self.save_best:
model = model[: int(model.attr("best_iteration")) + 1]
@@ -477,7 +479,7 @@ def _fmt_metric(
msg = f"\t{data + '-' + metric}:{score:.5f}"
return msg

def after_iteration(self, model, epoch: int,
def after_iteration(self, model: _Model, epoch: int,
evals_log: TrainingCallback.EvalsLog) -> bool:
if not evals_log:
return False
@@ -503,7 +505,7 @@ def after_iteration(self, model, epoch: int,
self._latest = msg
return False

def after_training(self, model):
def after_training(self, model: _Model) -> _Model:
if rabit.get_rank() == self.printer_rank and self._latest is not None:
rabit.tracker_print(self._latest)
return model
@@ -544,7 +546,7 @@ def __init__(
self._epoch = 0
super().__init__()

def after_iteration(self, model, epoch: int,
def after_iteration(self, model: _Model, epoch: int,
evals_log: TrainingCallback.EvalsLog) -> bool:
if self._epoch == self._iterations:
path = os.path.join(self._path, self._name + '_' + str(epoch) +
