From 568337f67986f5a8dedf23510d45818cd0b5cdfd Mon Sep 17 00:00:00 2001
From: Bobby Wang
Date: Wed, 13 Jul 2022 17:41:17 +0800
Subject: [PATCH 1/4] [PySpark] add gpu support for spark local mode

---
 python-package/xgboost/spark/core.py  | 57 +++++++++++++++++++--------
 python-package/xgboost/spark/utils.py | 21 ++++++++++
 2 files changed, 62 insertions(+), 16 deletions(-)

diff --git a/python-package/xgboost/spark/core.py b/python-package/xgboost/spark/core.py
index 68a15a534f33..5bd43894ab53 100644
--- a/python-package/xgboost/spark/core.py
+++ b/python-package/xgboost/spark/core.py
@@ -53,6 +53,8 @@
     _get_rabit_args,
     _get_args_from_message_list,
     _get_spark_session,
+    _is_local,
+    _get_gpu_id,
 )
 from .params import (
     HasArbitraryParamsDict,
@@ -281,19 +283,40 @@ def _validate_params(self):
                 .get("spark.task.resource.gpu.amount")
             )
 
-            if not gpu_per_task or int(gpu_per_task) < 1:
-                raise RuntimeError(
-                    "The spark cluster does not have the necessary GPU"
-                    + "configuration for the spark task. Therefore, we cannot"
-                    + "run xgboost training using GPU."
-                )
+            is_local = _is_local(
+                _get_spark_session()
+                .sparkContext
+            )
 
-            if int(gpu_per_task) > 1:
-                get_logger(self.__class__.__name__).warning(
-                    "You configured %s GPU cores for each spark task, but in "
-                    "XGBoost training, every Spark task will only use one GPU core.",
-                    gpu_per_task
-                )
+            if is_local:
+                # checking spark local mode.
+                if gpu_per_task:
+                    raise RuntimeError(
+                        "The spark cluster does not support gpu configuration for local mode. "
+                        + "Please delete spark.executor.resource.gpu.amount and "
+                        + "spark.task.resource.gpu.amount"
+                    )
+
+                if self.getOrDefault(self.num_workers) > 1:
+                    raise ValueError(
+                        "Training XGBoost on the spark local mode only supports num_workers = 1, "
+                        + "and only primary GPU device will be used."
+                    )
+            else:
+                # checking spark non-local mode.
+                if not gpu_per_task or int(gpu_per_task) < 1:
+                    raise RuntimeError(
+                        "The spark cluster does not have the necessary GPU "
+                        + "configuration for the spark task. Therefore, we cannot "
+                        + "run xgboost training using GPU."
+                    )
+
+                if int(gpu_per_task) > 1:
+                    get_logger(self.__class__.__name__).warning(
+                        "You configured %s GPU cores for each spark task, but in "
+                        "XGBoost training, every Spark task will only use one GPU core.",
+                        gpu_per_task
+                    )
 
 
 def _validate_and_convert_feature_col_as_array_col(dataset, features_col_name):
@@ -547,6 +570,11 @@ def _fit(self, dataset):
             booster_params["nthread"] = cpu_per_task
         use_gpu = self.getOrDefault(self.use_gpu)
 
+        is_local = _is_local(
+            _get_spark_session()
+            .sparkContext
+        )
+
         def _train_booster(pandas_df_iter):
             """
             Takes in an RDD partition and outputs a booster for that partition after going through
@@ -558,10 +586,7 @@ def _train_booster(pandas_df_iter):
             context.barrier()
 
             if use_gpu:
-                # Set booster worker to use the first GPU allocated to the spark task.
- booster_params["gpu_id"] = int( - context._resources["gpu"].addresses[0].strip() - ) + booster_params["gpu_id"] = 0 if is_local else _get_gpu_id(context) _rabit_args = "" if context.partitionId() == 0: diff --git a/python-package/xgboost/spark/utils.py b/python-package/xgboost/spark/utils.py index b358e9be5db9..226badaf3dda 100644 --- a/python-package/xgboost/spark/utils.py +++ b/python-package/xgboost/spark/utils.py @@ -128,3 +128,24 @@ def _get_max_num_concurrent_tasks(spark_context): spark_context._jsc.sc().resourceProfileManager().resourceProfileFromId(0) ) return spark_context._jsc.sc().maxNumConcurrentTasks() + + +def _is_local(spark_context) -> bool: + """Whether it is Spark local mode""" + return spark_context._jsc.sc().isLocal() + + +def _get_gpu_id(task_context) -> int: + """Get the gpu id from the task resources""" + if task_context is None: + # This is a safety check. + raise RuntimeError( + "_get_gpu_id should not be invoked from driver side." + ) + resources = task_context.resources() + if 'gpu' not in resources: + raise RuntimeError( + "Couldn't get the gpu id, Please check the GPU resource configuration" + ) + # return the first gpu id. + return int(resources['gpu'].addresses[0].strip()) From ca2009ac87f17ff939848b959dae13dbb1457191 Mon Sep 17 00:00:00 2001 From: Bobby Wang Date: Thu, 14 Jul 2022 09:39:09 +0800 Subject: [PATCH 2/4] Fix pylint issue --- python-package/xgboost/spark/utils.py | 1 + 1 file changed, 1 insertion(+) diff --git a/python-package/xgboost/spark/utils.py b/python-package/xgboost/spark/utils.py index 226badaf3dda..2d2a33fdf849 100644 --- a/python-package/xgboost/spark/utils.py +++ b/python-package/xgboost/spark/utils.py @@ -132,6 +132,7 @@ def _get_max_num_concurrent_tasks(spark_context): def _is_local(spark_context) -> bool: """Whether it is Spark local mode""" + # pylint: disable=protected-access return spark_context._jsc.sc().isLocal() From 9c8a3b43c12f63e1826a6016f7df85d713b618a2 Mon Sep 17 00:00:00 2001 From: Bobby Wang Date: Fri, 15 Jul 2022 12:25:22 +0800 Subject: [PATCH 3/4] support multi gpus for spark local mode --- python-package/xgboost/spark/core.py | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/python-package/xgboost/spark/core.py b/python-package/xgboost/spark/core.py index 5bd43894ab53..9b385251877e 100644 --- a/python-package/xgboost/spark/core.py +++ b/python-package/xgboost/spark/core.py @@ -297,11 +297,13 @@ def _validate_params(self): + "spark.task.resource.gpu.amount" ) - if self.getOrDefault(self.num_workers) > 1: - raise ValueError( - "Training XGBoost on the spark local mode only supports num_workers = 1, " - + "and only primary GPU device will be used." - ) + # Supporting GPU training in Spark local mode is just for debugging purpose, + # so it's just okay for printing the below warning instead of checking the real + # gpu numbers and raising exceptions. + get_logger(self.__class__.__name__).warning( + "You enabled use_gpu in spark local mode. Please make sure your local node " + + "has %d GPUs" % self.getOrDefault(self.num_workers) + ) else: # checking spark non-local mode. 
From 9c8a3b43c12f63e1826a6016f7df85d713b618a2 Mon Sep 17 00:00:00 2001
From: Bobby Wang
Date: Fri, 15 Jul 2022 12:25:22 +0800
Subject: [PATCH 3/4] support multi gpus for spark local mode

---
 python-package/xgboost/spark/core.py | 14 ++++++------
 1 file changed, 8 insertions(+), 6 deletions(-)

diff --git a/python-package/xgboost/spark/core.py b/python-package/xgboost/spark/core.py
index 5bd43894ab53..9b385251877e 100644
--- a/python-package/xgboost/spark/core.py
+++ b/python-package/xgboost/spark/core.py
@@ -297,11 +297,13 @@ def _validate_params(self):
                         + "spark.task.resource.gpu.amount"
                     )
 
-                if self.getOrDefault(self.num_workers) > 1:
-                    raise ValueError(
-                        "Training XGBoost on the spark local mode only supports num_workers = 1, "
-                        + "and only primary GPU device will be used."
-                    )
+                # Supporting GPU training in Spark local mode is just for debugging purpose,
+                # so it's just okay for printing the below warning instead of checking the real
+                # gpu numbers and raising exceptions.
+                get_logger(self.__class__.__name__).warning(
+                    "You enabled use_gpu in spark local mode. Please make sure your local node "
+                    + "has %d GPUs" % self.getOrDefault(self.num_workers)
+                )
             else:
                 # checking spark non-local mode.
                 if not gpu_per_task or int(gpu_per_task) < 1:
@@ -586,7 +588,7 @@ def _train_booster(pandas_df_iter):
             context.barrier()
 
             if use_gpu:
-                booster_params["gpu_id"] = 0 if is_local else _get_gpu_id(context)
+                booster_params["gpu_id"] = context.partitionId() if is_local else _get_gpu_id(context)
 
             _rabit_args = ""
             if context.partitionId() == 0:

From 9abd4f185c31e1bb3b2d32a7fa691d1b51204e44 Mon Sep 17 00:00:00 2001
From: Bobby Wang
Date: Fri, 15 Jul 2022 16:58:40 +0800
Subject: [PATCH 4/4] resolve pylint issue

---
 python-package/xgboost/spark/core.py | 15 ++++++++-------
 1 file changed, 8 insertions(+), 7 deletions(-)

diff --git a/python-package/xgboost/spark/core.py b/python-package/xgboost/spark/core.py
index 9b385251877e..a2ab4df549aa 100644
--- a/python-package/xgboost/spark/core.py
+++ b/python-package/xgboost/spark/core.py
@@ -293,16 +293,16 @@ def _validate_params(self):
                 if gpu_per_task:
                     raise RuntimeError(
                         "The spark cluster does not support gpu configuration for local mode. "
-                        + "Please delete spark.executor.resource.gpu.amount and "
-                        + "spark.task.resource.gpu.amount"
+                        "Please delete spark.executor.resource.gpu.amount and "
+                        "spark.task.resource.gpu.amount"
                     )
 
-                # Supporting GPU training in Spark local mode is just for debugging purpose,
-                # so it's just okay for printing the below warning instead of checking the real
-                # gpu numbers and raising exceptions.
+                # Supporting GPU training in Spark local mode is only for debugging purposes,
+                # so it's okay to print the warning below instead of checking the real
+                # number of GPUs and raising an exception.
                 get_logger(self.__class__.__name__).warning(
                     "You enabled use_gpu in spark local mode. Please make sure your local node "
-                    + "has %d GPUs" % self.getOrDefault(self.num_workers)
+                    "has at least %d GPUs", self.getOrDefault(self.num_workers)
                 )
             else:
                 # checking spark non-local mode.
@@ -588,7 +588,8 @@ def _train_booster(pandas_df_iter):
             context.barrier()
 
             if use_gpu:
-                booster_params["gpu_id"] = context.partitionId() if is_local else _get_gpu_id(context)
+                booster_params["gpu_id"] = context.partitionId() if is_local \
+                    else _get_gpu_id(context)
 
             _rabit_args = ""
             if context.partitionId() == 0:
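Note on the final state of the series: a minimal local-mode sketch follows, assuming the SparkXGBRegressor estimator and its features_col/label_col parameters from this package's public API (use_gpu and num_workers appear in the diffs above). Per patch 1, the spark.*.resource.gpu.amount confs must not be set in local mode; per patch 3, task i trains on gpu_id == i, so the local node needs at least num_workers GPUs.

    from pyspark.sql import SparkSession
    from xgboost.spark import SparkXGBRegressor

    # Local mode with two task slots; no GPU resource confs are set.
    spark = SparkSession.builder.master("local[2]").getOrCreate()

    # Toy data; a plain array column is accepted as the features column
    # (see _validate_and_convert_feature_col_as_array_col above).
    df = spark.createDataFrame(
        [(1.0, [1.0, 2.0, 3.0]), (0.0, [3.0, 2.0, 1.0])],
        ["label", "features"],
    )

    regressor = SparkXGBRegressor(
        features_col="features",
        label_col="label",
        use_gpu=True,
        num_workers=2,  # task i uses gpu_id == i, so 2 GPUs are required
    )
    model = regressor.fit(df)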