From f0c1b842bf09e83b0818145e0bed3a6113690d3e Mon Sep 17 00:00:00 2001
From: Jiaming Yuan
Date: Thu, 23 Jun 2022 00:03:02 +0800
Subject: [PATCH] Implement sketching with adapter. (#8019)

---
 src/common/hist_util.cc           |  34 ++++++++
 src/common/hist_util.h            |  37 +-------
 src/common/quantile.cc            | 140 +++++++-----------------------
 src/common/quantile.h             | 133 +++++++++++++++++++++++++---
 src/data/gradient_index.h         |   3 -
 tests/cpp/common/test_quantile.cc |  13 ++-
 6 files changed, 196 insertions(+), 164 deletions(-)

diff --git a/src/common/hist_util.cc b/src/common/hist_util.cc
index 64ede3102811..c5de84704a87 100644
--- a/src/common/hist_util.cc
+++ b/src/common/hist_util.cc
@@ -33,6 +33,40 @@ HistogramCuts::HistogramCuts() {
   cut_ptrs_.HostVector().emplace_back(0);
 }
 
+HistogramCuts SketchOnDMatrix(DMatrix *m, int32_t max_bins, int32_t n_threads, bool use_sorted,
+                              Span<float> const hessian) {
+  HistogramCuts out;
+  auto const& info = m->Info();
+  std::vector<size_t> reduced(info.num_col_, 0);
+  for (auto const &page : m->GetBatches<SparsePage>()) {
+    auto const &entries_per_column =
+        CalcColumnSize(data::SparsePageAdapterBatch{page.GetView()}, info.num_col_, n_threads,
+                       [](auto) { return true; });
+    CHECK_EQ(entries_per_column.size(), info.num_col_);
+    for (size_t i = 0; i < entries_per_column.size(); ++i) {
+      reduced[i] += entries_per_column[i];
+    }
+  }
+
+  if (!use_sorted) {
+    HostSketchContainer container(max_bins, m->Info(), reduced, HostSketchContainer::UseGroup(info),
+                                  n_threads);
+    for (auto const& page : m->GetBatches<SparsePage>()) {
+      container.PushRowPage(page, info, hessian);
+    }
+    container.MakeCuts(&out);
+  } else {
+    SortedSketchContainer container{max_bins, m->Info(), reduced,
+                                    HostSketchContainer::UseGroup(info), n_threads};
+    for (auto const& page : m->GetBatches<SortedCSCPage>()) {
+      container.PushColPage(page, info, hessian);
+    }
+    container.MakeCuts(&out);
+  }
+
+  return out;
+}
+
 /*!
  * \brief fill a histogram by zeros in range [begin, end)
  */
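For orientation, the relocated function can be exercised end to end as below. This is a minimal sketch, not part of the patch: the include paths and the MakeCuts helper are illustrative, and the arguments mirror the defaults kept in the hist_util.h declaration.

// Hedged usage sketch for the out-of-line SketchOnDMatrix(). The helper name
// `MakeCuts` and the chosen bin/thread counts are hypothetical.
#include "xgboost/data.h"      // xgboost::DMatrix
#include "common/hist_util.h"  // HistogramCuts, SketchOnDMatrix (path may differ)

namespace xgboost::common {
HistogramCuts MakeCuts(DMatrix* m) {
  // use_sorted = false keeps the row-wise HostSketchContainer path; an empty
  // hessian span means rows fall back to the weights stored in MetaInfo.
  return SketchOnDMatrix(m, /*max_bins=*/256, /*n_threads=*/4,
                         /*use_sorted=*/false, /*hessian=*/{});
}
}  // namespace xgboost::common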
diff --git a/src/common/hist_util.h b/src/common/hist_util.h
index 60b3688d8709..3b9f01c06c7f 100644
--- a/src/common/hist_util.h
+++ b/src/common/hist_util.h
@@ -152,41 +152,8 @@ class HistogramCuts {
  * \param use_sorted Whether to use SortedCSC for sketching; it's more efficient but
  *                   consumes more memory.
  */
-inline HistogramCuts SketchOnDMatrix(DMatrix* m, int32_t max_bins, int32_t n_threads,
-                                     bool use_sorted = false, Span<float> const hessian = {}) {
-  HistogramCuts out;
-  auto const& info = m->Info();
-  std::vector<std::vector<size_t>> column_sizes(n_threads);
-  for (auto& column : column_sizes) {
-    column.resize(info.num_col_, 0);
-  }
-  std::vector<size_t> reduced(info.num_col_, 0);
-  for (auto const& page : m->GetBatches<SparsePage>()) {
-    auto const& entries_per_column =
-        HostSketchContainer::CalcColumnSize(page, info.num_col_, n_threads);
-    for (size_t i = 0; i < entries_per_column.size(); ++i) {
-      reduced[i] += entries_per_column[i];
-    }
-  }
-
-  if (!use_sorted) {
-    HostSketchContainer container(max_bins, m->Info(), reduced, HostSketchContainer::UseGroup(info),
-                                  n_threads);
-    for (auto const& page : m->GetBatches<SparsePage>()) {
-      container.PushRowPage(page, info, hessian);
-    }
-    container.MakeCuts(&out);
-  } else {
-    SortedSketchContainer container{max_bins, m->Info(), reduced,
-                                    HostSketchContainer::UseGroup(info), n_threads};
-    for (auto const& page : m->GetBatches<SortedCSCPage>()) {
-      container.PushColPage(page, info, hessian);
-    }
-    container.MakeCuts(&out);
-  }
-
-  return out;
-}
+HistogramCuts SketchOnDMatrix(DMatrix* m, int32_t max_bins, int32_t n_threads,
+                              bool use_sorted = false, Span<float> const hessian = {});
 
 enum BinTypeSize : uint8_t {
   kUint8BinsTypeSize = 1,
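The removed header-only implementation relied on SparsePage-specific counting. The snippet below is a standalone illustration (plain C++ with std::thread, not xgboost code) of the thread-local counting pattern that replaces it: each thread fills a private per-column buffer, and the buffers are summed serially at the end, so no atomics or locks are needed.

#include <cstddef>
#include <thread>
#include <vector>

// Count entries per column over `rows`, where each row lists its non-zero
// column indices. Mirrors the CalcColumnSize pattern: thread-local buffers
// followed by a serial reduction.
std::vector<std::size_t> CountColumns(std::vector<std::vector<std::size_t>> const &rows,
                                      std::size_t n_columns, std::size_t n_threads) {
  std::vector<std::vector<std::size_t>> tloc(n_threads,
                                             std::vector<std::size_t>(n_columns, 0));
  std::vector<std::thread> workers;
  for (std::size_t t = 0; t < n_threads; ++t) {
    workers.emplace_back([&, t] {
      for (std::size_t i = t; i < rows.size(); i += n_threads) {  // static row partition
        for (auto cidx : rows[i]) {
          tloc[t][cidx]++;  // private buffer: no contention
        }
      }
    });
  }
  for (auto &w : workers) w.join();
  for (std::size_t t = 1; t < n_threads; ++t) {  // reduce into buffer 0
    for (std::size_t j = 0; j < n_columns; ++j) tloc[0][j] += tloc[t][j];
  }
  return tloc[0];
}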
diff --git a/src/common/quantile.cc b/src/common/quantile.cc
index 42fe719708c7..9afc4880fb47 100644
--- a/src/common/quantile.cc
+++ b/src/common/quantile.cc
@@ -6,6 +6,7 @@
 #include
 #include
 
+#include "../data/adapter.h"
 #include "categorical.h"
 #include "hist_util.h"
 #include "rabit/rabit.h"
@@ -31,72 +32,6 @@ SketchContainerImpl<WQSketch>::SketchContainerImpl(std::vector<size_t> column
   has_categorical_ = std::any_of(feature_types_.cbegin(), feature_types_.cend(), IsCatOp{});
 }
 
-template <typename WQSketch>
-std::vector<size_t> SketchContainerImpl<WQSketch>::CalcColumnSize(SparsePage const &batch,
-                                                                  bst_feature_t const n_columns,
-                                                                  size_t const nthreads) {
-  auto page = batch.GetView();
-  std::vector<std::vector<size_t>> column_sizes(nthreads);
-  for (auto &column : column_sizes) {
-    column.resize(n_columns, 0);
-  }
-
-  ParallelFor(page.Size(), nthreads, [&](omp_ulong i) {
-    auto &local_column_sizes = column_sizes.at(omp_get_thread_num());
-    auto row = page[i];
-    auto const *p_row = row.data();
-    for (size_t j = 0; j < row.size(); ++j) {
-      local_column_sizes.at(p_row[j].index)++;
-    }
-  });
-  std::vector<size_t> entries_per_columns(n_columns, 0);
-  ParallelFor(n_columns, nthreads, [&](bst_omp_uint i) {
-    for (auto const &thread : column_sizes) {
-      entries_per_columns[i] += thread[i];
-    }
-  });
-  return entries_per_columns;
-}
-
-template <typename WQSketch>
-std::vector<bst_feature_t> SketchContainerImpl<WQSketch>::LoadBalance(SparsePage const &batch,
-                                                                      bst_feature_t n_columns,
-                                                                      size_t const nthreads) {
-  /* Some sparse datasets have their mass concentrating on small number of features. To
-   * avoid waiting for a few threads running forever, we here distribute different number
-   * of columns to different threads according to number of entries.
-   */
-  auto page = batch.GetView();
-  size_t const total_entries = page.data.size();
-  size_t const entries_per_thread = DivRoundUp(total_entries, nthreads);
-
-  std::vector<std::vector<size_t>> column_sizes(nthreads);
-  for (auto& column : column_sizes) {
-    column.resize(n_columns, 0);
-  }
-  std::vector<size_t> entries_per_columns =
-      CalcColumnSize(batch, n_columns, nthreads);
-  std::vector<bst_feature_t> cols_ptr(nthreads + 1, 0);
-  size_t count {0};
-  size_t current_thread {1};
-
-  for (auto col : entries_per_columns) {
-    cols_ptr.at(current_thread)++;  // add one column to thread
-    count += col;
-    CHECK_LE(count, total_entries);
-    if (count > entries_per_thread) {
-      current_thread++;
-      count = 0;
-      cols_ptr.at(current_thread) = cols_ptr[current_thread-1];
-    }
-  }
-  // Idle threads.
-  for (; current_thread < cols_ptr.size() - 1; ++current_thread) {
-    cols_ptr[current_thread+1] = cols_ptr[current_thread];
-  }
-  return cols_ptr;
-}
-
 namespace {
 // Function to merge hessian and sample weights
 std::vector<float> MergeWeights(MetaInfo const &info, Span<float const> hessian, bool use_group,
@@ -143,54 +78,37 @@ void SketchContainerImpl<WQSketch>::PushRowPage(SparsePage const &page, MetaInfo
     CHECK_EQ(weights.size(), info.num_row_);
   }
 
-  auto batch = page.GetView();
-  // Parallel over columns. Each thread owns a set of consecutive columns.
-  auto const ncol = static_cast<bst_feature_t>(info.num_col_);
-  auto thread_columns_ptr = LoadBalance(page, info.num_col_, n_threads_);
-
-  dmlc::OMPException exc;
-#pragma omp parallel num_threads(n_threads_)
-  {
-    exc.Run([&]() {
-      auto tid = static_cast<uint32_t>(omp_get_thread_num());
-      auto const begin = thread_columns_ptr[tid];
-      auto const end = thread_columns_ptr[tid + 1];
-
-      // do not iterate if no columns are assigned to the thread
-      if (begin < end && end <= ncol) {
-        for (size_t i = 0; i < batch.Size(); ++i) {
-          size_t const ridx = page.base_rowid + i;
-          SparsePage::Inst const inst = batch[i];
-          auto w = weights.empty() ? 1.0f : weights[ridx];
-          auto p_inst = inst.data();
-          if (is_dense) {
-            for (size_t ii = begin; ii < end; ii++) {
-              if (IsCat(feature_types_, ii)) {
-                categories_[ii].emplace(p_inst[ii].fvalue);
-              } else {
-                sketches_[ii].Push(p_inst[ii].fvalue, w);
-              }
-            }
-          } else {
-            for (size_t i = 0; i < inst.size(); ++i) {
-              auto const& entry = p_inst[i];
-              if (entry.index >= begin && entry.index < end) {
-                if (IsCat(feature_types_, entry.index)) {
-                  categories_[entry.index].emplace(entry.fvalue);
-                } else {
-                  sketches_[entry.index].Push(entry.fvalue, w);
-                }
-              }
-            }
-          }
-        }
-      }
-    });
-  }
-  exc.Rethrow();
+  auto batch = data::SparsePageAdapterBatch{page.GetView()};
+  this->PushRowPageImpl(batch, page.base_rowid, OptionalWeights{weights}, page.data.Size(),
+                        info.num_col_, is_dense, [](auto) { return true; });
   monitor_.Stop(__func__);
 }
 
+template <typename Batch>
+void HostSketchContainer::PushAdapterBatch(Batch const &batch, size_t base_rowid,
+                                           MetaInfo const &info, size_t nnz, float missing) {
+  auto const &h_weights =
+      (use_group_ind_ ? detail::UnrollGroupWeights(info) : info.weights_.HostVector());
+
+  auto is_valid = data::IsValidFunctor{missing};
+  auto weights = OptionalWeights{Span<float const>{h_weights}};
+  // The nnz from info is not reliable, as sketching might be the first pass over the
+  // data.
+  auto is_dense = nnz == info.num_col_ * info.num_row_;
+  this->PushRowPageImpl(batch, base_rowid, weights, nnz, info.num_col_, is_dense, is_valid);
+}
+
+#define INSTANTIATE(_type)                                                           \
+  template void HostSketchContainer::PushAdapterBatch<data::_type>(                  \
+      data::_type const &batch, size_t base_rowid, MetaInfo const &info, size_t nnz, \
+      float missing);
+
+INSTANTIATE(ArrayAdapterBatch)
+INSTANTIATE(CSRArrayAdapterBatch)
+INSTANTIATE(CSCAdapterBatch)
+INSTANTIATE(DataTableAdapterBatch)
+INSTANTIATE(SparsePageAdapterBatch)
+
 namespace {
 /**
  * \brief A view over gathered sketch values.
  */
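PushAdapterBatch filters every entry through data::IsValidFunctor before it reaches the sketches. The functor's actual definition lives in src/data/adapter.h; its observable behaviour is approximately the following (an illustrative approximation, not the real type):

#include <cmath>
#include <cstddef>

struct ElementApprox {  // shape of an adapter element: a value plus its column index
  float value;
  std::size_t column_idx;
};

struct IsValidApprox {
  float missing;
  bool operator()(ElementApprox const &e) const {
    // Keep an entry only if it is neither NaN nor the user-designated
    // missing value; invalid entries never reach the sketches.
    return !std::isnan(e.value) && e.value != missing;
  }
};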
diff --git a/src/common/quantile.h b/src/common/quantile.h
index 7f08be442fd2..a17eb58437f9 100644
--- a/src/common/quantile.h
+++ b/src/common/quantile.h
@@ -8,15 +8,19 @@
 #define XGBOOST_COMMON_QUANTILE_H_
 
 #include
-#include
 #include
+#include
+
+#include
 #include
-#include
 #include
-#include
 #include
 #include
+#include
+#include "categorical.h"
+#include "common.h"
+#include "threading_utils.h"
 #include "timer.h"
 
 namespace xgboost {
@@ -722,6 +726,69 @@ inline std::vector<float> UnrollGroupWeights(MetaInfo const &info) {
 
 class HistogramCuts;
 
+template <typename Batch, typename IsValid>
+std::vector<size_t> CalcColumnSize(Batch const &batch, bst_feature_t const n_columns,
+                                   size_t const n_threads, IsValid &&is_valid) {
+  std::vector<std::vector<size_t>> column_sizes_tloc(n_threads);
+  for (auto &column : column_sizes_tloc) {
+    column.resize(n_columns, 0);
+  }
+
+  ParallelFor(batch.Size(), n_threads, [&](omp_ulong i) {
+    auto &local_column_sizes = column_sizes_tloc.at(omp_get_thread_num());
+    auto const &line = batch.GetLine(i);
+    for (size_t j = 0; j < line.Size(); ++j) {
+      auto elem = line.GetElement(j);
+      if (is_valid(elem)) {
+        local_column_sizes[elem.column_idx]++;
+      }
+    }
+  });
+  // reduce to first thread
+  auto &entries_per_columns = column_sizes_tloc.front();
+  CHECK_EQ(entries_per_columns.size(), static_cast<size_t>(n_columns));
+  for (size_t i = 1; i < n_threads; ++i) {
+    CHECK_EQ(column_sizes_tloc[i].size(), static_cast<size_t>(n_columns));
+    for (size_t j = 0; j < n_columns; ++j) {
+      entries_per_columns[j] += column_sizes_tloc[i][j];
+    }
+  }
+  return entries_per_columns;
+}
+
+template <typename Batch, typename IsValid>
+std::vector<bst_feature_t> LoadBalance(Batch const &batch, size_t nnz, bst_feature_t n_columns,
+                                       size_t const nthreads, IsValid&& is_valid) {
+  /* Some sparse datasets concentrate most of their entries in a small number of
+   * features. To avoid having a few threads run much longer than the rest, we assign a
+   * different number of columns to each thread according to the number of entries.
+   */
+  size_t const total_entries = nnz;
+  size_t const entries_per_thread = DivRoundUp(total_entries, nthreads);
+
+  // Need to calculate the size for each batch.
+  std::vector<size_t> entries_per_columns = CalcColumnSize(batch, n_columns, nthreads, is_valid);
+  std::vector<bst_feature_t> cols_ptr(nthreads + 1, 0);
+  size_t count{0};
+  size_t current_thread{1};
+
+  for (auto col : entries_per_columns) {
+    cols_ptr.at(current_thread)++;  // add one column to thread
+    count += col;
+    CHECK_LE(count, total_entries);
+    if (count > entries_per_thread) {
+      current_thread++;
+      count = 0;
+      cols_ptr.at(current_thread) = cols_ptr[current_thread - 1];
+    }
+  }
+  // Idle threads.
+  for (; current_thread < cols_ptr.size() - 1; ++current_thread) {
+    cols_ptr[current_thread + 1] = cols_ptr[current_thread];
+  }
+  return cols_ptr;
+}
+
 /*!
  * A sketch matrix storing sketches for each feature.
  */
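A concrete replay of the greedy partition above makes its behaviour easy to see. The numbers are toy values, and DivRoundUp is inlined as plain arithmetic so the snippet stands alone:

#include <cstddef>
#include <iostream>
#include <vector>

int main() {
  // 10 entries over 4 columns, heavily skewed toward column 0; 2 threads.
  std::vector<std::size_t> entries_per_column{6, 1, 1, 2};
  std::size_t const nthreads = 2;
  std::size_t const total_entries = 10;
  std::size_t const entries_per_thread = (total_entries + nthreads - 1) / nthreads;  // 5

  std::vector<unsigned> cols_ptr(nthreads + 1, 0);
  std::size_t count{0};
  std::size_t current_thread{1};
  for (auto col : entries_per_column) {
    cols_ptr.at(current_thread)++;  // assign one more column to the current thread
    count += col;
    if (count > entries_per_thread) {  // thread is full, move on to the next one
      current_thread++;
      count = 0;
      cols_ptr.at(current_thread) = cols_ptr[current_thread - 1];
    }
  }
  for (; current_thread < cols_ptr.size() - 1; ++current_thread) {
    cols_ptr[current_thread + 1] = cols_ptr[current_thread];  // idle threads
  }
  for (auto p : cols_ptr) std::cout << p << ' ';  // prints "0 1 4":
  std::cout << '\n';  // thread 0 owns column [0, 1), thread 1 owns columns [1, 4)
}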
@@ -759,14 +826,6 @@ class SketchContainerImpl {
     return use_group_ind;
   }
 
-  static std::vector<size_t> CalcColumnSize(SparsePage const &page,
-                                            bst_feature_t const n_columns,
-                                            size_t const nthreads);
-
-  static std::vector<bst_feature_t> LoadBalance(SparsePage const &page,
-                                                bst_feature_t n_columns,
-                                                size_t const nthreads);
-
   static uint32_t SearchGroupIndFromRow(std::vector<bst_group_t> const &group_ptr,
                                         size_t const base_rowid) {
     CHECK_LT(base_rowid, group_ptr.back())
@@ -785,6 +844,54 @@ class SketchContainerImpl {
   void AllReduce(std::vector<typename WQSketch::SummaryContainer> *p_reduced,
                  std::vector<int32_t> *p_num_cuts);
 
+  template <typename Batch, typename IsValid>
+  void PushRowPageImpl(Batch const &batch, size_t base_rowid, OptionalWeights weights, size_t nnz,
+                       size_t n_features, bool is_dense, IsValid is_valid) {
+    auto thread_columns_ptr = LoadBalance(batch, nnz, n_features, n_threads_, is_valid);
+
+    dmlc::OMPException exc;
+#pragma omp parallel num_threads(n_threads_)
+    {
+      exc.Run([&]() {
+        auto tid = static_cast<uint32_t>(omp_get_thread_num());
+        auto const begin = thread_columns_ptr[tid];
+        auto const end = thread_columns_ptr[tid + 1];
+
+        // do not iterate if no columns are assigned to the thread
+        if (begin < end && end <= n_features) {
+          for (size_t ridx = 0; ridx < batch.Size(); ++ridx) {
+            auto const &line = batch.GetLine(ridx);
+            auto w = weights[ridx + base_rowid];
+            if (is_dense) {
+              for (size_t ii = begin; ii < end; ii++) {
+                auto elem = line.GetElement(ii);
+                if (is_valid(elem)) {
+                  if (IsCat(feature_types_, ii)) {
+                    categories_[ii].emplace(elem.value);
+                  } else {
+                    sketches_[ii].Push(elem.value, w);
+                  }
+                }
+              }
+            } else {
+              for (size_t i = 0; i < line.Size(); ++i) {
+                auto const &elem = line.GetElement(i);
+                if (is_valid(elem) && elem.column_idx >= begin && elem.column_idx < end) {
+                  if (IsCat(feature_types_, elem.column_idx)) {
+                    categories_[elem.column_idx].emplace(elem.value);
+                  } else {
+                    sketches_[elem.column_idx].Push(elem.value, w);
+                  }
+                }
+              }
+            }
+          }
+        }
+      });
+    }
+    exc.Rethrow();
+  }
+
   /* \brief Push a CSR matrix. */
   void PushRowPage(SparsePage const &page, MetaInfo const &info, Span<float> hessian = {});
 
@@ -798,6 +905,10 @@ class HostSketchContainer : public SketchContainerImpl<WQuantileSketch<float, float>> {
                       std::vector<size_t> columns_size, bool use_group, int32_t n_threads);
+
+  template <typename Batch>
+  void PushAdapterBatch(Batch const &batch, size_t base_rowid, MetaInfo const &info, size_t nnz,
+                        float missing);
 };
 
 /**
diff --git a/src/data/gradient_index.h b/src/data/gradient_index.h
index 7074a3d9dee4..8096b9c98e16 100644
--- a/src/data/gradient_index.h
+++ b/src/data/gradient_index.h
@@ -29,9 +29,6 @@ class GHistIndexMatrix {
   /**
    * \brief Push a page into the index matrix; the function is only necessary because
    *        hist has partial support for external memory.
-   *
-   * \param rbegin The beginning row index of current page. (total rows in previous pages)
-   * \param prev_sum Total number of entries in previous pages.
    */
   void PushBatch(SparsePage const& batch, common::Span<FeatureType const> ft,
                  bst_bin_t n_total_bins, int32_t n_threads);
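PushRowPageImpl above is deliberately agnostic about the batch type: it only calls batch.Size(), batch.GetLine(i), line.Size(), and line.GetElement(j), where an element exposes `value` and `column_idx`. A toy batch satisfying that structural interface (illustrative only; the real adapters live in src/data/adapter.h):

#include <cstddef>
#include <vector>

struct ToyElement {
  float value;
  std::size_t column_idx;
};

struct ToyLine {
  std::vector<ToyElement> const *entries;
  std::size_t Size() const { return entries->size(); }
  ToyElement GetElement(std::size_t j) const { return (*entries)[j]; }
};

struct ToyBatch {  // one list of (value, column) pairs per row
  std::vector<std::vector<ToyElement>> rows;
  std::size_t Size() const { return rows.size(); }
  ToyLine GetLine(std::size_t i) const { return ToyLine{&rows[i]}; }
};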
diff --git a/tests/cpp/common/test_quantile.cc b/tests/cpp/common/test_quantile.cc
index 3e71b0a020d1..5dc4f81af8a6 100644
--- a/tests/cpp/common/test_quantile.cc
+++ b/tests/cpp/common/test_quantile.cc
@@ -1,10 +1,13 @@
 /*!
  * Copyright 2020-2022 by XGBoost Contributors
  */
-#include <gtest/gtest.h>
 #include "test_quantile.h"
-#include "../../../src/common/quantile.h"
+
+#include <gtest/gtest.h>
+
 #include "../../../src/common/hist_util.h"
+#include "../../../src/common/quantile.h"
+#include "../../../src/data/adapter.h"
 
 namespace xgboost {
 namespace common {
@@ -13,8 +16,9 @@ TEST(Quantile, LoadBalance) {
   size_t constexpr kRows = 1000, kCols = 100;
   auto m = RandomDataGenerator{kRows, kCols, 0}.GenerateDMatrix();
   std::vector<bst_feature_t> cols_ptr;
-  for (auto const &page : m->GetBatches<SparsePage>()) {
-    cols_ptr = HostSketchContainer::LoadBalance(page, kCols, 13);
+  for (auto const& page : m->GetBatches<SparsePage>()) {
+    data::SparsePageAdapterBatch adapter{page.GetView()};
+    cols_ptr = LoadBalance(adapter, page.data.Size(), kCols, 13, [](auto) { return true; });
   }
   size_t n_cols = 0;
   for (size_t i = 1; i < cols_ptr.size(); ++i) {
@@ -22,6 +26,7 @@
   }
   CHECK_EQ(n_cols, kCols);
 }
+
 namespace {
 template <bool use_column>
 using ContainerType = std::conditional_t<use_column, SortedSketchContainer, HostSketchContainer>;
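The test above checks that every column is assigned exactly once. Two further invariants of the returned cols_ptr follow from the construction and could be asserted in the same spirit; a hypothetical checker, not part of the patch:

#include <cstddef>
#include <vector>

// cols_ptr must start at 0, be non-decreasing (threads own disjoint,
// consecutive column ranges), and end at the total column count.
bool PartitionIsValid(std::vector<unsigned> const &cols_ptr, std::size_t n_columns) {
  if (cols_ptr.empty() || cols_ptr.front() != 0) return false;
  for (std::size_t i = 1; i < cols_ptr.size(); ++i) {
    if (cols_ptr[i] < cols_ptr[i - 1]) return false;
  }
  return cols_ptr.back() == n_columns;
}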