Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[PySpark] add gpu support for spark local mode #8068

Merged
merged 5 commits into from Jul 16, 2022
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
57 changes: 41 additions & 16 deletions python-package/xgboost/spark/core.py
Expand Up @@ -53,6 +53,8 @@
_get_rabit_args,
_get_args_from_message_list,
_get_spark_session,
_is_local,
_get_gpu_id,
)
from .params import (
HasArbitraryParamsDict,
Expand Down Expand Up @@ -281,19 +283,40 @@ def _validate_params(self):
.get("spark.task.resource.gpu.amount")
)

if not gpu_per_task or int(gpu_per_task) < 1:
raise RuntimeError(
"The spark cluster does not have the necessary GPU"
+ "configuration for the spark task. Therefore, we cannot"
+ "run xgboost training using GPU."
)
is_local = _is_local(
_get_spark_session()
.sparkContext
)

if int(gpu_per_task) > 1:
get_logger(self.__class__.__name__).warning(
"You configured %s GPU cores for each spark task, but in "
"XGBoost training, every Spark task will only use one GPU core.",
gpu_per_task
)
if is_local:
# checking spark local mode.
if gpu_per_task:
raise RuntimeError(
"The spark cluster does not support gpu configuration for local mode. "
+ "Please delete spark.executor.resource.gpu.amount and "
+ "spark.task.resource.gpu.amount"
)

if self.getOrDefault(self.num_workers) > 1:
raise ValueError(
"Training XGBoost on the spark local mode only supports num_workers = 1, "
+ "and only primary GPU device will be used."
)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can still support multiple GPU workers in spark local mode.
We can get partition id from spark TaskContext, and use the partition id as the gpu_id for the corresponding spark task. WDYT ?

Copy link
Contributor Author

@wbo4958 wbo4958 Jul 14, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thx. Hmm, there will be only 1 process for spark local mode, If we support the scenario of num_workers > 1, then all the GPU training tasks will run on the same process, which seems not to be supported due to the NCCL issue. eg,

Task1 taking GPU 0 runs on Process1
Task2 taking GPU 1 runs on Process1

@trivialfis is this supported by xgboost?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It should work but let's not invite trouble.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I agree. The GPU supporting for the local mode is mostly used in local debugging.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@wbo4958
A correction:

local mode pyspark all the GPU training tasks will run on the same process

This is not true.
For pyspark, in barrier mode pyspark task, each python UDF task will be run in an individual python process (in pyspark code, the process is called python worker).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh right, my bad. I previously thought it was the JVM side. Ok, let me add it support.

else:
# checking spark non-local mode.
if not gpu_per_task or int(gpu_per_task) < 1:
raise RuntimeError(
"The spark cluster does not have the necessary GPU"
+ "configuration for the spark task. Therefore, we cannot"
+ "run xgboost training using GPU."
)

if int(gpu_per_task) > 1:
get_logger(self.__class__.__name__).warning(
"You configured %s GPU cores for each spark task, but in "
"XGBoost training, every Spark task will only use one GPU core.",
gpu_per_task
)


def _validate_and_convert_feature_col_as_array_col(dataset, features_col_name):
Expand Down Expand Up @@ -547,6 +570,11 @@ def _fit(self, dataset):
booster_params["nthread"] = cpu_per_task
use_gpu = self.getOrDefault(self.use_gpu)

is_local = _is_local(
_get_spark_session()
.sparkContext
)

def _train_booster(pandas_df_iter):
"""
Takes in an RDD partition and outputs a booster for that partition after going through
Expand All @@ -558,10 +586,7 @@ def _train_booster(pandas_df_iter):
context.barrier()

if use_gpu:
# Set booster worker to use the first GPU allocated to the spark task.
booster_params["gpu_id"] = int(
context._resources["gpu"].addresses[0].strip()
)
booster_params["gpu_id"] = 0 if is_local else _get_gpu_id(context)

_rabit_args = ""
if context.partitionId() == 0:
Expand Down
22 changes: 22 additions & 0 deletions python-package/xgboost/spark/utils.py
Expand Up @@ -128,3 +128,25 @@ def _get_max_num_concurrent_tasks(spark_context):
spark_context._jsc.sc().resourceProfileManager().resourceProfileFromId(0)
)
return spark_context._jsc.sc().maxNumConcurrentTasks()


def _is_local(spark_context) -> bool:
"""Whether it is Spark local mode"""
# pylint: disable=protected-access
return spark_context._jsc.sc().isLocal()


def _get_gpu_id(task_context) -> int:
"""Get the gpu id from the task resources"""
if task_context is None:
# This is a safety check.
raise RuntimeError(
"_get_gpu_id should not be invoked from driver side."
)
resources = task_context.resources()
if 'gpu' not in resources:
raise RuntimeError(
"Couldn't get the gpu id, Please check the GPU resource configuration"
)
# return the first gpu id.
return int(resources['gpu'].addresses[0].strip())