Skip to content

Commit

Permalink
Handle OMP_THREAD_LIMIT. (dmlc#7390) (dmlc#7391)
Browse files Browse the repository at this point in the history
  • Loading branch information
trivialfis committed Nov 3, 2021
1 parent fab3c05 commit a3d195e
Show file tree
Hide file tree
Showing 4 changed files with 93 additions and 5 deletions.
4 changes: 2 additions & 2 deletions R-package/R/xgboost.R
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ xgboost <- function(data = NULL, label = NULL, missing = NA, weight = NULL,
early_stopping_rounds = NULL, maximize = NULL,
save_period = NULL, save_name = "xgboost.model",
xgb_model = NULL, callbacks = list(), ...) {

dtrain <- xgb.get.DMatrix(data, label, missing, weight, nthread = params$nthread)
merged <- check.booster.params(params, ...)
dtrain <- xgb.get.DMatrix(data, label, missing, weight, nthread = merged$nthread)

watchlist <- list(train = dtrain)

Expand Down
32 changes: 30 additions & 2 deletions src/common/threading_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,28 @@
#define XGBOOST_COMMON_THREADING_UTILS_H_

#include <dmlc/common.h>
#include <vector>
#include <dmlc/omp.h>

#include <algorithm>
#include <limits>
#include <type_traits> // std::is_signed
#include <vector>

#include "xgboost/logging.h"

#if !defined(_OPENMP)
extern "C" {
inline int32_t omp_get_thread_limit() __GOMP_NOTHROW { return 1; } // NOLINT
}
#endif // !defined(_OPENMP)

// MSVC doesn't implement the thread limit.
#if defined(_OPENMP) && defined(_MSC_VER)
extern "C" {
inline int32_t omp_get_thread_limit() { return std::numeric_limits<int32_t>::max(); } // NOLINT
}
#endif // defined(_MSC_VER)

namespace xgboost {
namespace common {

Expand Down Expand Up @@ -153,7 +170,7 @@ struct Sched {
};

template <typename Index, typename Func>
void ParallelFor(Index size, size_t n_threads, Sched sched, Func fn) {
void ParallelFor(Index size, int32_t n_threads, Sched sched, Func fn) {
#if defined(_MSC_VER)
// msvc doesn't support unsigned integer as openmp index.
using OmpInd = std::conditional_t<std::is_signed<Index>::value, Index, omp_ulong>;
Expand Down Expand Up @@ -220,6 +237,13 @@ void ParallelFor(Index size, size_t n_threads, Func fn) {
template <typename Index, typename Func>
void ParallelFor(Index size, Func fn) {
ParallelFor(size, omp_get_max_threads(), Sched::Static(), fn);
} // !defined(_OPENMP)


inline int32_t OmpGetThreadLimit() {
int32_t limit = omp_get_thread_limit();
CHECK_GE(limit, 1) << "Invalid thread limit for OpenMP.";
return limit;
}

/* \brief Configure parallel threads.
Expand All @@ -235,15 +259,18 @@ inline int32_t OmpSetNumThreads(int32_t* p_threads) {
if (threads <= 0) {
threads = omp_get_num_procs();
}
threads = std::min(threads, OmpGetThreadLimit());
omp_set_num_threads(threads);
return nthread_original;
}

inline int32_t OmpSetNumThreadsWithoutHT(int32_t* p_threads) {
auto& threads = *p_threads;
int32_t nthread_original = omp_get_max_threads();
if (threads <= 0) {
threads = nthread_original;
}
threads = std::min(threads, OmpGetThreadLimit());
omp_set_num_threads(threads);
return nthread_original;
}
Expand All @@ -252,6 +279,7 @@ inline int32_t OmpGetNumThreads(int32_t n_threads) {
if (n_threads <= 0) {
n_threads = omp_get_num_procs();
}
n_threads = std::min(n_threads, OmpGetThreadLimit());
return n_threads;
}
} // namespace common
Expand Down
36 changes: 35 additions & 1 deletion tests/python/test_openmp.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
# -*- coding: utf-8 -*-
import os
import tempfile
import subprocess

import xgboost as xgb
import numpy as np
import pytest

import testing as tm


class TestOMP:
Expand Down Expand Up @@ -71,3 +77,31 @@ def consist_test(title, n):
assert auc_1 == auc_2 == auc_3
assert np.array_equal(auc_1, auc_2)
assert np.array_equal(auc_1, auc_3)

@pytest.mark.skipif(**tm.no_sklearn())
def test_with_omp_thread_limit(self):
args = [
"python", os.path.join(
tm.PROJECT_ROOT, "tests", "python", "with_omp_limit.py"
)
]
results = []
with tempfile.TemporaryDirectory() as tmpdir:
for i in (1, 2, 16):
path = os.path.join(tmpdir, str(i))
with open(path, "w") as fd:
fd.write("\n")
cp = args.copy()
cp.append(path)

env = os.environ.copy()
env["OMP_THREAD_LIMIT"] = str(i)

status = subprocess.call(cp, env=env)
assert status == 0

with open(path, "r") as fd:
results.append(float(fd.read()))

for auc in results:
np.testing.assert_allclose(auc, results[0])
26 changes: 26 additions & 0 deletions tests/python/with_omp_limit.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
import os
import xgboost as xgb
from sklearn.datasets import make_classification
from sklearn.metrics import roc_auc_score
import sys


def run_omp(output_path: str):
X, y = make_classification(
n_samples=200, n_features=32, n_classes=3, n_informative=8
)
Xy = xgb.DMatrix(X, y, nthread=16)
booster = xgb.train(
{"num_class": 3, "objective": "multi:softprob", "n_jobs": 16},
Xy,
num_boost_round=8,
)
score = booster.predict(Xy)
auc = roc_auc_score(y, score, average="weighted", multi_class="ovr")
with open(output_path, "w") as fd:
fd.write(str(auc))


if __name__ == "__main__":
out = sys.argv[1]
run_omp(out)

0 comments on commit a3d195e

Please sign in to comment.