Skip to content

Commit

Permalink
Rewrite sparse dmatrix using callbacks. (#7092)
Browse files Browse the repository at this point in the history
- Reduce dependency on dmlc parsers and provide an interface for users to load data by themselves.
- Remove use of threaded iterator and IO queue.
- Remove `page_size`.
- Make sure the number of pages in memory is bounded.
- Make sure the cache can not be violated.
- Provide an interface for internal algorithms to process data asynchronously.
  • Loading branch information
trivialfis committed Jul 16, 2021
1 parent 2f524e9 commit bd1f3a3
Show file tree
Hide file tree
Showing 51 changed files with 1,436 additions and 1,382 deletions.
8 changes: 2 additions & 6 deletions amalgamation/xgboost-all0.cc
Expand Up @@ -37,18 +37,14 @@
#include "../src/data/simple_dmatrix.cc"
#include "../src/data/sparse_page_raw_format.cc"
#include "../src/data/ellpack_page.cc"
#include "../src/data/ellpack_page_source.cc"
#include "../src/data/gradient_index.cc"
#include "../src/data/sparse_page_dmatrix.cc"
#include "../src/data/proxy_dmatrix.cc"

// prediction
#include "../src/predictor/predictor.cc"
#include "../src/predictor/cpu_predictor.cc"

#if DMLC_ENABLE_STD_THREAD
#include "../src/data/sparse_page_dmatrix.cc"
#include "../src/data/sparse_page_source.cc"
#endif

// trees
#include "../src/tree/param.cc"
#include "../src/tree/tree_model.cc"
Expand Down
117 changes: 97 additions & 20 deletions include/xgboost/c_api.h
Expand Up @@ -223,19 +223,31 @@ XGB_DLL int XGDMatrixCreateFromDT(void** data,
* - XGBCallbackDataIterNext
* - XGDMatrixCreateFromDataIter
*
* Another set is used by Quantile based DMatrix (used by hist algorithm) for reducing
* memory usage. Currently only GPU implementation is available. It accept foreign data
* iterators as callbacks and works similar to external memory. For GPU Hist, the data is
* first compressed by quantile sketching then merged. This is particular useful for
* distributed setting as it eliminates 2 copies of data. 1 by a `concat` from external
* library to make the data into a blob for normal DMatrix initialization, another by the
* internal CSR copy of DMatrix. Related functions are:
*
* Another set is used by external data iterator. It accept foreign data iterators as
* callbacks. There are 2 different senarios where users might want to pass in callbacks
* instead of raw data. First it's the Quantile DMatrix used by GPU Hist. For this case,
* the data is first compressed by quantile sketching then merged. This is particular
* useful for distributed setting as it eliminates 2 copies of data. 1 by a `concat` from
* external library to make the data into a blob for normal DMatrix initialization,
* another by the internal CSR copy of DMatrix. The second use case is external memory
* support where users can pass a custom data iterator into XGBoost for loading data in
* batches. There are short notes on each of the use case in respected DMatrix factory
* function.
*
* Related functions are:
*
* # Factory functions
* - `XGDMatrixCreateFromCallback` for external memory
* - `XGDeviceQuantileDMatrixCreateFromCallback` for quantile DMatrix
*
* # Proxy that callers can use to pass data to XGBoost
* - XGProxyDMatrixCreate
* - XGDMatrixCallbackNext
* - DataIterResetCallback
* - XGProxyDMatrixSetDataCudaArrayInterface
* - XGProxyDMatrixSetDataCudaColumnar
* - XGProxyDMatrixSetDataDense
* - XGProxyDMatrixSetDataCSR
* - ... (data setters)
*/

Expand Down Expand Up @@ -308,17 +320,9 @@ XGB_DLL int XGDMatrixCreateFromDataIter(
const char* cache_info,
DMatrixHandle *out);

/* == Second set of callback functions, used by constructing Quantile based DMatrix. ===
*
* Short note for how to use the second set of callback for GPU Hist tree method.
*
* Step 0: Define a data iterator with 2 methods `reset`, and `next`.
* Step 1: Create a DMatrix proxy by `XGProxyDMatrixCreate` and hold the handle.
* Step 2: Pass the iterator handle, proxy handle and 2 methods into
* `XGDeviceQuantileDMatrixCreateFromCallback`.
* Step 3: Call appropriate data setters in `next` functions.
*
* See test_iterative_device_dmatrix.cu or Python interface for examples.
/**
* Second set of callback functions, used by constructing Quantile DMatrix or external
* memory DMatrix using custom iterator.
*/

/*!
Expand All @@ -344,8 +348,53 @@ XGB_EXTERN_C typedef int XGDMatrixCallbackNext(DataIterHandle iter); // NOLINT(
*/
XGB_EXTERN_C typedef void DataIterResetCallback(DataIterHandle handle); // NOLINT(*)


/*!
* \brief Create a device DMatrix with data iterator.
* \brief Create an external memory DMatrix with data iterator.
*
* Short note for how to use second set of callback for external memory data support:
*
* - Step 0: Define a data iterator with 2 methods `reset`, and `next`.
* - Step 1: Create a DMatrix proxy by `XGProxyDMatrixCreate` and hold the handle.
* - Step 2: Pass the iterator handle, proxy handle and 2 methods into
* `XGDMatrixCreateFromCallback`, along with other parameters encoded as a JSON object.
* - Step 3: Call appropriate data setters in `next` functions.
*
* For example usage see demo/c-api/external-memory
*
* \param iter A handle to external data iterator.
* \param proxy A DMatrix proxy handle created by `XGProxyDMatrixCreate`.
* \param reset Callback function resetting the iterator state.
* \param next Callback function yielding the next batch of data.
* \param c_json_config JSON encoded parameters for DMatrix construction. Accepted fields are:
*
* - missing: Which value to represent missing value
* - cache_prefix: The path of cache file, caller must initialize all the directories in this path.
* - nthread (optional): Number of threads used for initializing DMatrix.
*
* \param out The created external memory DMatrix
*
* \return 0 when success, -1 when failure happens
*/
XGB_DLL int XGDMatrixCreateFromCallback(DataIterHandle iter,
DMatrixHandle proxy,
DataIterResetCallback *reset,
XGDMatrixCallbackNext *next,
char const* c_json_config,
DMatrixHandle *out);

/*!
* \brief Create a Quantile DMatrix with data iterator.
*
* Short note for how to use the second set of callback for GPU Hist tree method:
*
* - Step 0: Define a data iterator with 2 methods `reset`, and `next`.
* - Step 1: Create a DMatrix proxy by `XGProxyDMatrixCreate` and hold the handle.
* - Step 2: Pass the iterator handle, proxy handle and 2 methods into
* `XGDeviceQuantileDMatrixCreateFromCallback`.
* - Step 3: Call appropriate data setters in `next` functions.
*
* See test_iterative_device_dmatrix.cu or Python interface for examples.
*
* \param iter A handle to external data iterator.
* \param proxy A DMatrix proxy handle created by `XGProxyDMatrixCreate`.
Expand All @@ -362,6 +411,7 @@ XGB_DLL int XGDeviceQuantileDMatrixCreateFromCallback(
DataIterHandle iter, DMatrixHandle proxy, DataIterResetCallback *reset,
XGDMatrixCallbackNext *next, float missing, int nthread, int max_bin,
DMatrixHandle *out);

/*!
* \brief Set data on a DMatrix proxy.
*
Expand All @@ -387,6 +437,33 @@ XGProxyDMatrixSetDataCudaArrayInterface(DMatrixHandle handle,
XGB_DLL int XGProxyDMatrixSetDataCudaColumnar(DMatrixHandle handle,
const char *c_interface_str);

/*!
* \brief Set data on a DMatrix proxy.
*
* \param handle A DMatrix proxy created by XGProxyDMatrixCreate
* \param c_interface_str Null terminated JSON document string representation of array
* interface.
*
* \return 0 when success, -1 when failure happens
*/
XGB_DLL int XGProxyDMatrixSetDataDense(DMatrixHandle handle,
char const *c_interface_str);

/*!
* \brief Set data on a DMatrix proxy.
*
* \param handle A DMatrix proxy created by XGProxyDMatrixCreate
* \param indptr JSON encoded __array_interface__ to row pointer in CSR.
* \param indices JSON encoded __array_interface__ to column indices in CSR.
* \param values JSON encoded __array_interface__ to values in CSR..
*
* \return 0 when success, -1 when failure happens
*/
XGB_DLL int XGProxyDMatrixSetDataCSR(DMatrixHandle handle, char const *indptr,
char const *indices, char const *data,
bst_ulong ncol);


/*
* ==========================- End data callback APIs ==========================
*/
Expand Down
68 changes: 47 additions & 21 deletions include/xgboost/data.h
Expand Up @@ -171,9 +171,12 @@ class MetaInfo {
* \param that The other MetaInfo object.
*
* \param accumulate_rows Whether rows need to be accumulated in this function. If
* client code knows number of rows in advance, set this parameter to false.
* client code knows number of rows in advance, set this
* parameter to false.
* \param check_column Whether the extend method should check the consistency of
* columns.
*/
void Extend(MetaInfo const& that, bool accumulate_rows);
void Extend(MetaInfo const& that, bool accumulate_rows, bool check_column);

private:
/*! \brief argsort of labels */
Expand Down Expand Up @@ -211,14 +214,12 @@ struct BatchParam {
int gpu_id;
/*! \brief Maximum number of bins per feature for histograms. */
int max_bin{0};
/*! \brief Page size for external memory mode. */
size_t gpu_page_size;
BatchParam() = default;
BatchParam(int32_t device, int32_t max_bin, size_t gpu_page_size = 0)
: gpu_id{device}, max_bin{max_bin}, gpu_page_size{gpu_page_size} {}
inline bool operator!=(const BatchParam& other) const {
return gpu_id != other.gpu_id || max_bin != other.max_bin ||
gpu_page_size != other.gpu_page_size;
BatchParam(int32_t device, int32_t max_bin)
: gpu_id{device}, max_bin{max_bin} {}

bool operator!=(const BatchParam& other) const {
return gpu_id != other.gpu_id || max_bin != other.max_bin;
}
};

Expand Down Expand Up @@ -390,27 +391,25 @@ class GHistIndexMatrix;
template<typename T>
class BatchIteratorImpl {
public:
using iterator_category = std::forward_iterator_tag; // NOLINT
virtual ~BatchIteratorImpl() = default;
virtual T& operator*() = 0;
virtual const T& operator*() const = 0;
virtual void operator++() = 0;
virtual BatchIteratorImpl& operator++() = 0;
virtual bool AtEnd() const = 0;
virtual std::shared_ptr<T const> Page() const = 0;
};

template<typename T>
class BatchIterator {
public:
using iterator_category = std::forward_iterator_tag; // NOLINT
explicit BatchIterator(BatchIteratorImpl<T>* impl) { impl_.reset(impl); }
explicit BatchIterator(std::shared_ptr<BatchIteratorImpl<T>> impl) { impl_ = impl; }

void operator++() {
BatchIterator &operator++() {
CHECK(impl_ != nullptr);
++(*impl_);
}

T& operator*() {
CHECK(impl_ != nullptr);
return *(*impl_);
return *this;
}

const T& operator*() const {
Expand All @@ -428,6 +427,10 @@ class BatchIterator {
return impl_->AtEnd();
}

std::shared_ptr<T const> Page() const {
return impl_->Page();
}

private:
std::shared_ptr<BatchIteratorImpl<T>> impl_;
};
Expand Down Expand Up @@ -499,8 +502,7 @@ class DMatrix {
static DMatrix* Load(const std::string& uri,
bool silent,
bool load_row_split,
const std::string& file_format = "auto",
size_t page_size = kPageSize);
const std::string& file_format = "auto");

/**
* \brief Creates a new DMatrix from an external data adapter.
Expand All @@ -516,8 +518,7 @@ class DMatrix {
*/
template <typename AdapterT>
static DMatrix* Create(AdapterT* adapter, float missing, int nthread,
const std::string& cache_prefix = "",
size_t page_size = kPageSize);
const std::string& cache_prefix = "");

/**
* \brief Create a new Quantile based DMatrix used for histogram based algorithm.
Expand Down Expand Up @@ -545,6 +546,31 @@ class DMatrix {
int nthread,
int max_bin);

/**
* \brief Create an external memory DMatrix with callbacks.
*
* \tparam DataIterHandle External iterator type, defined in C API.
* \tparam DMatrixHandle DMatrix handle, defined in C API.
* \tparam DataIterResetCallback Callback for reset, prototype defined in C API.
* \tparam XGDMatrixCallbackNext Callback for next, prototype defined in C API.
*
* \param iter External data iterator
* \param proxy A hanlde to ProxyDMatrix
* \param reset Callback for reset
* \param next Callback for next
* \param missing Value that should be treated as missing.
* \param nthread number of threads used for initialization.
* \param cache Prefix of cache file path.
*
* \return A created external memory DMatrix.
*/
template <typename DataIterHandle, typename DMatrixHandle,
typename DataIterResetCallback, typename XGDMatrixCallbackNext>
static DMatrix *Create(DataIterHandle iter, DMatrixHandle proxy,
DataIterResetCallback *reset,
XGDMatrixCallbackNext *next, float missing,
int32_t nthread, std::string cache);

virtual DMatrix *Slice(common::Span<int32_t const> ridxs) = 0;
/*! \brief Number of rows per page in external memory. Approximately 100MB per page for
* dataset with 100 features. */
Expand Down
6 changes: 0 additions & 6 deletions include/xgboost/generic_parameters.h
Expand Up @@ -29,8 +29,6 @@ struct GenericParameter : public XGBoostParameter<GenericParameter> {
int gpu_id;
// fail when gpu_id is invalid
bool fail_on_invalid_gpu_id {false};
// gpu page size in external memory mode, 0 means using the default.
size_t gpu_page_size;
bool validate_parameters {false};

/*!
Expand Down Expand Up @@ -66,10 +64,6 @@ struct GenericParameter : public XGBoostParameter<GenericParameter> {
DMLC_DECLARE_FIELD(fail_on_invalid_gpu_id)
.set_default(false)
.describe("Fail with error when gpu_id is invalid.");
DMLC_DECLARE_FIELD(gpu_page_size)
.set_default(0)
.set_lower_bound(0)
.describe("GPU page size when running in external memory mode.");
DMLC_DECLARE_FIELD(validate_parameters)
.set_default(false)
.describe("Enable checking whether parameters are used or not.");
Expand Down

0 comments on commit bd1f3a3

Please sign in to comment.