Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Support cudf as column-split input #9717

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 2 additions & 1 deletion include/xgboost/data.h
Expand Up @@ -389,7 +389,8 @@ class SparsePage {
/**
* \brief Reindex the column index with an offset.
*/
void Reindex(uint64_t feature_offset, int32_t n_threads);
void ReindexCPU(uint64_t feature_offset, int32_t n_threads);
void ReindexCUDA(uint64_t feature_offset);

void SortRows(int32_t n_threads);

Expand Down
15 changes: 13 additions & 2 deletions python-package/xgboost/data.py
Expand Up @@ -853,13 +853,18 @@ def _from_cudf_df(
feature_names: Optional[FeatureNames],
feature_types: Optional[FeatureTypes],
enable_categorical: bool,
data_split_mode: DataSplitMode = DataSplitMode.ROW,
) -> DispatchedDataBackendReturnType:
data, cat_codes, feature_names, feature_types = _transform_cudf_df(
data, feature_names, feature_types, enable_categorical
)
interfaces_str = _cudf_array_interfaces(data, cat_codes)
handle = ctypes.c_void_p()
config = bytes(json.dumps({"missing": missing, "nthread": nthread}), "utf-8")
config = make_jcargs(
missing=float(missing),
nthread=int(nthread),
data_split_mode=int(data_split_mode),
)
_check_call(
_LIB.XGDMatrixCreateFromCudaColumnar(
interfaces_str,
Expand Down Expand Up @@ -1096,7 +1101,13 @@ def dispatch_data_backend(
)
if _is_cudf_df(data) or _is_cudf_ser(data):
return _from_cudf_df(
data, missing, threads, feature_names, feature_types, enable_categorical
data,
missing,
threads,
feature_names,
feature_types,
enable_categorical,
data_split_mode,
)
if _is_cupy_array(data):
return _from_cupy_array(data, missing, threads, feature_names, feature_types)
Expand Down
6 changes: 4 additions & 2 deletions src/c_api/c_api.cu
Expand Up @@ -96,9 +96,11 @@ XGB_DLL int XGDMatrixCreateFromCudaColumnar(char const *data,

float missing = GetMissing(config);
auto n_threads = OptionalArg<Integer, std::int64_t>(config, "nthread", 0);
auto data_split_mode =
static_cast<DataSplitMode>(OptionalArg<Integer, int64_t>(config, "data_split_mode", 0));
data::CudfAdapter adapter(json_str);
*out =
new std::shared_ptr<DMatrix>(DMatrix::Create(&adapter, missing, n_threads));
*out = new std::shared_ptr<DMatrix>(
DMatrix::Create(&adapter, missing, n_threads, "", data_split_mode));
API_END();
}

Expand Down
6 changes: 5 additions & 1 deletion src/data/data.cc
Expand Up @@ -1045,13 +1045,17 @@ void SparsePage::SortIndices(int32_t n_threads) {
});
}

void SparsePage::Reindex(uint64_t feature_offset, int32_t n_threads) {
void SparsePage::ReindexCPU(uint64_t feature_offset, int32_t n_threads) {
auto& h_data = this->data.HostVector();
common::ParallelFor(h_data.size(), n_threads, [&](auto i) {
h_data[i].index += feature_offset;
});
}

#if !defined(XGBOOST_USE_CUDA)
// Stub for CPU-only builds: the device reindex path cannot run, so delegate to
// common::AssertGPUSupport(), which presumably reports that CUDA support is
// required — confirm its exact behavior in common/.
void SparsePage::ReindexCUDA(uint64_t feature_offset) { common::AssertGPUSupport(); }
#endif  // !defined(XGBOOST_USE_CUDA)

void SparsePage::SortRows(int32_t n_threads) {
auto& h_offset = this->offset.HostVector();
auto& h_data = this->data.HostVector();
Expand Down
5 changes: 5 additions & 0 deletions src/data/data.cu
Expand Up @@ -169,6 +169,11 @@ void MetaInfo::SetInfoFromCUDA(Context const& ctx, StringView key, Json array) {
}
}

void SparsePage::ReindexCUDA(uint64_t feature_offset) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

bst_feature_t.

auto d_data = this->data.DeviceSpan();
dh::LaunchN(d_data.size(), [=] __device__(size_t idx) { d_data[idx].index += feature_offset; });
}

template <typename AdapterT>
DMatrix* DMatrix::Create(AdapterT* adapter, float missing, int nthread,
const std::string& cache_prefix, DataSplitMode data_split_mode) {
Expand Down
21 changes: 11 additions & 10 deletions src/data/simple_dmatrix.cc
Expand Up @@ -74,14 +74,18 @@ DMatrix* SimpleDMatrix::SliceCol(int num_slices, int slice_id) {
return out;
}

void SimpleDMatrix::ReindexFeatures(Context const* ctx) {
void SimpleDMatrix::ReindexFeatures() {
if (info_.IsColumnSplit() && collective::GetWorldSize() > 1) {
auto const cols = collective::Allgather(info_.num_col_);
auto const offset = std::accumulate(cols.cbegin(), cols.cbegin() + collective::GetRank(), 0ul);
if (offset == 0) {
return;
}
sparse_page_->Reindex(offset, ctx->Threads());
if (fmat_ctx_.IsCUDA()) {
sparse_page_->ReindexCUDA(offset);
} else {
sparse_page_->ReindexCPU(offset, fmat_ctx_.Threads());
}
}
}

Expand Down Expand Up @@ -216,8 +220,7 @@ BatchSet<ExtSparsePage> SimpleDMatrix::GetExtBatches(Context const*, BatchParam
template <typename AdapterT>
SimpleDMatrix::SimpleDMatrix(AdapterT* adapter, float missing, int nthread,
DataSplitMode data_split_mode) {
Context ctx;
ctx.Init(Args{{"nthread", std::to_string(nthread)}});
fmat_ctx_.Init(Args{{"nthread", std::to_string(nthread)}});

std::vector<uint64_t> qids;
uint64_t default_max = std::numeric_limits<uint64_t>::max();
Expand All @@ -233,7 +236,7 @@ SimpleDMatrix::SimpleDMatrix(AdapterT* adapter, float missing, int nthread,
// Iterate over batches of input data
while (adapter->Next()) {
auto& batch = adapter->Value();
auto batch_max_columns = sparse_page_->Push(batch, missing, ctx.Threads());
auto batch_max_columns = sparse_page_->Push(batch, missing, fmat_ctx_.Threads());
inferred_num_columns = std::max(batch_max_columns, inferred_num_columns);
total_batch_size += batch.Size();
// Append meta information if available
Expand Down Expand Up @@ -282,7 +285,7 @@ SimpleDMatrix::SimpleDMatrix(AdapterT* adapter, float missing, int nthread,

// Synchronise worker columns
info_.data_split_mode = data_split_mode;
ReindexFeatures(&ctx);
ReindexFeatures();
info_.SynchronizeNumberOfColumns();

if (adapter->NumRows() == kAdapterUnknownSize) {
Expand Down Expand Up @@ -315,11 +318,9 @@ SimpleDMatrix::SimpleDMatrix(AdapterT* adapter, float missing, int nthread,
info_.num_nonzero_ = data_vec.size();

// Sort the index for row partitioners used by various tree methods.
if (!sparse_page_->IsIndicesSorted(ctx.Threads())) {
sparse_page_->SortIndices(ctx.Threads());
if (!sparse_page_->IsIndicesSorted(fmat_ctx_.Threads())) {
sparse_page_->SortIndices(fmat_ctx_.Threads());
}

this->fmat_ctx_ = ctx;
}

SimpleDMatrix::SimpleDMatrix(dmlc::Stream* in_stream) {
Expand Down
8 changes: 2 additions & 6 deletions src/data/simple_dmatrix.cu
Expand Up @@ -17,16 +17,13 @@ namespace xgboost::data {
template <typename AdapterT>
SimpleDMatrix::SimpleDMatrix(AdapterT* adapter, float missing, std::int32_t nthread,
DataSplitMode data_split_mode) {
CHECK(data_split_mode != DataSplitMode::kCol)
<< "Column-wise data split is currently not supported on the GPU.";
auto device = (adapter->Device().IsCPU() || adapter->NumRows() == 0)
? DeviceOrd::CUDA(dh::CurrentDevice())
: adapter->Device();
CHECK(device.IsCUDA());
dh::safe_cuda(cudaSetDevice(device.ordinal));

Context ctx;
ctx.Init(Args{{"nthread", std::to_string(nthread)}, {"device", device.Name()}});
fmat_ctx_.Init(Args{{"nthread", std::to_string(nthread)}, {"device", device.Name()}});

CHECK(adapter->NumRows() != kAdapterUnknownSize);
CHECK(adapter->NumColumns() != kAdapterUnknownSize);
Expand All @@ -42,9 +39,8 @@ SimpleDMatrix::SimpleDMatrix(AdapterT* adapter, float missing, std::int32_t nthr
info_.num_row_ = adapter->NumRows();
// Synchronise worker columns
info_.data_split_mode = data_split_mode;
ReindexFeatures();
info_.SynchronizeNumberOfColumns();

this->fmat_ctx_ = ctx;
}

template SimpleDMatrix::SimpleDMatrix(CudfAdapter* adapter, float missing,
Expand Down
2 changes: 1 addition & 1 deletion src/data/simple_dmatrix.h
Expand Up @@ -69,7 +69,7 @@ class SimpleDMatrix : public DMatrix {
* are globally indexed, so we reindex the features based on the offset needed to obtain the
* global view.
*/
void ReindexFeatures(Context const* ctx);
void ReindexFeatures();

private:
// Context used only for DMatrix initialization.
Expand Down
1 change: 1 addition & 0 deletions tests/ci_build/lint_python.py
Expand Up @@ -26,6 +26,7 @@ class LintersPaths:
"tests/python/test_tree_regularization.py",
"tests/python/test_shap.py",
"tests/python/test_with_pandas.py",
"tests/python-gpu/test_from_cudf.py",
"tests/python-gpu/test_gpu_data_iterator.py",
"tests/python-gpu/test_gpu_prediction.py",
"tests/python-gpu/load_pickle.py",
Expand Down