Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/master' into batch-position-me…
Browse files Browse the repository at this point in the history
…mcpy
  • Loading branch information
hcho3 committed Jun 27, 2022
2 parents b4f2128 + f470ad3 commit 0e9ebd7
Show file tree
Hide file tree
Showing 8 changed files with 61 additions and 43 deletions.
Expand Up @@ -27,7 +27,6 @@ import ml.dmlc.xgboost4j.scala.spark.params.XGBoostEstimatorCommon
import ml.dmlc.xgboost4j.scala.spark.{PreXGBoost, PreXGBoostProvider, Watches, XGBoost, XGBoostClassificationModel, XGBoostClassifier, XGBoostExecutionParams, XGBoostRegressionModel, XGBoostRegressor}
import org.apache.commons.logging.LogFactory

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.{SparkContext, TaskContext}
import org.apache.spark.ml.{Estimator, Model}
import org.apache.spark.rdd.RDD
Expand Down Expand Up @@ -90,6 +89,11 @@ class GpuPreXGBoost extends PreXGBoostProvider {
}
}

/** Mutable, serializable flag paired with a broadcast [[Booster]].
 *
 *  Tracks whether the GPU-related booster parameters (`gpu_id`,
 *  `predictor`) have already been applied, so that only one Spark task
 *  on an executor performs that mutation (see the `synchronized` block
 *  in `GpuPreXGBoost`, which checks and sets this flag).
 *
 *  NOTE(review): correctness relies on all tasks in the same executor
 *  observing the same broadcast instance (via the BlockManager), as the
 *  surrounding comments state — confirm if the broadcast strategy changes.
 */
class BoosterFlag extends Serializable {
// indicate if the GPU parameters are set.
// true once gpu_id / predictor have been set on the shared booster.
var isGpuParamsSet = false
}

