diff --git a/jvm-packages/xgboost4j-spark-gpu/src/main/scala/ml/dmlc/xgboost4j/scala/rapids/spark/GpuPreXGBoost.scala b/jvm-packages/xgboost4j-spark-gpu/src/main/scala/ml/dmlc/xgboost4j/scala/rapids/spark/GpuPreXGBoost.scala index 08d186d6f84e..12f18be88ab3 100644 --- a/jvm-packages/xgboost4j-spark-gpu/src/main/scala/ml/dmlc/xgboost4j/scala/rapids/spark/GpuPreXGBoost.scala +++ b/jvm-packages/xgboost4j-spark-gpu/src/main/scala/ml/dmlc/xgboost4j/scala/rapids/spark/GpuPreXGBoost.scala @@ -27,7 +27,6 @@ import ml.dmlc.xgboost4j.scala.spark.params.XGBoostEstimatorCommon import ml.dmlc.xgboost4j.scala.spark.{PreXGBoost, PreXGBoostProvider, Watches, XGBoost, XGBoostClassificationModel, XGBoostClassifier, XGBoostExecutionParams, XGBoostRegressionModel, XGBoostRegressor} import org.apache.commons.logging.LogFactory -import org.apache.spark.broadcast.Broadcast import org.apache.spark.{SparkContext, TaskContext} import org.apache.spark.ml.{Estimator, Model} import org.apache.spark.rdd.RDD @@ -90,6 +89,11 @@ class GpuPreXGBoost extends PreXGBoostProvider { } } +class BoosterFlag extends Serializable { + // indicate if the GPU parameters are set. 
+  var isGpuParamsSet = false
+}
+
 object GpuPreXGBoost extends PreXGBoostProvider {
 
   private val logger = LogFactory.getLog("XGBoostSpark")
@@ -187,9 +191,9 @@ object GpuPreXGBoost extends PreXGBoostProvider {
 
         // predict and turn to Row
         val predictFunc =
-          (broadcastBooster: Broadcast[Booster], dm: DMatrix, originalRowItr: Iterator[Row]) => {
+          (booster: Booster, dm: DMatrix, originalRowItr: Iterator[Row]) => {
            val Array(rawPredictionItr, probabilityItr, predLeafItr, predContribItr) =
-              m.producePredictionItrs(broadcastBooster, dm)
+              m.producePredictionItrs(booster, dm)
            m.produceResultIterator(originalRowItr, rawPredictionItr, probabilityItr,
              predLeafItr, predContribItr)
          }
@@ -218,9 +222,9 @@ object GpuPreXGBoost extends PreXGBoostProvider {
 
         // predict and turn to Row
         val predictFunc =
-          (broadcastBooster: Broadcast[Booster], dm: DMatrix, originalRowItr: Iterator[Row]) => {
+          (booster: Booster, dm: DMatrix, originalRowItr: Iterator[Row]) => {
            val Array(rawPredictionItr, predLeafItr, predContribItr) =
-              m.producePredictionItrs(broadcastBooster, dm)
+              m.producePredictionItrs(booster, dm)
            m.produceResultIterator(originalRowItr, rawPredictionItr,
              predLeafItr, predContribItr)
          }
@@ -248,6 +252,7 @@ object GpuPreXGBoost extends PreXGBoostProvider {
     val bOrigSchema = sc.broadcast(dataset.schema)
     val bRowSchema = sc.broadcast(schema)
     val bBooster = sc.broadcast(booster)
+    val bBoosterFlag = sc.broadcast(new BoosterFlag)
 
     // Small vars so don't need to broadcast them
     val isLocal = sc.isLocal
@@ -259,6 +264,31 @@ object GpuPreXGBoost extends PreXGBoostProvider {
       // UnsafeProjection is not serializable so do it on the executor side
       val toUnsafe = UnsafeProjection.create(bOrigSchema.value)
 
+      // booster is visible for all spark tasks in the same executor
+      val booster = bBooster.value
+      val boosterFlag = bBoosterFlag.value
+
+      synchronized {
+        // there are two kinds of race conditions:
+        // 1. multiple tasks setting the parameters at the same time
+        // 2. one task setting a parameter while another task reads it
+        // Both of them can cause unexpected behavior; moreover,
+        // they may crash the executor.
+        // So add synchronized to allow only one task to set the parameters if they
+        // are not yet set, and rely on the BlockManager to ensure the same booster
+        // has its parameters set only once.
+        if (!boosterFlag.isGpuParamsSet) {
+          // set some params of gpu related to booster
+          // - gpu id
+          // - predictor: Force to gpu predictor since native doesn't save predictor.
+          val gpuId = if (!isLocal) XGBoost.getGPUAddrFromResources else 0
+          booster.setParam("gpu_id", gpuId.toString)
+          booster.setParam("predictor", "gpu_predictor")
+          logger.info("GPU transform on device: " + gpuId)
+          boosterFlag.isGpuParamsSet = true;
+        }
+      }
+
       // Iterator on Row
       new Iterator[Row] {
         // Convert InternalRow to Row
@@ -271,14 +301,6 @@ object GpuPreXGBoost extends PreXGBoostProvider {
         // Iterator on Row
         var iter: Iterator[Row] = null
 
-        // set some params of gpu related to booster
-        // - gpu id
-        // - predictor: Force to gpu predictor since native doesn't save predictor.
- val gpuId = if (!isLocal) XGBoost.getGPUAddrFromResources else 0 - bBooster.value.setParam("gpu_id", gpuId.toString) - bBooster.value.setParam("predictor", "gpu_predictor") - logger.info("GPU transform on device: " + gpuId) - TaskContext.get().addTaskCompletionListener[Unit](_ => { closeCurrentBatch() // close the last ColumnarBatch }) @@ -314,7 +336,7 @@ object GpuPreXGBoost extends PreXGBoostProvider { val rowIterator = currentBatch.rowIterator().asScala .map(toUnsafe) .map(converter(_)) - predictFunc(bBooster, dm, rowIterator) + predictFunc(booster, dm, rowIterator) } finally { dm.delete() diff --git a/jvm-packages/xgboost4j-spark/src/main/scala/ml/dmlc/xgboost4j/scala/spark/PreXGBoost.scala b/jvm-packages/xgboost4j-spark/src/main/scala/ml/dmlc/xgboost4j/scala/spark/PreXGBoost.scala index e0a365f6c844..818842608ab8 100644 --- a/jvm-packages/xgboost4j-spark/src/main/scala/ml/dmlc/xgboost4j/scala/spark/PreXGBoost.scala +++ b/jvm-packages/xgboost4j-spark/src/main/scala/ml/dmlc/xgboost4j/scala/spark/PreXGBoost.scala @@ -201,9 +201,9 @@ object PreXGBoost extends PreXGBoostProvider { val (xgbInput, featuresName) = m.vectorize(dataset) // predict and turn to Row val predictFunc = - (broadcastBooster: Broadcast[Booster], dm: DMatrix, originalRowItr: Iterator[Row]) => { + (booster: Booster, dm: DMatrix, originalRowItr: Iterator[Row]) => { val Array(rawPredictionItr, probabilityItr, predLeafItr, predContribItr) = - m.producePredictionItrs(broadcastBooster, dm) + m.producePredictionItrs(booster, dm) m.produceResultIterator(originalRowItr, rawPredictionItr, probabilityItr, predLeafItr, predContribItr) } @@ -231,9 +231,9 @@ object PreXGBoost extends PreXGBoostProvider { // predict and turn to Row val (xgbInput, featuresName) = m.vectorize(dataset) val predictFunc = - (broadcastBooster: Broadcast[Booster], dm: DMatrix, originalRowItr: Iterator[Row]) => { + (booster: Booster, dm: DMatrix, originalRowItr: Iterator[Row]) => { val Array(rawPredictionItr, predLeafItr, 
predContribItr) = - m.producePredictionItrs(broadcastBooster, dm) + m.producePredictionItrs(booster, dm) m.produceResultIterator(originalRowItr, rawPredictionItr, predLeafItr, predContribItr) } @@ -286,7 +286,7 @@ object PreXGBoost extends PreXGBoostProvider { cacheInfo) try { - predictFunc(bBooster, dm, batchRow.iterator) + predictFunc(bBooster.value, dm, batchRow.iterator) } finally { batchCnt += 1 dm.delete() diff --git a/jvm-packages/xgboost4j-spark/src/main/scala/ml/dmlc/xgboost4j/scala/spark/XGBoostClassifier.scala b/jvm-packages/xgboost4j-spark/src/main/scala/ml/dmlc/xgboost4j/scala/spark/XGBoostClassifier.scala index a10394837a68..32b2c2c0244b 100644 --- a/jvm-packages/xgboost4j-spark/src/main/scala/ml/dmlc/xgboost4j/scala/spark/XGBoostClassifier.scala +++ b/jvm-packages/xgboost4j-spark/src/main/scala/ml/dmlc/xgboost4j/scala/spark/XGBoostClassifier.scala @@ -20,7 +20,6 @@ import ml.dmlc.xgboost4j.scala.spark.params._ import ml.dmlc.xgboost4j.scala.{Booster, DMatrix, EvalTrait, ObjectiveTrait, XGBoost => SXGBoost} import org.apache.hadoop.fs.Path -import org.apache.spark.broadcast.Broadcast import org.apache.spark.ml.classification._ import org.apache.spark.ml.linalg._ import org.apache.spark.ml.util._ @@ -329,26 +328,26 @@ class XGBoostClassificationModel private[ml]( } } - private[scala] def producePredictionItrs(broadcastBooster: Broadcast[Booster], dm: DMatrix): + private[scala] def producePredictionItrs(booster: Booster, dm: DMatrix): Array[Iterator[Row]] = { val rawPredictionItr = { - broadcastBooster.value.predict(dm, outPutMargin = true, $(treeLimit)). + booster.predict(dm, outPutMargin = true, $(treeLimit)). map(Row(_)).iterator } val probabilityItr = { - broadcastBooster.value.predict(dm, outPutMargin = false, $(treeLimit)). + booster.predict(dm, outPutMargin = false, $(treeLimit)). 
map(Row(_)).iterator } val predLeafItr = { if (isDefined(leafPredictionCol)) { - broadcastBooster.value.predictLeaf(dm, $(treeLimit)).map(Row(_)).iterator + booster.predictLeaf(dm, $(treeLimit)).map(Row(_)).iterator } else { Iterator() } } val predContribItr = { if (isDefined(contribPredictionCol)) { - broadcastBooster.value.predictContrib(dm, $(treeLimit)).map(Row(_)).iterator + booster.predictContrib(dm, $(treeLimit)).map(Row(_)).iterator } else { Iterator() } diff --git a/jvm-packages/xgboost4j-spark/src/main/scala/ml/dmlc/xgboost4j/scala/spark/XGBoostRegressor.scala b/jvm-packages/xgboost4j-spark/src/main/scala/ml/dmlc/xgboost4j/scala/spark/XGBoostRegressor.scala index 82f8346b062a..01d001a5667c 100644 --- a/jvm-packages/xgboost4j-spark/src/main/scala/ml/dmlc/xgboost4j/scala/spark/XGBoostRegressor.scala +++ b/jvm-packages/xgboost4j-spark/src/main/scala/ml/dmlc/xgboost4j/scala/spark/XGBoostRegressor.scala @@ -30,7 +30,6 @@ import org.apache.spark.ml.param._ import org.apache.spark.sql._ import org.apache.spark.sql.functions._ -import org.apache.spark.broadcast.Broadcast import org.apache.spark.ml.util.{DefaultXGBoostParamsReader, DefaultXGBoostParamsWriter, XGBoostWriter} import org.apache.spark.sql.types.StructType @@ -298,14 +297,14 @@ class XGBoostRegressionModel private[ml] ( } } - private[scala] def producePredictionItrs(booster: Broadcast[Booster], dm: DMatrix): + private[scala] def producePredictionItrs(booster: Booster, dm: DMatrix): Array[Iterator[Row]] = { val originalPredictionItr = { - booster.value.predict(dm, outPutMargin = false, $(treeLimit)).map(Row(_)).iterator + booster.predict(dm, outPutMargin = false, $(treeLimit)).map(Row(_)).iterator } val predLeafItr = { if (isDefined(leafPredictionCol)) { - booster.value.predictLeaf(dm, $(treeLimit)). + booster.predictLeaf(dm, $(treeLimit)). 
map(Row(_)).iterator } else { Iterator() @@ -313,7 +312,7 @@ class XGBoostRegressionModel private[ml] ( } val predContribItr = { if (isDefined(contribPredictionCol)) { - booster.value.predictContrib(dm, $(treeLimit)). + booster.predictContrib(dm, $(treeLimit)). map(Row(_)).iterator } else { Iterator() diff --git a/plugin/federated/CMakeLists.txt b/plugin/federated/CMakeLists.txt index b84fbb7592a1..39eac924dd89 100644 --- a/plugin/federated/CMakeLists.txt +++ b/plugin/federated/CMakeLists.txt @@ -1,6 +1,6 @@ # gRPC needs to be installed first. See README.md. -find_package(Protobuf REQUIRED) -find_package(gRPC REQUIRED) +find_package(Protobuf CONFIG REQUIRED) +find_package(gRPC CONFIG REQUIRED) find_package(Threads) # Generated code from the protobuf definition. @@ -18,7 +18,8 @@ protobuf_generate( PLUGIN "protoc-gen-grpc=${grpc_cpp_plugin_location}") # Wrapper for the gRPC client. -add_library(federated_client INTERFACE federated_client.h) +add_library(federated_client INTERFACE) +target_sources(federated_client INTERFACE federated_client.h) target_link_libraries(federated_client INTERFACE federated_proto) # Rabit engine for Federated Learning. diff --git a/plugin/federated/README.md b/plugin/federated/README.md index b9574b977e60..5858d7cebf50 100644 --- a/plugin/federated/README.md +++ b/plugin/federated/README.md @@ -7,7 +7,7 @@ Install gRPC ------------ ```shell sudo apt-get install build-essential autoconf libtool pkg-config cmake ninja-build -git clone -b v1.45.2 https://github.com/grpc/grpc +git clone -b v1.47.0 https://github.com/grpc/grpc cd grpc git submodule update --init cmake -S . 
-B build -GNinja -DABSL_PROPAGATE_CXX_STD=ON diff --git a/plugin/federated/engine_federated.cc b/plugin/federated/engine_federated.cc index 8fd396d94744..9b43c3997cc3 100644 --- a/plugin/federated/engine_federated.cc +++ b/plugin/federated/engine_federated.cc @@ -84,17 +84,14 @@ class FederatedEngine : public IEngine { } } - int LoadCheckPoint(Serializable *global_model, Serializable *local_model = nullptr) override { + int LoadCheckPoint() override { return 0; } - void CheckPoint(const Serializable *global_model, - const Serializable *local_model = nullptr) override { + void CheckPoint() override { version_number_ += 1; } - void LazyCheckPoint(const Serializable *global_model) override { version_number_ += 1; } - int VersionNumber() const override { return version_number_; } /*! \brief get rank of current node */ diff --git a/python-package/xgboost/plotting.py b/python-package/xgboost/plotting.py index 85a8428bc181..559578a89140 100644 --- a/python-package/xgboost/plotting.py +++ b/python-package/xgboost/plotting.py @@ -150,8 +150,8 @@ def to_graphviz( **kwargs: Any ) -> GraphvizSource: """Convert specified tree to graphviz instance. IPython can automatically plot - the returned graphiz instance. Otherwise, you should call .render() method - of the returned graphiz instance. + the returned graphviz instance. Otherwise, you should call .render() method + of the returned graphviz instance. Parameters ---------- @@ -162,7 +162,7 @@ def to_graphviz( num_trees : int, default 0 Specify the ordinal number of target tree rankdir : str, default "UT" - Passed to graphiz via graph_attr + Passed to graphviz via graph_attr yes_color : str, default '#0000FF' Edge color when meets the node condition. 
no_color : str, default '#FF0000' @@ -257,7 +257,7 @@ def plot_tree( num_trees : int, default 0 Specify the ordinal number of target tree rankdir : str, default "TB" - Passed to graphiz via graph_attr + Passed to graphviz via graph_attr ax : matplotlib Axes, default None Target axes instance. If None, new figure and axes will be created. kwargs :