[PySpark] add gpu support for spark local mode (#8068)
wbo4958 committed Jul 16, 2022
1 parent 91bb9e2 commit a33f35e
Showing 2 changed files with 64 additions and 14 deletions.
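
A note for orientation (illustrative, not part of the commit): after this change, use_gpu works on a plain local master with no GPU resource confs set. The sketch below assumes the xgboost.spark estimator API of this period (SparkXGBRegressor with use_gpu and num_workers parameters) plus a hypothetical train_df DataFrame.

from pyspark.sql import SparkSession
from xgboost.spark import SparkXGBRegressor  # assumed estimator name

# Local mode: leave spark.executor.resource.gpu.amount and
# spark.task.resource.gpu.amount unset, otherwise validation will raise.
spark = SparkSession.builder.master("local[2]").getOrCreate()

regressor = SparkXGBRegressor(
    features_col="features",  # hypothetical column names
    label_col="label",
    use_gpu=True,
    num_workers=2,  # the local node needs at least 2 GPUs
)
model = regressor.fit(train_df)  # train_df: your training DataFrame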
56 changes: 42 additions & 14 deletions python-package/xgboost/spark/core.py
@@ -53,6 +53,8 @@
     _get_rabit_args,
     _get_args_from_message_list,
     _get_spark_session,
+    _is_local,
+    _get_gpu_id,
 )
 from .params import (
     HasArbitraryParamsDict,
@@ -281,19 +283,42 @@ def _validate_params(self):
                 .get("spark.task.resource.gpu.amount")
             )
 
-            if not gpu_per_task or int(gpu_per_task) < 1:
-                raise RuntimeError(
-                    "The spark cluster does not have the necessary GPU"
-                    + "configuration for the spark task. Therefore, we cannot"
-                    + "run xgboost training using GPU."
-                )
+            is_local = _is_local(
+                _get_spark_session()
+                .sparkContext
+            )
 
-            if int(gpu_per_task) > 1:
-                get_logger(self.__class__.__name__).warning(
-                    "You configured %s GPU cores for each spark task, but in "
-                    "XGBoost training, every Spark task will only use one GPU core.",
-                    gpu_per_task
-                )
+            if is_local:
+                # Checking spark local mode.
+                if gpu_per_task:
+                    raise RuntimeError(
+                        "The spark cluster does not support gpu configuration for local mode. "
+                        "Please delete spark.executor.resource.gpu.amount and "
+                        "spark.task.resource.gpu.amount"
+                    )
+
+                # GPU training in Spark local mode is only meant for debugging, so it
+                # is fine to print the warning below instead of checking the real GPU
+                # count and raising an exception.
+                get_logger(self.__class__.__name__).warning(
+                    "You enabled use_gpu in spark local mode. Please make sure your local node "
+                    "has at least %d GPUs",
+                    self.getOrDefault(self.num_workers),
+                )
+            else:
+                # Checking spark non-local mode.
+                if not gpu_per_task or int(gpu_per_task) < 1:
+                    raise RuntimeError(
+                        "The spark cluster does not have the necessary GPU "
+                        "configuration for the spark task. Therefore, we cannot "
+                        "run xgboost training using GPU."
+                    )
+
+                if int(gpu_per_task) > 1:
+                    get_logger(self.__class__.__name__).warning(
+                        "You configured %s GPU cores for each spark task, but in "
+                        "XGBoost training, every Spark task will only use one GPU core.",
+                        gpu_per_task,
+                    )
 
 
 def _validate_and_convert_feature_col_as_array_col(dataset, features_col_name):
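
A usage note on the non-local branch above (illustrative, not part of the diff): the validation expects Spark's standard GPU scheduling confs, for example one GPU per executor and per task; the master URL here is a placeholder.

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .master("spark://master-host:7077")  # placeholder cluster master
    .config("spark.executor.resource.gpu.amount", "1")
    .config("spark.task.resource.gpu.amount", "1")
    .getOrCreate()
)

Setting spark.task.resource.gpu.amount above 1 still passes validation but only triggers the warning, since every XGBoost task uses a single GPU.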
@@ -547,6 +572,11 @@ def _fit(self, dataset):
             booster_params["nthread"] = cpu_per_task
             use_gpu = self.getOrDefault(self.use_gpu)
 
+            is_local = _is_local(
+                _get_spark_session()
+                .sparkContext
+            )
+
         def _train_booster(pandas_df_iter):
             """
             Takes in an RDD partition and outputs a booster for that partition after going through
@@ -558,10 +588,8 @@ def _train_booster(pandas_df_iter):
             context.barrier()
 
             if use_gpu:
-                # Set booster worker to use the first GPU allocated to the spark task.
-                booster_params["gpu_id"] = int(
-                    context._resources["gpu"].addresses[0].strip()
-                )
+                booster_params["gpu_id"] = context.partitionId() if is_local \
+                    else _get_gpu_id(context)
 
             _rabit_args = ""
             if context.partitionId() == 0:
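
To make the gpu_id choice above concrete: in local mode all tasks run in the driver JVM and no per-task GPU resource is scheduled, so the partition index spreads the workers over devices; in cluster mode the id comes from the GPU that Spark assigned to the task. A sketch of what each barrier task effectively does (BarrierTaskContext is the standard PySpark class; is_local and _get_gpu_id come from this diff):

from pyspark import BarrierTaskContext

def _resolve_gpu_id(is_local: bool) -> int:
    context = BarrierTaskContext.get()
    if is_local:
        # Local mode: partitions 0..num_workers-1 map onto GPUs 0..num_workers-1.
        return context.partitionId()
    # Cluster mode: the first GPU address scheduled for this task.
    return _get_gpu_id(context)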
22 changes: 22 additions & 0 deletions python-package/xgboost/spark/utils.py
@@ -128,3 +128,25 @@ def _get_max_num_concurrent_tasks(spark_context):
         spark_context._jsc.sc().resourceProfileManager().resourceProfileFromId(0)
     )
     return spark_context._jsc.sc().maxNumConcurrentTasks()
+
+
+def _is_local(spark_context) -> bool:
+    """Whether it is Spark local mode"""
+    # pylint: disable=protected-access
+    return spark_context._jsc.sc().isLocal()
+
+
+def _get_gpu_id(task_context) -> int:
+    """Get the gpu id from the task resources"""
+    if task_context is None:
+        # This is a safety check.
+        raise RuntimeError(
+            "_get_gpu_id should not be invoked from driver side."
+        )
+    resources = task_context.resources()
+    if 'gpu' not in resources:
+        raise RuntimeError(
+            "Couldn't get the gpu id. Please check the GPU resource configuration."
+        )
+    # Return the first gpu id.
+    return int(resources['gpu'].addresses[0].strip())
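
A minimal sketch of how the two helpers behave, assuming a machine with PySpark installed; the barrier-task part is shown as comments because _get_gpu_id only works executor-side with a scheduled "gpu" resource.

from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[2]").getOrCreate()
assert _is_local(spark.sparkContext)  # any local[...] master reports isLocal() == True

# Executor side, inside a barrier stage (would raise in local mode, where
# no "gpu" resource is scheduled):
#
#     from pyspark import BarrierTaskContext
#     gpu_id = _get_gpu_id(BarrierTaskContext.get())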