object GpuPreXGBoost extends PreXGBoostProvider {

private val logger = LogFactory.getLog("XGBoostSpark")
Expand Down Expand Up @@ -187,9 +191,9 @@ object GpuPreXGBoost extends PreXGBoostProvider {

// predict and turn to Row
val predictFunc =
(broadcastBooster: Broadcast[Booster], dm: DMatrix, originalRowItr: Iterator[Row]) => {
(booster: Booster, dm: DMatrix, originalRowItr: Iterator[Row]) => {
val Array(rawPredictionItr, probabilityItr, predLeafItr, predContribItr) =
m.producePredictionItrs(broadcastBooster, dm)
m.producePredictionItrs(booster, dm)
m.produceResultIterator(originalRowItr, rawPredictionItr, probabilityItr,
predLeafItr, predContribItr)
}
Expand Down Expand Up @@ -218,9 +222,9 @@ object GpuPreXGBoost extends PreXGBoostProvider {

// predict and turn to Row
val predictFunc =
(broadcastBooster: Broadcast[Booster], dm: DMatrix, originalRowItr: Iterator[Row]) => {
(booster: Booster, dm: DMatrix, originalRowItr: Iterator[Row]) => {
val Array(rawPredictionItr, predLeafItr, predContribItr) =
m.producePredictionItrs(broadcastBooster, dm)
m.producePredictionItrs(booster, dm)
m.produceResultIterator(originalRowItr, rawPredictionItr, predLeafItr,
predContribItr)
}
Expand Down Expand Up @@ -248,6 +252,7 @@ object GpuPreXGBoost extends PreXGBoostProvider {
val bOrigSchema = sc.broadcast(dataset.schema)
val bRowSchema = sc.broadcast(schema)
val bBooster = sc.broadcast(booster)
val bBoosterFlag = sc.broadcast(new BoosterFlag)

// Small vars so don't need to broadcast them
val isLocal = sc.isLocal
Expand All @@ -259,6 +264,31 @@ object GpuPreXGBoost extends PreXGBoostProvider {
// UnsafeProjection is not serializable so do it on the executor side
val toUnsafe = UnsafeProjection.create(bOrigSchema.value)

// booster is visible for all spark tasks in the same executor
val booster = bBooster.value
val boosterFlag = bBoosterFlag.value

synchronized {
// there are two kinds of race condition here:
// 1. multiple tasks setting the parameters at the same time
// 2. one task setting a parameter while another task reads it
// Either of them can cause unexpected behavior; moreover,
// it may crash the executor.
// So use synchronized to allow only one task to set the parameters if
// they are not yet set, and rely on the BlockManager to ensure the same
// booster instance has its parameters set only once.
if (!boosterFlag.isGpuParamsSet) {
// set some params of gpu related to booster
// - gpu id
// - predictor: Force to gpu predictor since native doesn't save predictor.
val gpuId = if (!isLocal) XGBoost.getGPUAddrFromResources else 0
booster.setParam("gpu_id", gpuId.toString)
booster.setParam("predictor", "gpu_predictor")
logger.info("GPU transform on device: " + gpuId)
boosterFlag.isGpuParamsSet = true;
}
}

// Iterator on Row
new Iterator[Row] {
// Convert InternalRow to Row
Expand All @@ -271,14 +301,6 @@ object GpuPreXGBoost extends PreXGBoostProvider {
// Iterator on Row
var iter: Iterator[Row] = null

// set some params of gpu related to booster
// - gpu id
// - predictor: Force to gpu predictor since native doesn't save predictor.
val gpuId = if (!isLocal) XGBoost.getGPUAddrFromResources else 0
bBooster.value.setParam("gpu_id", gpuId.toString)
bBooster.value.setParam("predictor", "gpu_predictor")
logger.info("GPU transform on device: " + gpuId)

TaskContext.get().addTaskCompletionListener[Unit](_ => {
closeCurrentBatch() // close the last ColumnarBatch
})
Expand Down Expand Up @@ -314,7 +336,7 @@ object GpuPreXGBoost extends PreXGBoostProvider {
val rowIterator = currentBatch.rowIterator().asScala
.map(toUnsafe)
.map(converter(_))
predictFunc(bBooster, dm, rowIterator)
predictFunc(booster, dm, rowIterator)

} finally {
dm.delete()
Expand Down
Expand Up @@ -201,9 +201,9 @@ object PreXGBoost extends PreXGBoostProvider {
val (xgbInput, featuresName) = m.vectorize(dataset)
// predict and turn to Row
val predictFunc =
(broadcastBooster: Broadcast[Booster], dm: DMatrix, originalRowItr: Iterator[Row]) => {
(booster: Booster, dm: DMatrix, originalRowItr: Iterator[Row]) => {
val Array(rawPredictionItr, probabilityItr, predLeafItr, predContribItr) =
m.producePredictionItrs(broadcastBooster, dm)
m.producePredictionItrs(booster, dm)
m.produceResultIterator(originalRowItr, rawPredictionItr, probabilityItr,
predLeafItr, predContribItr)
}
Expand Down Expand Up @@ -231,9 +231,9 @@ object PreXGBoost extends PreXGBoostProvider {
// predict and turn to Row
val (xgbInput, featuresName) = m.vectorize(dataset)
val predictFunc =
(broadcastBooster: Broadcast[Booster], dm: DMatrix, originalRowItr: Iterator[Row]) => {
(booster: Booster, dm: DMatrix, originalRowItr: Iterator[Row]) => {
val Array(rawPredictionItr, predLeafItr, predContribItr) =
m.producePredictionItrs(broadcastBooster, dm)
m.producePredictionItrs(booster, dm)
m.produceResultIterator(originalRowItr, rawPredictionItr, predLeafItr, predContribItr)
}

Expand Down Expand Up @@ -286,7 +286,7 @@ object PreXGBoost extends PreXGBoostProvider {
cacheInfo)

try {
predictFunc(bBooster, dm, batchRow.iterator)
predictFunc(bBooster.value, dm, batchRow.iterator)
} finally {
batchCnt += 1
dm.delete()
Expand Down
Expand Up @@ -20,7 +20,6 @@ import ml.dmlc.xgboost4j.scala.spark.params._
import ml.dmlc.xgboost4j.scala.{Booster, DMatrix, EvalTrait, ObjectiveTrait, XGBoost => SXGBoost}
import org.apache.hadoop.fs.Path

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.ml.classification._
import org.apache.spark.ml.linalg._
import org.apache.spark.ml.util._
Expand Down Expand Up @@ -329,26 +328,26 @@ class XGBoostClassificationModel private[ml](
}
}

private[scala] def producePredictionItrs(broadcastBooster: Broadcast[Booster], dm: DMatrix):
private[scala] def producePredictionItrs(booster: Booster, dm: DMatrix):
Array[Iterator[Row]] = {
val rawPredictionItr = {
broadcastBooster.value.predict(dm, outPutMargin = true, $(treeLimit)).
booster.predict(dm, outPutMargin = true, $(treeLimit)).
map(Row(_)).iterator
}
val probabilityItr = {
broadcastBooster.value.predict(dm, outPutMargin = false, $(treeLimit)).
booster.predict(dm, outPutMargin = false, $(treeLimit)).
map(Row(_)).iterator
}
val predLeafItr = {
if (isDefined(leafPredictionCol)) {
broadcastBooster.value.predictLeaf(dm, $(treeLimit)).map(Row(_)).iterator
booster.predictLeaf(dm, $(treeLimit)).map(Row(_)).iterator
} else {
Iterator()
}
}
val predContribItr = {
if (isDefined(contribPredictionCol)) {
broadcastBooster.value.predictContrib(dm, $(treeLimit)).map(Row(_)).iterator
booster.predictContrib(dm, $(treeLimit)).map(Row(_)).iterator
} else {
Iterator()
}
Expand Down
Expand Up @@ -30,7 +30,6 @@ import org.apache.spark.ml.param._
import org.apache.spark.sql._
import org.apache.spark.sql.functions._

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.ml.util.{DefaultXGBoostParamsReader, DefaultXGBoostParamsWriter, XGBoostWriter}
import org.apache.spark.sql.types.StructType

Expand Down Expand Up @@ -298,22 +297,22 @@ class XGBoostRegressionModel private[ml] (
}
}

private[scala] def producePredictionItrs(booster: Broadcast[Booster], dm: DMatrix):
private[scala] def producePredictionItrs(booster: Booster, dm: DMatrix):
Array[Iterator[Row]] = {
val originalPredictionItr = {
booster.value.predict(dm, outPutMargin = false, $(treeLimit)).map(Row(_)).iterator
booster.predict(dm, outPutMargin = false, $(treeLimit)).map(Row(_)).iterator
}
val predLeafItr = {
if (isDefined(leafPredictionCol)) {
booster.value.predictLeaf(dm, $(treeLimit)).
booster.predictLeaf(dm, $(treeLimit)).
map(Row(_)).iterator
} else {
Iterator()
}
}
val predContribItr = {
if (isDefined(contribPredictionCol)) {
booster.value.predictContrib(dm, $(treeLimit)).
booster.predictContrib(dm, $(treeLimit)).
map(Row(_)).iterator
} else {
Iterator()
Expand Down
7 changes: 4 additions & 3 deletions plugin/federated/CMakeLists.txt
@@ -1,6 +1,6 @@
# gRPC needs to be installed first. See README.md.
find_package(Protobuf REQUIRED)
find_package(gRPC REQUIRED)
find_package(Protobuf CONFIG REQUIRED)
find_package(gRPC CONFIG REQUIRED)
find_package(Threads)

# Generated code from the protobuf definition.
Expand All @@ -18,7 +18,8 @@ protobuf_generate(
PLUGIN "protoc-gen-grpc=${grpc_cpp_plugin_location}")

# Wrapper for the gRPC client.
add_library(federated_client INTERFACE federated_client.h)
add_library(federated_client INTERFACE)
target_sources(federated_client INTERFACE federated_client.h)
target_link_libraries(federated_client INTERFACE federated_proto)

# Rabit engine for Federated Learning.
Expand Down
2 changes: 1 addition & 1 deletion plugin/federated/README.md
Expand Up @@ -7,7 +7,7 @@ Install gRPC
------------
```shell
sudo apt-get install build-essential autoconf libtool pkg-config cmake ninja-build
git clone -b v1.45.2 https://github.com/grpc/grpc
git clone -b v1.47.0 https://github.com/grpc/grpc
cd grpc
git submodule update --init
cmake -S . -B build -GNinja -DABSL_PROPAGATE_CXX_STD=ON
Expand Down
7 changes: 2 additions & 5 deletions plugin/federated/engine_federated.cc
Expand Up @@ -84,17 +84,14 @@ class FederatedEngine : public IEngine {
}
}

int LoadCheckPoint(Serializable *global_model, Serializable *local_model = nullptr) override {
int LoadCheckPoint() override {
return 0;
}

void CheckPoint(const Serializable *global_model,
const Serializable *local_model = nullptr) override {
void CheckPoint() override {
version_number_ += 1;
}

void LazyCheckPoint(const Serializable *global_model) override { version_number_ += 1; }

int VersionNumber() const override { return version_number_; }

/*! \brief get rank of current node */
Expand Down
8 changes: 4 additions & 4 deletions python-package/xgboost/plotting.py
Expand Up @@ -150,8 +150,8 @@ def to_graphviz(
**kwargs: Any
) -> GraphvizSource:
"""Convert specified tree to graphviz instance. IPython can automatically plot
the returned graphiz instance. Otherwise, you should call .render() method
of the returned graphiz instance.
the returned graphviz instance. Otherwise, you should call .render() method
of the returned graphviz instance.
Parameters
----------
Expand All @@ -162,7 +162,7 @@ def to_graphviz(
num_trees : int, default 0
Specify the ordinal number of target tree
rankdir : str, default "UT"
Passed to graphiz via graph_attr
Passed to graphviz via graph_attr
yes_color : str, default '#0000FF'
Edge color when meets the node condition.
no_color : str, default '#FF0000'
Expand Down Expand Up @@ -257,7 +257,7 @@ def plot_tree(
num_trees : int, default 0
Specify the ordinal number of target tree
rankdir : str, default "TB"
Passed to graphiz via graph_attr
Passed to graphviz via graph_attr
ax : matplotlib Axes, default None
Target axes instance. If None, new figure and axes will be created.
kwargs :
Expand Down

0 comments on commit 0e9ebd7

Please sign in to comment.