Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[PySpark] add gpu support for spark local mode #8068

Merged
merged 5 commits into from Jul 16, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
56 changes: 42 additions & 14 deletions python-package/xgboost/spark/core.py
Expand Up @@ -53,6 +53,8 @@
_get_rabit_args,
_get_args_from_message_list,
_get_spark_session,
_is_local,
_get_gpu_id,
)
from .params import (
HasArbitraryParamsDict,
Expand Down Expand Up @@ -281,19 +283,42 @@ def _validate_params(self):
.get("spark.task.resource.gpu.amount")
)

if not gpu_per_task or int(gpu_per_task) < 1:
raise RuntimeError(
"The spark cluster does not have the necessary GPU"
+ "configuration for the spark task. Therefore, we cannot"
+ "run xgboost training using GPU."
)
is_local = _is_local(
_get_spark_session()
.sparkContext
)

if int(gpu_per_task) > 1:
if is_local:
# checking spark local mode.
if gpu_per_task:
raise RuntimeError(
"The spark cluster does not support gpu configuration for local mode. "
"Please delete spark.executor.resource.gpu.amount and "
"spark.task.resource.gpu.amount"
)

# Support GPU training in Spark local mode is just for debugging purposes,
# so it's okay for printing the below warning instead of checking the real
# gpu numbers and raising the exception.
get_logger(self.__class__.__name__).warning(
"You configured %s GPU cores for each spark task, but in "
"XGBoost training, every Spark task will only use one GPU core.",
gpu_per_task
"You enabled use_gpu in spark local mode. Please make sure your local node "
"has at least %d GPUs", self.getOrDefault(self.num_workers)
)
else:
# checking spark non-local mode.
if not gpu_per_task or int(gpu_per_task) < 1:
raise RuntimeError(
"The spark cluster does not have the necessary GPU"
+ "configuration for the spark task. Therefore, we cannot"
+ "run xgboost training using GPU."
)

if int(gpu_per_task) > 1:
get_logger(self.__class__.__name__).warning(
"You configured %s GPU cores for each spark task, but in "
"XGBoost training, every Spark task will only use one GPU core.",
gpu_per_task
)


def _validate_and_convert_feature_col_as_array_col(dataset, features_col_name):
Expand Down Expand Up @@ -547,6 +572,11 @@ def _fit(self, dataset):
booster_params["nthread"] = cpu_per_task
use_gpu = self.getOrDefault(self.use_gpu)

is_local = _is_local(
_get_spark_session()
.sparkContext
)

def _train_booster(pandas_df_iter):
"""
Takes in an RDD partition and outputs a booster for that partition after going through
Expand All @@ -558,10 +588,8 @@ def _train_booster(pandas_df_iter):
context.barrier()

if use_gpu:
# Set booster worker to use the first GPU allocated to the spark task.
booster_params["gpu_id"] = int(
context._resources["gpu"].addresses[0].strip()
)
booster_params["gpu_id"] = context.partitionId() if is_local \
else _get_gpu_id(context)

_rabit_args = ""
if context.partitionId() == 0:
Expand Down
22 changes: 22 additions & 0 deletions python-package/xgboost/spark/utils.py
Expand Up @@ -128,3 +128,25 @@ def _get_max_num_concurrent_tasks(spark_context):
spark_context._jsc.sc().resourceProfileManager().resourceProfileFromId(0)
)
return spark_context._jsc.sc().maxNumConcurrentTasks()


def _is_local(spark_context) -> bool:
"""Whether it is Spark local mode"""
# pylint: disable=protected-access
return spark_context._jsc.sc().isLocal()


def _get_gpu_id(task_context) -> int:
"""Get the gpu id from the task resources"""
if task_context is None:
# This is a safety check.
raise RuntimeError(
"_get_gpu_id should not be invoked from driver side."
)
resources = task_context.resources()
if 'gpu' not in resources:
raise RuntimeError(
"Couldn't get the gpu id, Please check the GPU resource configuration"
)
# return the first gpu id.
return int(resources['gpu'].addresses[0].strip())