Support categorical data in GPU sketching. #6137

Merged · 7 commits · Sep 21, 2020
src/common/hist_util.cu (68 changes: 60 additions & 8 deletions)
@@ -24,6 +24,7 @@
#include "hist_util.cuh"
#include "math.h" // NOLINT
#include "quantile.h"
#include "categorical.h"
#include "xgboost/host_device_vector.h"


@@ -121,11 +122,59 @@ void SortByWeight(dh::XGBCachingDeviceAllocator<char>* alloc,
return a.index == b.index;
});
}

struct IsCatOp {
XGBOOST_DEVICE bool operator()(FeatureType ft) { return ft == FeatureType::kCategorical; }
};

void RemoveDuplicatedCategories(
int32_t device, MetaInfo const &info, Span<bst_row_t> d_cuts_ptr,
dh::device_vector<Entry> *p_sorted_entries,
dh::caching_device_vector<size_t> const &column_sizes_scan) {
auto d_feature_types = info.feature_types.ConstDeviceSpan();
if (!info.feature_types.Empty() &&
thrust::any_of(dh::tbegin(d_feature_types), dh::tend(d_feature_types),
IsCatOp{})) {
auto& sorted_entries = *p_sorted_entries;
// Removing duplicated entries in categorical features.
dh::caching_device_vector<size_t> new_column_scan(column_sizes_scan.size());
dh::SegmentedUnique(column_sizes_scan.data().get(),
column_sizes_scan.data().get() +
column_sizes_scan.size(),
sorted_entries.begin(), sorted_entries.end(),
new_column_scan.data().get(), sorted_entries.begin(),
[=] __device__(Entry const &l, Entry const &r) {
if (l.index == r.index) {
if (IsCat(d_feature_types, l.index)) {
return l.fvalue == r.fvalue;
}
}
return false;
});

// Renew the column scan and cut scan based on categorical data.
dh::caching_device_vector<SketchContainer::OffsetT> new_cuts_size(
info.num_col_ + 1);
auto d_new_cuts_size = dh::ToSpan(new_cuts_size);
auto d_new_columns_ptr = dh::ToSpan(new_column_scan);
CHECK_EQ(new_column_scan.size(), new_cuts_size.size());
dh::LaunchN(device, new_column_scan.size() - 1, [=] __device__(size_t idx) {
if (IsCat(d_feature_types, idx)) {
d_new_cuts_size[idx] =
d_new_columns_ptr[idx + 1] - d_new_columns_ptr[idx];
} else {
d_new_cuts_size[idx] = d_cuts_ptr[idx + 1] - d_cuts_ptr[idx];
}
});
thrust::exclusive_scan(thrust::device, new_cuts_size.cbegin(),
new_cuts_size.cend(), d_cuts_ptr.data());
}
}
} // namespace detail

void ProcessBatch(int device, const SparsePage &page, size_t begin, size_t end,
SketchContainer *sketch_container, int num_cuts_per_feature,
size_t num_columns) {
void ProcessBatch(int device, MetaInfo const &info, const SparsePage &page,
size_t begin, size_t end, SketchContainer *sketch_container,
int num_cuts_per_feature, size_t num_columns) {
dh::XGBCachingDeviceAllocator<char> alloc;
const auto& host_data = page.data.ConstHostVector();
dh::device_vector<Entry> sorted_entries(host_data.begin() + begin,
@@ -145,9 +194,10 @@ void ProcessBatch(int device, const SparsePage &page, size_t begin, size_t end,
batch_it, dummy_is_valid,
0, sorted_entries.size(),
&cuts_ptr, &column_sizes_scan);

auto d_cuts_ptr = cuts_ptr.DeviceSpan();
detail::RemoveDuplicatedCategories(device, info, d_cuts_ptr, &sorted_entries,
column_sizes_scan);
auto const& h_cuts_ptr = cuts_ptr.ConstHostVector();
auto d_cuts_ptr = cuts_ptr.ConstDeviceSpan();
CHECK_EQ(d_cuts_ptr.size(), column_sizes_scan.size());

// add cuts into sketches
@@ -221,6 +271,8 @@ void ProcessWeightedBatch(int device, const SparsePage& page,

HistogramCuts DeviceSketch(int device, DMatrix* dmat, int max_bins,
size_t sketch_batch_num_elements) {
dmat->Info().feature_types.SetDevice(device);
dmat->Info().feature_types.ConstDevicePointer(); // pull to device early
// Configure batch size based on available memory
bool has_weights = dmat->Info().weights_.Size() > 0;
size_t num_cuts_per_feature =
@@ -233,7 +285,7 @@ HistogramCuts DeviceSketch(int device, DMatrix* dmat, int max_bins,
device, num_cuts_per_feature, has_weights);

HistogramCuts cuts;
SketchContainer sketch_container(max_bins, dmat->Info().num_col_,
SketchContainer sketch_container(dmat->Info().feature_types, max_bins, dmat->Info().num_col_,
dmat->Info().num_row_, device);

dmat->Info().weights_.SetDevice(device);
@@ -253,8 +305,8 @@ HistogramCuts DeviceSketch(int device, DMatrix* dmat, int max_bins,
dmat->Info().num_col_,
is_ranking, dh::ToSpan(groups));
} else {
ProcessBatch(device, batch, begin, end, &sketch_container, num_cuts_per_feature,
dmat->Info().num_col_);
ProcessBatch(device, dmat->Info(), batch, begin, end, &sketch_container,
num_cuts_per_feature, dmat->Info().num_col_);
}
}
}
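The key idea in `RemoveDuplicatedCategories` above is that a categorical feature keeps exactly one entry per distinct value, while a numerical feature keeps all entries for quantile estimation. Below is a minimal host-side sketch of the semantics `dh::SegmentedUnique` supplies on the GPU; the `Entry` struct, `is_cat` flags, and column offsets are simplified stand-ins for illustration, not the real XGBoost types.

```cpp
#include <algorithm>
#include <cstdint>
#include <iostream>
#include <iterator>
#include <vector>

struct Entry { uint32_t index; float fvalue; };  // feature id, feature value

int main() {
  // Column 0 is categorical, column 1 is numerical.
  std::vector<bool> is_cat{true, false};
  // Entries sorted by (column, value), as after the sort in ProcessBatch.
  std::vector<Entry> entries{{0, 1.f}, {0, 1.f}, {0, 2.f},   // column 0
                             {1, 0.5f}, {1, 0.5f}};          // column 1
  std::vector<size_t> col_ptr{0, 3, 5};  // CSC-style column offsets
  std::vector<Entry> out;
  std::vector<size_t> new_col_ptr{0};    // plays the role of new_column_scan
  for (size_t c = 0; c + 1 < col_ptr.size(); ++c) {
    auto first = entries.begin() + col_ptr[c];
    auto last = entries.begin() + col_ptr[c + 1];
    if (is_cat[c]) {
      // Categorical: collapse duplicates so each category appears once.
      std::unique_copy(first, last, std::back_inserter(out),
                       [](Entry const& l, Entry const& r) {
                         return l.fvalue == r.fvalue;
                       });
    } else {
      // Numerical: duplicates carry rank information, keep them all.
      out.insert(out.end(), first, last);
    }
    new_col_ptr.push_back(out.size());
  }
  for (auto p : new_col_ptr) std::cout << p << ' ';  // prints: 0 2 4
}
```

In this toy run the categorical column shrinks from three entries to two; `new_col_ptr` corresponds to `new_column_scan`, from which the kernel above rebuilds `d_cuts_ptr` with an exclusive scan.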
src/common/quantile.cu (65 changes: 41 additions & 24 deletions)
@@ -15,6 +15,7 @@
#include "quantile.cuh"
#include "hist_util.h"
#include "device_helpers.cuh"
#include "categorical.h"
#include "common.h"

namespace xgboost {
@@ -57,6 +58,7 @@ void PruneImpl(int device,
common::Span<SketchContainer::OffsetT const> cuts_ptr,
Span<InEntry const> sorted_data,
Span<size_t const> columns_ptr_in, // could be ptr for data or cuts
Span<FeatureType const> feature_types,
Span<SketchEntry> out_cuts,
ToSketchEntry to_sketch_entry) {
dh::LaunchN(device, out_cuts.size(), [=] __device__(size_t idx) {
@@ -71,7 +73,8 @@ void PruneImpl(int device,
auto front = to_sketch_entry(0ul, in_column, column_id);
auto back = to_sketch_entry(in_column.size() - 1, in_column, column_id);

if (in_column.size() <= to) {
auto is_cat = IsCat(feature_types, column_id);
if (in_column.size() <= to || is_cat) {
// cut idx equals sample idx
out_column[idx] = to_sketch_entry(idx, in_column, column_id);
return;
@@ -316,7 +319,7 @@ void SketchContainer::Push(Span<Entry const> entries, Span<size_t> columns_ptr,
this->Current().resize(total_cuts);
out = dh::ToSpan(this->Current());
}

auto ft = this->feature_types_.ConstDeviceSpan();
if (weights.empty()) {
auto to_sketch_entry = [] __device__(size_t sample_idx,
Span<Entry const> const &column,
@@ -325,7 +328,7 @@ void SketchContainer::Push(Span<Entry const> entries, Span<size_t> columns_ptr,
float rmax = sample_idx + 1;
return SketchEntry{rmin, rmax, 1, column[sample_idx].fvalue};
}; // NOLINT
PruneImpl<Entry>(device_, cuts_ptr, entries, columns_ptr, out,
PruneImpl<Entry>(device_, cuts_ptr, entries, columns_ptr, ft, out,
to_sketch_entry);
} else {
auto to_sketch_entry = [weights, columns_ptr] __device__(
@@ -340,7 +343,7 @@ void SketchContainer::Push(Span<Entry const> entries, Span<size_t> columns_ptr,
wmin = wmin < 0 ? kRtEps : wmin; // GPU scan can generate floating error.
return SketchEntry{rmin, rmax, wmin, column[sample_idx].fvalue};
}; // NOLINT
PruneImpl<Entry>(device_, cuts_ptr, entries, columns_ptr, out,
PruneImpl<Entry>(device_, cuts_ptr, entries, columns_ptr, ft, out,
to_sketch_entry);
}

@@ -388,26 +391,31 @@ void SketchContainer::Prune(size_t to) {

this->Unique();
OffsetT to_total = 0;
HostDeviceVector<OffsetT> new_columns_ptr{to_total};
auto& h_columns_ptr = columns_ptr_b_.HostVector();
h_columns_ptr[0] = to_total;
auto const& h_feature_types = feature_types_.ConstHostSpan();
for (bst_feature_t i = 0; i < num_columns_; ++i) {
size_t length = this->Column(i).size();
length = std::min(length, to);
if (IsCat(h_feature_types, i)) {
length = this->Column(i).size();
}
to_total += length;
new_columns_ptr.HostVector().emplace_back(to_total);
h_columns_ptr[i+1] = to_total;
}
new_columns_ptr.SetDevice(device_);
this->Other().resize(to_total);

auto d_columns_ptr_in = this->columns_ptr_.ConstDeviceSpan();
auto d_columns_ptr_out = new_columns_ptr.ConstDeviceSpan();
auto d_columns_ptr_out = columns_ptr_b_.ConstDeviceSpan();
auto out = dh::ToSpan(this->Other());
auto in = dh::ToSpan(this->Current());
auto no_op = [] __device__(size_t sample_idx,
Span<SketchEntry const> const &entries,
size_t) { return entries[sample_idx]; }; // NOLINT
PruneImpl<SketchEntry>(device_, d_columns_ptr_out, in, d_columns_ptr_in, out,
no_op);
this->columns_ptr_.HostVector() = new_columns_ptr.HostVector();
auto ft = this->feature_types_.ConstDeviceSpan();
PruneImpl<SketchEntry>(device_, d_columns_ptr_out, in, d_columns_ptr_in, ft,
out, no_op);
this->columns_ptr_.Copy(columns_ptr_b_);
this->Alternate();
timer_.Stop(__func__);
}
@@ -433,15 +441,11 @@ void SketchContainer::Merge(Span<OffsetT const> d_that_columns_ptr,
this->Other().resize(this->Current().size() + that.size());
CHECK_EQ(d_that_columns_ptr.size(), this->columns_ptr_.Size());

HostDeviceVector<OffsetT> new_columns_ptr;
new_columns_ptr.SetDevice(device_);
new_columns_ptr.Resize(this->ColumnsPtr().size());
MergeImpl(device_, this->Data(), this->ColumnsPtr(),
that, d_that_columns_ptr,
dh::ToSpan(this->Other()), new_columns_ptr.DeviceSpan());
this->columns_ptr_ = std::move(new_columns_ptr);
dh::ToSpan(this->Other()), columns_ptr_b_.DeviceSpan());
this->columns_ptr_.Copy(columns_ptr_b_);
CHECK_EQ(this->columns_ptr_.Size(), num_columns_ + 1);
CHECK_EQ(new_columns_ptr.Size(), 0);
this->Alternate();
timer_.Stop(__func__);
}
@@ -528,7 +532,8 @@ void SketchContainer::AllReduce() {
}

// Merge them into a new sketch.
SketchContainer new_sketch(num_bins_, this->num_columns_, global_sum_rows,
SketchContainer new_sketch(this->feature_types_, num_bins_,
this->num_columns_, global_sum_rows,
this->device_);
for (size_t i = 0; i < allworkers.size(); ++i) {
auto worker = allworkers[i];
@@ -568,11 +573,16 @@ void SketchContainer::MakeCuts(HistogramCuts* p_cuts) {
auto& h_out_columns_ptr = p_cuts->cut_ptrs_.HostVector();
h_out_columns_ptr.clear();
h_out_columns_ptr.push_back(0);
auto const& h_feature_types = this->feature_types_.ConstHostSpan();
for (bst_feature_t i = 0; i < num_columns_; ++i) {
h_out_columns_ptr.push_back(
std::min(static_cast<size_t>(std::max(static_cast<size_t>(1ul),
this->Column(i).size())),
static_cast<size_t>(num_bins_)));
size_t column_size = std::max(static_cast<size_t>(1ul),
this->Column(i).size());
if (IsCat(h_feature_types, i)) {
h_out_columns_ptr.push_back(static_cast<size_t>(column_size));
} else {
h_out_columns_ptr.push_back(std::min(static_cast<size_t>(column_size),
static_cast<size_t>(num_bins_)));
}
}
std::partial_sum(h_out_columns_ptr.begin(), h_out_columns_ptr.end(),
h_out_columns_ptr.begin());
@@ -583,6 +593,7 @@ void SketchContainer::MakeCuts(HistogramCuts* p_cuts) {
p_cuts->cut_values_.SetDevice(device_);
p_cuts->cut_values_.Resize(total_bins);
auto out_cut_values = p_cuts->cut_values_.DeviceSpan();
auto d_ft = feature_types_.ConstDeviceSpan();

dh::LaunchN(0, total_bins, [=] __device__(size_t idx) {
auto column_id = dh::SegmentId(d_out_columns_ptr, idx);
@@ -605,11 +616,17 @@ void SketchContainer::MakeCuts(HistogramCuts* p_cuts) {
return;
}

// First thread is responsible for setting min values.
if (idx == 0) {
if (idx == 0 && !IsCat(d_ft, column_id)) {
auto mval = in_column[idx].value;
d_min_values[column_id] = mval - (fabs(mval) + 1e-5);
}

if (IsCat(d_ft, column_id)) {
assert(out_column.size() == in_column.size());
out_column[idx] = in_column[idx].value;
return;
}

// Last thread is responsible for setting a value that's greater than other cuts.
if (idx == out_column.size() - 1) {
const bst_float cpt = in_column.back().value;
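A recurring rule in the `Prune` and `MakeCuts` changes above: a numerical column is capped at `num_bins` cuts (plus the min/max adjustments), while a categorical column keeps one cut per category, so its cut count equals its column size. A small host-side sketch of that cut-pointer arithmetic, assuming plain vectors in place of `HostDeviceVector` (illustrative only):

```cpp
#include <algorithm>
#include <cstddef>
#include <iostream>
#include <numeric>
#include <vector>

int main() {
  size_t const num_bins = 4;
  std::vector<size_t> column_size{7, 2, 9};      // sketch entries per column
  std::vector<bool> is_cat{false, true, false};  // feature types
  std::vector<size_t> cut_ptr{0};
  for (size_t i = 0; i < column_size.size(); ++i) {
    size_t n = std::max<size_t>(1, column_size[i]);  // at least one cut
    // Categorical columns keep every category; numerical ones are capped.
    cut_ptr.push_back(is_cat[i] ? n : std::min(n, num_bins));
  }
  // Same pattern as the std::partial_sum over h_out_columns_ptr above.
  std::partial_sum(cut_ptr.begin(), cut_ptr.end(), cut_ptr.begin());
  for (auto p : cut_ptr) std::cout << p << ' ';  // prints: 0 4 6 10
}
```

With `num_bins = 4`, both numerical columns get 4 cuts while the two-category column gets exactly 2, yielding the offsets `0 4 6 10`.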
src/common/quantile.cuh (23 changes: 20 additions & 3 deletions)
@@ -4,6 +4,7 @@
#include <memory>

#include "xgboost/span.h"
#include "xgboost/data.h"
#include "device_helpers.cuh"
#include "quantile.h"
#include "timer.h"
@@ -28,6 +29,7 @@ class SketchContainer {
private:
Monitor timer_;
std::unique_ptr<dh::AllReducer> reducer_;
HostDeviceVector<FeatureType> feature_types_;
bst_row_t num_rows_;
bst_feature_t num_columns_;
int32_t num_bins_;
@@ -39,6 +41,7 @@ class SketchContainer {
bool current_buffer_ {true};
// The container is just a CSC matrix.
HostDeviceVector<OffsetT> columns_ptr_;
HostDeviceVector<OffsetT> columns_ptr_b_;

dh::caching_device_vector<SketchEntry>& Current() {
if (current_buffer_) {
@@ -80,12 +83,25 @@ class SketchContainer {
* \param num_rows Total number of rows in known dataset (typically the rows in current worker).
* \param device GPU ID.
*/
SketchContainer(int32_t max_bin, bst_feature_t num_columns, bst_row_t num_rows, int32_t device) :
num_rows_{num_rows}, num_columns_{num_columns}, num_bins_{max_bin}, device_{device} {
SketchContainer(HostDeviceVector<FeatureType> const& feature_types,
int32_t max_bin,
bst_feature_t num_columns, bst_row_t num_rows,
int32_t device)
: num_rows_{num_rows},
num_columns_{num_columns}, num_bins_{max_bin}, device_{device} {
CHECK_GE(device, 0);
// Initialize Sketches for this dmatrix
this->columns_ptr_.SetDevice(device_);
this->columns_ptr_.Resize(num_columns + 1);
CHECK_GE(device, 0);
this->columns_ptr_b_.SetDevice(device_);
this->columns_ptr_b_.Resize(num_columns + 1);

this->feature_types_.Resize(feature_types.Size());
this->feature_types_.Copy(feature_types);
// Pull to device.
this->feature_types_.SetDevice(device);
this->feature_types_.ConstDeviceSpan();
this->feature_types_.ConstHostSpan();
timer_.Init(__func__);
}
/* \brief Return GPU ID for this container. */
@@ -127,6 +143,7 @@ class SketchContainer {
Span<SketchEntry const> Data() const {
return {this->Current().data().get(), this->Current().size()};
}
HostDeviceVector<FeatureType> const& FeatureTypes() const { return feature_types_; }

Span<OffsetT const> ColumnsPtr() const { return this->columns_ptr_.ConstDeviceSpan(); }

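The new `columns_ptr_b_` member gives the column offsets the same double-buffer treatment the entry storage already gets through `Current()`/`Other()`/`Alternate()`: an operation writes its result into the spare buffer, then a flag flip publishes it. A minimal stand-alone sketch of that ping-pong pattern (a simplified host-side stand-in, not the real container):

```cpp
#include <iostream>
#include <vector>

// Two buffers plus a flag: readers see Current(), writers fill Other(),
// and Alternate() swaps their roles in O(1).
class PingPong {
  std::vector<float> a_, b_;
  bool current_is_a_{true};

 public:
  std::vector<float>& Current() { return current_is_a_ ? a_ : b_; }
  std::vector<float>& Other()   { return current_is_a_ ? b_ : a_; }
  void Alternate() { current_is_a_ = !current_is_a_; }
};

int main() {
  PingPong buf;
  buf.Current() = {3.f, 1.f, 2.f, 1.f};
  // A prune-like step writes the reduced result into the spare buffer...
  buf.Other() = {1.f, 2.f, 3.f};
  // ...and a swap makes it current, with no element-wise copy.
  buf.Alternate();
  std::cout << buf.Current().size() << '\n';  // prints: 3
}
```

In the diff itself, `Prune` and `Merge` follow this shape for the entry buffers, while the offsets are copied back via `columns_ptr_.Copy(columns_ptr_b_)`, since `HostDeviceVector` offers no swap.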
src/data/iterative_device_dmatrix.cu (8 changes: 6 additions & 2 deletions)
@@ -79,7 +79,8 @@ void IterativeDeviceDMatrix::Initialize(DataIterHandle iter_handle, float missin
} else {
CHECK_EQ(cols, num_cols()) << "Inconsistent number of columns.";
}
sketch_containers.emplace_back(batch_param_.max_bin, cols, num_rows(), get_device());
sketch_containers.emplace_back(proxy->Info().feature_types,
batch_param_.max_bin, cols, num_rows(), get_device());
auto* p_sketch = &sketch_containers.back();
proxy->Info().weights_.SetDevice(get_device());
Dispatch(proxy, [&](auto const &value) {
@@ -101,7 +102,10 @@
}
iter.Reset();
dh::safe_cuda(cudaSetDevice(get_device()));
common::SketchContainer final_sketch(batch_param_.max_bin, cols, accumulated_rows, get_device());
HostDeviceVector<FeatureType> ft;
common::SketchContainer final_sketch(
sketch_containers.empty() ? ft : sketch_containers.front().FeatureTypes(),
batch_param_.max_bin, cols, accumulated_rows, get_device());
for (auto const& sketch : sketch_containers) {
final_sketch.Merge(sketch.ColumnsPtr(), sketch.Data());
final_sketch.FixError();
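For context on the constructor changes above: `Initialize` builds one `SketchContainer` per streamed batch (now seeded with the proxy's `feature_types`), then folds them into `final_sketch` with `Merge` followed by `FixError`. The toy sketch below shows just that accumulate-by-merge shape, with sorted vectors standing in for the per-batch quantile summaries (illustrative only; the real merge also re-weights and prunes):

```cpp
#include <algorithm>
#include <iostream>
#include <iterator>
#include <vector>

int main() {
  // One sorted "summary" per streamed batch.
  std::vector<std::vector<float>> batch_sketches{
      {0.1f, 0.9f}, {0.2f, 0.5f, 0.8f}, {0.3f}};
  std::vector<float> final_sketch;
  for (auto const& s : batch_sketches) {
    std::vector<float> merged;
    merged.reserve(final_sketch.size() + s.size());
    // Analogue of final_sketch.Merge(...): combine two sorted summaries.
    std::merge(final_sketch.begin(), final_sketch.end(),
               s.begin(), s.end(), std::back_inserter(merged));
    final_sketch = std::move(merged);
  }
  std::cout << final_sketch.size() << '\n';  // prints: 6
}
```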