diff --git a/python-package/xgboost/spark/core.py b/python-package/xgboost/spark/core.py index f8697733c935..c38fcbffd48a 100644 --- a/python-package/xgboost/spark/core.py +++ b/python-package/xgboost/spark/core.py @@ -53,6 +53,8 @@ _get_rabit_args, _get_args_from_message_list, _get_spark_session, + _is_local, + _get_gpu_id, ) from .params import ( HasArbitraryParamsDict, @@ -281,19 +283,42 @@ def _validate_params(self): .get("spark.task.resource.gpu.amount") ) - if not gpu_per_task or int(gpu_per_task) < 1: - raise RuntimeError( - "The spark cluster does not have the necessary GPU" - + "configuration for the spark task. Therefore, we cannot" - + "run xgboost training using GPU." - ) + is_local = _is_local( + _get_spark_session() + .sparkContext + ) - if int(gpu_per_task) > 1: + if is_local: + # checking spark local mode. + if gpu_per_task: + raise RuntimeError( + "The spark cluster does not support gpu configuration for local mode. " + "Please delete spark.executor.resource.gpu.amount and " + "spark.task.resource.gpu.amount" + ) + + # Support GPU training in Spark local mode is just for debugging purposes, + # so it's okay for printing the below warning instead of checking the real + # gpu numbers and raising the exception. get_logger(self.__class__.__name__).warning( - "You configured %s GPU cores for each spark task, but in " - "XGBoost training, every Spark task will only use one GPU core.", - gpu_per_task + "You enabled use_gpu in spark local mode. Please make sure your local node " + "has at least %d GPUs", self.getOrDefault(self.num_workers) ) + else: + # checking spark non-local mode. + if not gpu_per_task or int(gpu_per_task) < 1: + raise RuntimeError( + "The spark cluster does not have the necessary GPU" + + "configuration for the spark task. Therefore, we cannot" + + "run xgboost training using GPU." + ) + + if int(gpu_per_task) > 1: + get_logger(self.__class__.__name__).warning( + "You configured %s GPU cores for each spark task, but in " + "XGBoost training, every Spark task will only use one GPU core.", + gpu_per_task + ) def _validate_and_convert_feature_col_as_array_col(dataset, features_col_name): @@ -547,6 +572,11 @@ def _fit(self, dataset): booster_params["nthread"] = cpu_per_task use_gpu = self.getOrDefault(self.use_gpu) + is_local = _is_local( + _get_spark_session() + .sparkContext + ) + def _train_booster(pandas_df_iter): """ Takes in an RDD partition and outputs a booster for that partition after going through @@ -558,10 +588,8 @@ def _train_booster(pandas_df_iter): context.barrier() if use_gpu: - # Set booster worker to use the first GPU allocated to the spark task. - booster_params["gpu_id"] = int( - context._resources["gpu"].addresses[0].strip() - ) + booster_params["gpu_id"] = context.partitionId() if is_local \ + else _get_gpu_id(context) _rabit_args = "" if context.partitionId() == 0: diff --git a/python-package/xgboost/spark/utils.py b/python-package/xgboost/spark/utils.py index b358e9be5db9..2d2a33fdf849 100644 --- a/python-package/xgboost/spark/utils.py +++ b/python-package/xgboost/spark/utils.py @@ -128,3 +128,25 @@ def _get_max_num_concurrent_tasks(spark_context): spark_context._jsc.sc().resourceProfileManager().resourceProfileFromId(0) ) return spark_context._jsc.sc().maxNumConcurrentTasks() + + +def _is_local(spark_context) -> bool: + """Whether it is Spark local mode""" + # pylint: disable=protected-access + return spark_context._jsc.sc().isLocal() + + +def _get_gpu_id(task_context) -> int: + """Get the gpu id from the task resources""" + if task_context is None: + # This is a safety check. + raise RuntimeError( + "_get_gpu_id should not be invoked from driver side." + ) + resources = task_context.resources() + if 'gpu' not in resources: + raise RuntimeError( + "Couldn't get the gpu id, Please check the GPU resource configuration" + ) + # return the first gpu id. + return int(resources['gpu'].addresses[0].strip())