Prepare gradient index for Quantile DMatrix. #8103

Merged 2 commits on Jul 22, 2022

134 changes: 85 additions & 49 deletions src/data/gradient_index.cc
@@ -7,6 +7,7 @@
#include <algorithm>
#include <limits>
#include <memory>
+#include <utility>  // std::forward

#include "../common/column_matrix.h"
#include "../common/hist_util.h"
@@ -43,7 +44,7 @@ GHistIndexMatrix::GHistIndexMatrix(DMatrix *p_fmat, bst_bin_t max_bins_per_feat,
auto ft = p_fmat->Info().feature_types.ConstHostSpan();

for (const auto &batch : p_fmat->GetBatches<SparsePage>()) {
-    this->PushBatch(batch, ft, nbins, n_threads);
+    this->PushBatch(batch, ft, n_threads);
}
this->columns_ = std::make_unique<common::ColumnMatrix>();

@@ -57,62 +58,29 @@
}
}

+GHistIndexMatrix::GHistIndexMatrix(MetaInfo const &info, common::HistogramCuts &&cuts,
+                                   bst_bin_t max_bin_per_feat)
+    : row_ptr(info.num_row_ + 1, 0),
+      hit_count(cuts.TotalBins(), 0),
+      cut{std::forward<common::HistogramCuts>(cuts)},
+      max_num_bins(max_bin_per_feat),
+      isDense_{info.num_col_ * info.num_row_ == info.num_nonzero_} {}
+
+GHistIndexMatrix::~GHistIndexMatrix() = default;

void GHistIndexMatrix::PushBatch(SparsePage const &batch, common::Span<FeatureType const> ft,
-                                 bst_bin_t n_total_bins, int32_t n_threads) {
+                                 int32_t n_threads) {
auto page = batch.GetView();
auto it = common::MakeIndexTransformIter([&](size_t ridx) { return page[ridx].size(); });
common::PartialSum(n_threads, it, it + page.Size(), static_cast<size_t>(0), row_ptr.begin());
-  // The number of threads is pegged to the batch size. If the OMP block is parallelized
-  // on anything other than the batch/block size, it should be reassigned
-  const size_t batch_threads =
-      std::max(static_cast<size_t>(1), std::min(batch.Size(), static_cast<size_t>(n_threads)));
-
-  const size_t n_index = row_ptr[batch.Size()];  // number of entries in this page
-  ResizeIndex(n_index, isDense_);
-
-  CHECK_GT(cut.Values().size(), 0U);
-
-  if (isDense_) {
-    index.SetBinOffset(cut.Ptrs());
-  }
-  uint32_t const *offsets = index.Offset();
-
-  auto n_bins_total = cut.TotalBins();
-  auto is_valid = [](auto) { return true; };  // SparsePage always contains valid entries
data::SparsePageAdapterBatch adapter_batch{page};
-  if (isDense_) {
-    // Inside the lambda functions, bin_idx is the index for cut value across all
-    // features. By subtracting it with starting pointer of each feature, we can reduce
-    // it to smaller value and compress it to smaller types.
-    common::DispatchBinType(index.GetBinTypeSize(), [&](auto dtype) {
-      using T = decltype(dtype);
-      common::Span<T> index_data_span = {index.data<T>(), index.Size()};
-      SetIndexData(
-          index_data_span, ft, batch_threads, adapter_batch, is_valid, n_bins_total,
-          [offsets](auto bin_idx, auto fidx) { return static_cast<T>(bin_idx - offsets[fidx]); });
-    });
-  } else {
-    /* For sparse DMatrix we have to store index of feature for each bin
-       in index field to chose right offset. So offset is nullptr and index is
-       not reduced */
-    common::Span<uint32_t> index_data_span = {index.data<uint32_t>(), n_index};
-    SetIndexData(index_data_span, ft, batch_threads, adapter_batch, is_valid, n_bins_total,
-                 [](auto idx, auto) { return idx; });
-  }
-
-  common::ParallelFor(n_total_bins, n_threads, [&](bst_omp_uint idx) {
-    for (int32_t tid = 0; tid < n_threads; ++tid) {
-      hit_count[idx] += hit_count_tloc_[tid * n_total_bins + idx];
-      hit_count_tloc_[tid * n_total_bins + idx] = 0;  // reset for next batch
-    }
-  });
+  auto is_valid = [](auto) { return true; };  // SparsePage always contains valid entries
+  PushBatchImpl(n_threads, adapter_batch, 0, is_valid, ft);
}
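The refactored `PushBatch` above keeps only the row-pointer setup: per-row entry counts are turned into CSR-style offsets with a prefix sum, and everything else is delegated to `PushBatchImpl` (defined in `gradient_index.h` below) with a validity predicate that is always true, since a `SparsePage` has already dropped missing values. A minimal standalone sketch of that offset construction, using `std::inclusive_scan` in place of XGBoost's `common::PartialSum`:

```cpp
#include <cstddef>
#include <numeric>  // std::inclusive_scan
#include <vector>

// Build CSR row offsets from per-row entry counts:
// row_ptr[0] == 0 and row_ptr[i + 1] - row_ptr[i] == counts[i].
std::vector<std::size_t> MakeRowPtr(std::vector<std::size_t> const& counts) {
  std::vector<std::size_t> row_ptr(counts.size() + 1, 0);
  std::inclusive_scan(counts.cbegin(), counts.cend(), row_ptr.begin() + 1);
  return row_ptr;
}

int main() {
  // Rows with 3, 0, and 2 valid entries -> offsets {0, 3, 3, 5}.
  auto row_ptr = MakeRowPtr({3, 0, 2});
  // row_ptr.back() is the total number of bin-index entries to allocate.
  return row_ptr.back() == 5 ? 0 : 1;
}
```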

-void GHistIndexMatrix::Init(SparsePage const &batch, common::Span<FeatureType const> ft,
-                            common::HistogramCuts const &cuts, int32_t max_bins_per_feat,
-                            bool isDense, double sparse_thresh, int32_t n_threads) {
+GHistIndexMatrix::GHistIndexMatrix(SparsePage const &batch, common::Span<FeatureType const> ft,
+                                   common::HistogramCuts const &cuts, int32_t max_bins_per_feat,
+                                   bool isDense, double sparse_thresh, int32_t n_threads) {
CHECK_GE(n_threads, 1);
base_rowid = batch.base_rowid;
isDense_ = isDense;
@@ -127,13 +95,30 @@ void GHistIndexMatrix::Init(SparsePage const &batch, common::Span<FeatureType co
hit_count.resize(nbins, 0);
hit_count_tloc_.resize(n_threads * nbins, 0);

-  this->PushBatch(batch, ft, nbins, n_threads);
+  this->PushBatch(batch, ft, n_threads);
this->columns_ = std::make_unique<common::ColumnMatrix>();
if (!std::isnan(sparse_thresh)) {
this->columns_->Init(batch, *this, sparse_thresh, n_threads);
}
}
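Both the SimpleDMatrix and external-memory paths guard column-matrix construction with `std::isnan(sparse_thresh)`: NaN serves as the "not requested" sentinel, and since NaN compares unequal to everything including itself, `std::isnan` is the only reliable test. A small sketch of the idiom:

```cpp
#include <cmath>
#include <limits>

// NaN as an "unset" sentinel: NaN != NaN, so equality tests never work.
bool ShouldBuildColumns(double sparse_thresh) {
  return !std::isnan(sparse_thresh);
}

int main() {
  double unset = std::numeric_limits<double>::quiet_NaN();
  return (!ShouldBuildColumns(unset) && ShouldBuildColumns(0.2)) ? 0 : 1;
}
```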

+template <typename Batch>
+void GHistIndexMatrix::PushAdapterBatchColumns(Context const *ctx, Batch const &batch,
+                                               float missing, size_t rbegin) {
+  CHECK(columns_);
+  this->columns_->PushBatch(ctx->Threads(), batch, missing, *this, rbegin);
+}
+
+#define INSTANTIATION_PUSH(BatchT)                                 \
+  template void GHistIndexMatrix::PushAdapterBatchColumns<BatchT>( \
+      Context const *ctx, BatchT const &batch, float missing, size_t rbegin);
+
+INSTANTIATION_PUSH(data::CSRArrayAdapterBatch)
+INSTANTIATION_PUSH(data::ArrayAdapterBatch)
+INSTANTIATION_PUSH(data::SparsePageAdapterBatch)
+
+#undef INSTANTIATION_PUSH
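The `INSTANTIATION_PUSH` macro is the classic explicit-instantiation pattern: the template body stays in the `.cc` file, and one `template void ...` line per supported batch type forces the compiler to emit that specialization into this translation unit, so the header (below) only needs a declaration. A generic sketch of the pattern, with hypothetical names:

```cpp
// process.h -- declaration only; clients never see the template body.
template <typename T>
void Process(T const& value);

// process.cc -- definition plus explicit instantiations for supported types.
#include <iostream>

template <typename T>
void Process(T const& value) {
  std::cout << value << '\n';
}

// Each line emits one concrete specialization into process.o; any other T
// fails to link, which is the point: the supported set is closed.
template void Process<int>(int const&);
template void Process<double>(double const&);
```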

void GHistIndexMatrix::ResizeIndex(const size_t n_index, const bool isDense) {
if ((max_num_bins - 1 <= static_cast<int>(std::numeric_limits<uint8_t>::max())) && isDense) {
// compress dense index to uint8
@@ -156,6 +141,57 @@ common::ColumnMatrix const &GHistIndexMatrix::Transpose() const {
return *columns_;
}

+float GHistIndexMatrix::GetFvalue(size_t ridx, size_t fidx, bool is_cat) const {
+  auto const &values = cut.Values();
+  auto const &mins = cut.MinValues();
+  auto const &ptrs = cut.Ptrs();
+  if (is_cat) {
+    auto f_begin = ptrs[fidx];
+    auto f_end = ptrs[fidx + 1];
+    auto begin = RowIdx(ridx);
+    auto end = RowIdx(ridx + 1);
+    auto gidx = BinarySearchBin(begin, end, index, f_begin, f_end);
+    if (gidx == -1) {
+      return std::numeric_limits<float>::quiet_NaN();
+    }
+    return values[gidx];
+  }
+
+  auto lower = static_cast<bst_bin_t>(cut.Ptrs()[fidx]);
+  auto get_bin_idx = [&](auto &column) {
+    auto bin_idx = column[ridx];
+    if (bin_idx == common::DenseColumnIter<uint8_t, true>::kMissingId) {
+      return std::numeric_limits<float>::quiet_NaN();
+    }
+    if (bin_idx == lower) {
+      return mins[fidx];
+    }
+    return values[bin_idx - 1];
+  };
+
+  if (columns_->GetColumnType(fidx) == common::kDenseColumn) {
+    if (columns_->AnyMissing()) {
+      return common::DispatchBinType(columns_->GetTypeSize(), [&](auto dtype) {
+        auto column = columns_->DenseColumn<decltype(dtype), true>(fidx);
+        return get_bin_idx(column);
+      });
+    } else {
+      return common::DispatchBinType(columns_->GetTypeSize(), [&](auto dtype) {
+        auto column = columns_->DenseColumn<decltype(dtype), false>(fidx);
+        return get_bin_idx(column);
+      });
+    }
+  } else {
+    return common::DispatchBinType(columns_->GetTypeSize(), [&](auto dtype) {
+      auto column = columns_->SparseColumn<decltype(dtype)>(fidx, 0);
+      return get_bin_idx(column);
+    });
+  }
+
+  SPAN_CHECK(false);
+  return std::numeric_limits<float>::quiet_NaN();
+}
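`GetFvalue` inverts the quantization: for a numerical feature, a bin index equal to the feature's first bin can only report the stored feature minimum, while any other bin reports its lower cut point; categorical features instead binary-search the row for a bin in the feature's range. A self-contained sketch of the numerical decoding, with hypothetical cut data:

```cpp
#include <cstddef>
#include <cstdint>
#include <vector>

// Decode a global bin index into a representative value for feature fidx.
// ptrs[fidx]..ptrs[fidx + 1] delimits the feature's bins, values holds the
// upper cut points, and mins holds each feature's minimum.
float DecodeBin(std::uint32_t bin_idx, std::size_t fidx,
                std::vector<std::uint32_t> const& ptrs,
                std::vector<float> const& values,
                std::vector<float> const& mins) {
  if (bin_idx == ptrs[fidx]) {
    return mins[fidx];         // first bin: only the minimum is recoverable
  }
  return values[bin_idx - 1];  // otherwise: the bin's lower cut point
}

int main() {
  // One feature with cuts {0.5, 1.5}: bin 0 -> min, bin 1 -> 0.5.
  std::vector<std::uint32_t> ptrs{0, 2};
  std::vector<float> values{0.5f, 1.5f};
  std::vector<float> mins{-1.0f};
  return DecodeBin(1, 0, ptrs, values, mins) == 0.5f ? 0 : 1;
}
```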

bool GHistIndexMatrix::ReadColumnPage(dmlc::SeekStream *fi) {
return this->columns_->Read(fi, this->cut.Ptrs().data());
}
127 changes: 114 additions & 13 deletions src/data/gradient_index.h
@@ -4,46 +4,64 @@
*/
#ifndef XGBOOST_DATA_GRADIENT_INDEX_H_
#define XGBOOST_DATA_GRADIENT_INDEX_H_

+#include <algorithm>  // std::min
#include <memory>
#include <vector>

#include "../common/categorical.h"
#include "../common/hist_util.h"
#include "../common/numeric.h"
#include "../common/threading_utils.h"
#include "adapter.h"
#include "proxy_dmatrix.h"
#include "xgboost/base.h"
#include "xgboost/data.h"

namespace xgboost {
namespace common {
class ColumnMatrix;
} // namespace common

/*!
* \brief preprocessed global index matrix, in CSR format
*
 * Transforms floating point values into integer bin indices. This is the global
 * histogram index for the CPU histogram method. On GPU, the ellpack page is used.
*/
class GHistIndexMatrix {
+  // Get the size of each row
+  template <typename AdapterBatchT>
+  auto GetRowCounts(AdapterBatchT const& batch, float missing, int32_t n_threads) {
+    std::vector<size_t> valid_counts(batch.Size(), 0);
+    common::ParallelFor(batch.Size(), n_threads, [&](size_t i) {
+      auto line = batch.GetLine(i);
+      for (size_t j = 0; j < line.Size(); ++j) {
+        data::COOTuple elem = line.GetElement(j);
+        if (data::IsValidFunctor{missing}(elem)) {
+          valid_counts[i]++;
+        }
+      }
+    });
+    return valid_counts;
+  }
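`GetRowCounts` sizes each row before any memory is allocated: an entry counts only if `data::IsValidFunctor` accepts it, that is, it is neither NaN nor equal to the user-supplied missing value. A sequential sketch of the same predicate over a toy row-major layout (`IsValidFunctor`'s exact definition lives in `adapter.h` and is only approximated here):

```cpp
#include <cmath>
#include <cstddef>
#include <vector>

// Count entries per row that are neither NaN nor equal to `missing`,
// approximating what data::IsValidFunctor checks per COO element.
std::vector<std::size_t> CountValid(std::vector<std::vector<float>> const& rows,
                                    float missing) {
  std::vector<std::size_t> counts(rows.size(), 0);
  for (std::size_t i = 0; i < rows.size(); ++i) {
    for (float v : rows[i]) {
      // If missing is NaN, v == missing is always false and isnan covers it.
      bool is_missing = std::isnan(v) || v == missing;
      if (!is_missing) ++counts[i];
    }
  }
  return counts;
}
```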

/**
* \brief Push a page into index matrix, the function is only necessary because hist has
* partial support for external memory.
*/
-  void PushBatch(SparsePage const& batch, common::Span<FeatureType const> ft,
-                 bst_bin_t n_total_bins, int32_t n_threads);
+  void PushBatch(SparsePage const& batch, common::Span<FeatureType const> ft, int32_t n_threads);

template <typename Batch, typename BinIdxType, typename GetOffset, typename IsValid>
-  void SetIndexData(common::Span<BinIdxType> index_data_span, common::Span<FeatureType const> ft,
-                    size_t batch_threads, Batch const& batch, IsValid&& is_valid, size_t nbins,
-                    GetOffset&& get_offset) {
+  void SetIndexData(common::Span<BinIdxType> index_data_span, size_t rbegin,
+                    common::Span<FeatureType const> ft, size_t batch_threads, Batch const& batch,
+                    IsValid&& is_valid, size_t nbins, GetOffset&& get_offset) {
auto batch_size = batch.Size();
BinIdxType* index_data = index_data_span.data();
auto const& ptrs = cut.Ptrs();
auto const& values = cut.Values();
common::ParallelFor(batch_size, batch_threads, [&](size_t i) {
auto line = batch.GetLine(i);
-      size_t ibegin = row_ptr[i];  // index of first entry for current block
+      size_t ibegin = row_ptr[rbegin + i];  // index of first entry for current block
size_t k = 0;
auto tid = omp_get_thread_num();
for (size_t j = 0; j < line.Size(); ++j) {
@@ -63,6 +81,49 @@ class GHistIndexMatrix {
});
}
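`SetIndexData` depends on a bin-search helper (`cut.SearchBin`, not shown in this diff) to map each raw value to a global bin index; the standard implementation is an upper-bound search over the feature's slice of the cut values, clamped into the feature's range. A hedged sketch assuming the same `ptrs`/`values` layout as `common::HistogramCuts`:

```cpp
#include <algorithm>  // std::upper_bound
#include <cstddef>
#include <cstdint>
#include <vector>

// First cut greater than `value` within feature fidx's slice of `values`,
// clamped so out-of-range values land in the feature's last bin.
std::uint32_t SearchBinSketch(float value, std::size_t fidx,
                              std::vector<std::uint32_t> const& ptrs,
                              std::vector<float> const& values) {
  auto begin = values.cbegin() + ptrs[fidx];
  auto end = values.cbegin() + ptrs[fidx + 1];
  auto it = std::upper_bound(begin, end, value);
  if (it == end) {
    --it;  // clamp into the last bin
  }
  return static_cast<std::uint32_t>(it - values.cbegin());
}
```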

+  template <typename Batch, typename IsValid>
+  void PushBatchImpl(int32_t n_threads, Batch const& batch, size_t rbegin, IsValid&& is_valid,
+                     common::Span<FeatureType const> ft) {
+    // The number of threads is pegged to the batch size. If the OMP block is parallelized
+    // on anything other than the batch/block size, it should be reassigned.
+    size_t batch_threads =
+        std::max(static_cast<size_t>(1), std::min(batch.Size(), static_cast<size_t>(n_threads)));
+
+    auto n_bins_total = cut.TotalBins();
+    const size_t n_index = row_ptr[rbegin + batch.Size()];  // number of entries in this page
+    ResizeIndex(n_index, isDense_);
+    if (isDense_) {
+      index.SetBinOffset(cut.Ptrs());
+    }
+    uint32_t const* offsets = index.Offset();
+    if (isDense_) {
+      // Inside the lambda functions, bin_idx is the index of the cut value across all
+      // features. By subtracting the starting offset of each feature, we can reduce it
+      // to a smaller value and compress it to a smaller type.
+      common::DispatchBinType(index.GetBinTypeSize(), [&](auto dtype) {
+        using T = decltype(dtype);
+        common::Span<T> index_data_span = {index.data<T>(), index.Size()};
+        SetIndexData(
+            index_data_span, rbegin, ft, batch_threads, batch, is_valid, n_bins_total,
+            [offsets](auto bin_idx, auto fidx) { return static_cast<T>(bin_idx - offsets[fidx]); });
+      });
+    } else {
+      /* For a sparse DMatrix we have to store the feature index for each bin in the
+         index field to choose the right offset. So offset is nullptr and the index is
+         not reduced. */
+      common::Span<uint32_t> index_data_span = {index.data<uint32_t>(), n_index};
+      SetIndexData(index_data_span, rbegin, ft, batch_threads, batch, is_valid, n_bins_total,
+                   [](auto idx, auto) { return idx; });
+    }
+
+    common::ParallelFor(n_bins_total, n_threads, [&](bst_omp_uint idx) {
+      for (int32_t tid = 0; tid < n_threads; ++tid) {
+        hit_count[idx] += hit_count_tloc_[tid * n_bins_total + idx];
+        hit_count_tloc_[tid * n_bins_total + idx] = 0;  // reset for next batch
+      }
+    });
+  }
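The tail of `PushBatchImpl` is a standard two-phase histogram: each thread bumps counters in its own slice of `hit_count_tloc_` (laid out as n_threads × n_bins) so no atomics are needed, then a parallel pass folds the slices into `hit_count` and zeroes them for the next batch. The same pattern in standalone form, using `std::thread` instead of `common::ParallelFor`:

```cpp
#include <algorithm>  // std::min
#include <cstddef>
#include <thread>
#include <vector>

// Phase 1: each thread counts into a private slice (no atomics, no sharing).
// Phase 2: fold the slices into the final histogram.
std::vector<std::size_t> CountBins(std::vector<std::size_t> const& bin_ids,
                                   std::size_t n_bins, std::size_t n_threads) {
  std::vector<std::size_t> tloc(n_threads * n_bins, 0);
  std::size_t chunk = (bin_ids.size() + n_threads - 1) / n_threads;
  std::vector<std::thread> workers;
  for (std::size_t t = 0; t < n_threads; ++t) {
    workers.emplace_back([&, t] {
      std::size_t begin = t * chunk;
      std::size_t end = std::min(begin + chunk, bin_ids.size());
      for (std::size_t i = begin; i < end; ++i) {
        ++tloc[t * n_bins + bin_ids[i]];  // thread-private slice
      }
    });
  }
  for (auto& w : workers) w.join();

  std::vector<std::size_t> hist(n_bins, 0);
  for (std::size_t t = 0; t < n_threads; ++t) {
    for (std::size_t b = 0; b < n_bins; ++b) {
      hist[b] += tloc[t * n_bins + b];
    }
  }
  return hist;
}
```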

public:
/*! \brief row pointer to rows by element position */
std::vector<size_t> row_ptr;
@@ -77,15 +138,53 @@ class GHistIndexMatrix {
/*! \brief base row index for current page (used by external memory) */
size_t base_rowid{0};

-  GHistIndexMatrix();
-  ~GHistIndexMatrix();
+  /**
+   * \brief Constructor for SimpleDMatrix.
+   */
GHistIndexMatrix(DMatrix* x, bst_bin_t max_bins_per_feat, double sparse_thresh,
bool sorted_sketch, int32_t n_threads, common::Span<float> hess = {});
+  ~GHistIndexMatrix();
+  /**
+   * \brief Constructor for IterativeDMatrix. Initializes basic information and prepares
+   * for pushing batches.
+   */
+  GHistIndexMatrix(MetaInfo const& info, common::HistogramCuts&& cuts, bst_bin_t max_bin_per_feat);
+  /**
+   * \brief Constructor for external memory.
+   */
+  GHistIndexMatrix(SparsePage const& page, common::Span<FeatureType const> ft,
+                   common::HistogramCuts const& cuts, int32_t max_bins_per_feat, bool is_dense,
+                   double sparse_thresh, int32_t n_threads);
+  GHistIndexMatrix();  // also for ext mem, empty ctor so that we can read the cache back.

+  template <typename Batch>
+  void PushAdapterBatch(Context const* ctx, size_t rbegin, size_t prev_sum, Batch const& batch,
+                        float missing, common::Span<FeatureType const> ft, double sparse_thresh,
+                        size_t n_samples_total) {
+    auto n_bins_total = cut.TotalBins();
+    hit_count_tloc_.clear();
+    hit_count_tloc_.resize(ctx->Threads() * n_bins_total, 0);
+
+    auto n_threads = ctx->Threads();
+    auto valid_counts = GetRowCounts(batch, missing, n_threads);
+
+    auto it = common::MakeIndexTransformIter([&](size_t ridx) { return valid_counts[ridx]; });
+    common::PartialSum(n_threads, it, it + batch.Size(), prev_sum, row_ptr.begin() + rbegin);
+    auto is_valid = data::IsValidFunctor{missing};
+
+    PushBatchImpl(ctx->Threads(), batch, rbegin, is_valid, ft);
+
+    if (rbegin + batch.Size() == n_samples_total) {
+      // finished
+      CHECK(!std::isnan(sparse_thresh));
+      this->columns_ = std::make_unique<common::ColumnMatrix>(*this, sparse_thresh);
+    }
+  }
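`PushAdapterBatch` is built to be called once per batch by the quantile/iterative DMatrix: `rbegin` carries the number of rows already pushed and `prev_sum` the running entry total (`row_ptr[rbegin]`), so row offsets accumulate across batches, and the `ColumnMatrix` is only finalized when the last batch arrives. A hypothetical driver loop showing just the bookkeeping (the real caller lives in IterativeDMatrix, not in this diff; `BatchRange` and the argument plumbing are assumptions):

```cpp
#include <cstddef>

// Hypothetical driver: feed adapter batches into a GHistIndexMatrix one at a
// time. The batch type must be one of the explicitly instantiated adapters.
template <typename BatchRange>
void PushAll(xgboost::GHistIndexMatrix* ghist, xgboost::Context const* ctx,
             BatchRange const& batches, float missing,
             xgboost::common::Span<xgboost::FeatureType const> ft,
             double sparse_thresh, std::size_t n_samples_total) {
  std::size_t rbegin = 0;    // rows pushed so far
  std::size_t prev_sum = 0;  // valid entries pushed so far, i.e. row_ptr[rbegin]
  for (auto const& batch : batches) {
    ghist->PushAdapterBatch(ctx, rbegin, prev_sum, batch, missing, ft,
                            sparse_thresh, n_samples_total);
    ghist->PushAdapterBatchColumns(ctx, batch, missing, rbegin);
    rbegin += batch.Size();
    prev_sum = ghist->row_ptr[rbegin];  // running CSR offset for the next batch
  }
}
```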

-  // Create a global histogram matrix, given cut. Used by external memory
-  void Init(SparsePage const& page, common::Span<FeatureType const> ft,
-            common::HistogramCuts const& cuts, int32_t max_bins_per_feat, bool is_dense,
-            double sparse_thresh, int32_t n_threads);
+  // Call ColumnMatrix::PushBatch
+  template <typename Batch>
+  void PushAdapterBatchColumns(Context const* ctx, Batch const& batch, float missing,
+                               size_t rbegin);

void ResizeIndex(const size_t n_index, const bool isDense);

@@ -117,6 +216,8 @@ class GHistIndexMatrix {

common::ColumnMatrix const& Transpose() const;

+  float GetFvalue(size_t ridx, size_t fidx, bool is_cat) const;

private:
std::unique_ptr<common::ColumnMatrix> columns_;
std::vector<size_t> hit_count_tloc_;
5 changes: 2 additions & 3 deletions src/data/gradient_index_page_source.cc
@@ -15,10 +15,9 @@ void GradientIndexPageSource::Fetch() {
// This is not read from cache so we still need it to be synced with sparse page source.
CHECK_EQ(count_, source_->Iter());
auto const& csr = source_->Page();
-    this->page_.reset(new GHistIndexMatrix());
CHECK_NE(cuts_.Values().size(), 0);
-    this->page_->Init(*csr, feature_types_, cuts_, max_bin_per_feat_, is_dense_, sparse_thresh_,
-                      nthreads_);
+    this->page_.reset(new GHistIndexMatrix(*csr, feature_types_, cuts_, max_bin_per_feat_,
+                                           is_dense_, sparse_thresh_, nthreads_));
this->WriteCache();
}
}
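The `Fetch` change is the same refactor applied throughout the PR: the two-phase default-construct-then-`Init` flow becomes a single fully-initializing constructor, eliminating the window where a page exists but is not yet usable; the empty constructor survives only so cached pages can be deserialized. In miniature, with a hypothetical `Page` type:

```cpp
#include <memory>

class Page {
 public:
  Page() = default;  // kept only so a cache reader can deserialize into it
  explicit Page(int data) : data_{data} {}  // fully formed on construction

 private:
  int data_{0};
};

// Before: the object exists half-initialized between the two calls.
//   auto p = std::make_unique<Page>();
//   p->Init(data);
// After: no intermediate state to misuse.
std::unique_ptr<Page> MakePage(int data) { return std::make_unique<Page>(data); }
```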