/
ellpack_page_source.cu
89 lines (77 loc) · 3.14 KB
/
ellpack_page_source.cu
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
/*!
* Copyright 2019 XGBoost contributors
*/
#include <memory>
#include <utility>
#include "../common/hist_util.cuh"
#include "ellpack_page.cuh"
#include "ellpack_page_source.h"
#include "sparse_page_source.h"
namespace xgboost {
namespace data {
// Build the quantile sketch across the whole input data, then use the histogram cuts to compress
// each CSR page, and write the accumulated ELLPACK pages to disk.
EllpackPageSource::EllpackPageSource(DMatrix* dmat,
const std::string& cache_info,
const BatchParam& param) noexcept(false) {
cache_info_ = ParseCacheInfo(cache_info, kPageType_);
for (auto file : cache_info_.name_shards) {
CheckCacheFileExists(file);
}
if (param.gpu_page_size > 0) {
page_size_ = param.gpu_page_size;
}
monitor_.Init("ellpack_page_source");
dh::safe_cuda(cudaSetDevice(param.gpu_id));
monitor_.Start("Quantiles");
size_t row_stride = GetRowStride(dmat);
auto cuts = common::DeviceSketch(param.gpu_id, dmat, param.max_bin);
monitor_.Stop("Quantiles");
monitor_.Start("WriteEllpackPages");
WriteEllpackPages(param.gpu_id, dmat, cuts, cache_info, row_stride);
monitor_.Stop("WriteEllpackPages");
external_prefetcher_.reset(
new ExternalMemoryPrefetcher<EllpackPage>(cache_info_));
}
// Compress each CSR page to ELLPACK, and write the accumulated pages to disk.
void EllpackPageSource::WriteEllpackPages(int device, DMatrix* dmat,
const common::HistogramCuts& cuts,
const std::string& cache_info,
size_t row_stride) const {
auto cinfo = ParseCacheInfo(cache_info, kPageType_);
const size_t extra_buffer_capacity = 6;
SparsePageWriter<EllpackPage> writer(cinfo.name_shards, cinfo.format_shards,
extra_buffer_capacity);
std::shared_ptr<EllpackPage> page;
SparsePage temp_host_page;
writer.Alloc(&page);
auto* impl = page->Impl();
auto ft = dmat->Info().feature_types.ConstDeviceSpan();
size_t bytes_write = 0;
double tstart = dmlc::GetTime();
for (const auto& batch : dmat->GetBatches<SparsePage>()) {
temp_host_page.Push(batch);
size_t mem_cost_bytes =
EllpackPageImpl::MemCostBytes(temp_host_page.Size(), row_stride, cuts);
if (mem_cost_bytes >= page_size_) {
bytes_write += mem_cost_bytes;
*impl = EllpackPageImpl(device, cuts, temp_host_page, dmat->IsDense(),
row_stride, ft);
writer.PushWrite(std::move(page));
writer.Alloc(&page);
impl = page->Impl();
temp_host_page.Clear();
double tdiff = dmlc::GetTime() - tstart;
LOG(INFO) << "Writing " << kPageType_ << " to " << cache_info << " in "
<< ((bytes_write >> 20UL) / tdiff) << " MB/s, "
<< (bytes_write >> 20UL) << " written";
}
}
if (temp_host_page.Size() != 0) {
*impl = EllpackPageImpl(device, cuts, temp_host_page, dmat->IsDense(),
row_stride, ft);
writer.PushWrite(std::move(page));
}
}
} // namespace data
} // namespace xgboost