Skip to content

Commit

Permalink
Use iterative DMatrix callbacks for external memory.
Browse files Browse the repository at this point in the history
* Re-implement sparse page DMatrix with callbacks.
* Define async fetching ring.
* Expose data iterator support in Python.
* [Breaking] Rename some methods.
* Add demo and tutorial on how to use the iterator.
  • Loading branch information
trivialfis committed Jul 6, 2021
1 parent f937f51 commit 54b4610
Show file tree
Hide file tree
Showing 61 changed files with 1,984 additions and 1,383 deletions.
3 changes: 1 addition & 2 deletions .github/workflows/main.yml
Expand Up @@ -102,8 +102,7 @@ jobs:
cd build
cmake .. -GNinja -DCMAKE_PREFIX_PATH=$CONDA_PREFIX
ninja -v
cd ..
./build/api-demo
ctest
lint:
runs-on: ubuntu-latest
Expand Down
8 changes: 2 additions & 6 deletions amalgamation/xgboost-all0.cc
Expand Up @@ -37,18 +37,14 @@
#include "../src/data/simple_dmatrix.cc"
#include "../src/data/sparse_page_raw_format.cc"
#include "../src/data/ellpack_page.cc"
#include "../src/data/ellpack_page_source.cc"
#include "../src/data/gradient_index.cc"
#include "../src/data/sparse_page_dmatrix.cc"
#include "../src/data/proxy_dmatrix.cc"

// prediction
#include "../src/predictor/predictor.cc"
#include "../src/predictor/cpu_predictor.cc"

#if DMLC_ENABLE_STD_THREAD
#include "../src/data/sparse_page_dmatrix.cc"
#include "../src/data/sparse_page_source.cc"
#endif

// trees
#include "../src/tree/param.cc"
#include "../src/tree/tree_model.cc"
Expand Down
20 changes: 16 additions & 4 deletions demo/c-api/CMakeLists.txt
@@ -1,5 +1,17 @@
# Umbrella project for the XGBoost C API examples.  Each example lives in its
# own subdirectory with its own project()/find_package() setup; this file only
# aggregates them and registers one ctest entry per demo executable.
cmake_minimum_required(VERSION 3.13)
project(xgboost-c-examples)

add_subdirectory(basic)
add_subdirectory(external-memory)

enable_testing()
add_test(
  NAME test_xgboost_demo_c_basic
  COMMAND api-demo
  WORKING_DIRECTORY ${xgboost-c-examples_BINARY_DIR}
)
add_test(
  NAME test_xgboost_demo_c_external_memory
  COMMAND external-memory-demo
  WORKING_DIRECTORY ${xgboost-c-examples_BINARY_DIR}
)
6 changes: 6 additions & 0 deletions demo/c-api/basic/CMakeLists.txt
@@ -0,0 +1,6 @@
# Basic C API demo: builds the api-demo executable against an installed (or
# exported) xgboost package.
cmake_minimum_required(VERSION 3.13)
project(api-demo LANGUAGES C CXX VERSION 0.0.1)
# Add the repository's cmake/modules directory so find_package(xgboost) can
# locate the bundled Find module — NOTE(review): path assumes this file sits
# three levels below the repo root; confirm if the demo is moved.
list(APPEND CMAKE_MODULE_PATH "${CMAKE_CURRENT_LIST_DIR}/../../../cmake/modules")
find_package(xgboost REQUIRED)
add_executable(api-demo c-api-demo.c)
target_link_libraries(api-demo PRIVATE xgboost::xgboost)
File renamed without changes.
File renamed without changes.
4 changes: 2 additions & 2 deletions demo/c-api/c-api-demo.c → demo/c-api/basic/c-api-demo.c
Expand Up @@ -24,8 +24,8 @@ int main(int argc, char** argv) {

// load the data
DMatrixHandle dtrain, dtest;
safe_xgboost(XGDMatrixCreateFromFile("../data/agaricus.txt.train", silent, &dtrain));
safe_xgboost(XGDMatrixCreateFromFile("../data/agaricus.txt.test", silent, &dtest));
safe_xgboost(XGDMatrixCreateFromFile("../../data/agaricus.txt.train", silent, &dtrain));
safe_xgboost(XGDMatrixCreateFromFile("../../data/agaricus.txt.test", silent, &dtest));

// create the booster
BoosterHandle booster;
Expand Down
9 changes: 9 additions & 0 deletions demo/c-api/external-memory/CMakeLists.txt
@@ -0,0 +1,9 @@
# External-memory demo: builds the data-callback example against xgboost.
cmake_minimum_required(VERSION 3.13)
project(external-memory-demo LANGUAGES C CXX VERSION 0.0.1)

# Add the repository's cmake/modules directory so find_package(xgboost) can
# locate the bundled Find module — NOTE(review): path assumes this file sits
# three levels below the repo root; confirm if the demo is moved.
list(APPEND CMAKE_MODULE_PATH "${CMAKE_CURRENT_LIST_DIR}/../../../cmake/modules")
# OpenMP is linked explicitly because the xgboost target requires it at link
# time in this configuration.
find_package(OpenMP REQUIRED)
find_package(xgboost REQUIRED)

add_executable(external-memory-demo external_memory.c)
target_link_libraries(external-memory-demo PRIVATE xgboost::xgboost OpenMP::OpenMP_CXX)
10 changes: 10 additions & 0 deletions demo/c-api/external-memory/README.md
@@ -0,0 +1,10 @@
Data Callback
=============

A simple demo of using a custom data iterator with XGBoost. The primary use for this
is external-memory training with user-provided data loaders. In the example, we
define a custom data iterator with 2 methods: `reset` and `next`. The `next` method
passes data into XGBoost and tells XGBoost whether the iterator has reached its end.
During training, XGBoost generates some cache files for internal data structures in
the current directory; the location can be changed with the `cache_prefix` parameter
when constructing the `DMatrix`.
177 changes: 177 additions & 0 deletions demo/c-api/external-memory/external_memory.c
@@ -0,0 +1,177 @@
/*!
* Copyright 2021 XGBoost contributors
*
* \brief A simple example of using xgboost data callback API.
*/

#include <stddef.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <xgboost/c_api.h>

/* Check the return value of an XGBoost C API call; on failure print the file,
 * line, expression and last XGBoost error, then abort.  Wrapped in
 * do { } while (0) so the macro expands to a single statement and is safe in
 * un-braced if/else bodies. */
#define safe_xgboost(err)                                                      \
  do {                                                                         \
    if ((err) != 0) {                                                          \
      fprintf(stderr, "%s:%d: error in %s: %s\n", __FILE__, __LINE__, #err,    \
              XGBGetLastError());                                              \
      exit(1);                                                                 \
    }                                                                          \
  } while (0)

/* Number of batches produced by the demo iterator. */
#define N_BATCHS 32
/* Number of rows in each batch (the demo uses a single feature column). */
#define BATCH_LEN 512

/* Shorthands for the opaque XGBoost C API handle types. */
typedef DMatrixHandle DMatrix;
typedef BoosterHandle Booster;

/* A simple in-memory data iterator that feeds batches into XGBoost through
 * the data-callback API.  All arrays are heap-allocated by
 * DataIterator_Init and released by DataIterator_Free. */
typedef struct _DataIter {
  /* Data of each batch. */
  float **data;
  /* Labels of each batch */
  float **labels;
  /* Length of each batch. */
  size_t *lengths;
  /* Total number of batches. */
  size_t n;
  /* Current iteration. */
  size_t cur_it;

  /* Private fields */
  /* Proxy DMatrix handle used to hand the current batch to XGBoost. */
  DMatrix _proxy;
  /* Scratch buffer for the JSON array-interface string of the current batch. */
  char _array[128];
} DataIter;

/* Abort with a diagnostic if an allocation returned NULL.  Wrapped in
 * do { } while (0) so the macro expands to a single statement and is safe in
 * un-braced if/else bodies. */
#define safe_malloc(ptr)                                                       \
  do {                                                                         \
    if ((ptr) == NULL) {                                                       \
      fprintf(stderr, "%s:%d: Failed to allocate memory.\n", __FILE__,         \
              __LINE__);                                                       \
      exit(1);                                                                 \
    }                                                                          \
  } while (0)

/**
 * Populate the iterator with randomly generated batches.  A real application
 * would load the batches from external storage instead; the random data here
 * just demonstrates how the iterator is wired into XGBoost.
 *
 * \param batch_size Number of elements per batch.  The demo here is only
 *                   using 1 column.
 * \param n_batches  Number of batches.
 */
void DataIterator_Init(DataIter *self, size_t batch_size, size_t n_batches) {
  self->n = n_batches;

  /* Every batch in this demo has the same length. */
  self->lengths = (size_t *)malloc(self->n * sizeof(size_t));
  safe_malloc(self->lengths);
  for (size_t batch = 0; batch < self->n; ++batch) {
    self->lengths[batch] = batch_size;
  }

  self->data = (float **)malloc(self->n * sizeof(float *));
  safe_malloc(self->data);
  self->labels = (float **)malloc(self->n * sizeof(float *));
  safe_malloc(self->labels);

  /* Fill each batch with uniform random values in [0, 1]. */
  for (size_t batch = 0; batch < self->n; ++batch) {
    size_t length = self->lengths[batch];

    self->data[batch] = (float *)malloc(length * sizeof(float));
    safe_malloc(self->data[batch]);
    for (size_t row = 0; row < length; ++row) {
      self->data[batch][row] = (float)rand() / (float)(RAND_MAX);
    }

    self->labels[batch] = (float *)malloc(length * sizeof(float));
    safe_malloc(self->labels[batch]);
    for (size_t row = 0; row < length; ++row) {
      self->labels[batch][row] = (float)rand() / (float)(RAND_MAX);
    }
  }

  self->cur_it = 0;
  safe_xgboost(XGProxyDMatrixCreate(&self->_proxy));
}

/**
 * Release every batch buffer, the pointer arrays, and the proxy DMatrix
 * owned by the iterator.
 */
void DataIterator_Free(DataIter *self) {
  for (size_t i = 0; i < self->n; ++i) {
    free(self->data[i]);
    free(self->labels[i]);
  }
  free(self->data);
  /* Fix: the labels pointer array itself was previously leaked. */
  free(self->labels);
  free(self->lengths);
  safe_xgboost(XGDMatrixFree(self->_proxy));
}

/**
 * Next-callback invoked by XGBoost to fetch one batch.
 *
 * Returns 1 while another batch is available; returns 0 (after rewinding to
 * the first batch) once the iterator is exhausted.
 */
int DataIterator_Next(DataIterHandle handle) {
  DataIter *self = (DataIter *)(handle);
  if (self->cur_it == self->n) {
    self->cur_it = 0;
    return 0; /* At end */
  }

  /* A JSON string encoding array interface (standard from numpy).  The first
   * element of "data" is the batch's address encoded as an integer. */
  char array[] = "{\"data\": [%lu, false], \"shape\":[%lu, 1], \"typestr\": "
                 "\"<f4\", \"version\": 3}";
  /* snprintf bounds the write to the 128-byte scratch buffer and always
   * NUL-terminates, so the previous memset + unbounded sprintf is not needed. */
  snprintf(self->_array, sizeof(self->_array), array,
           (size_t)self->data[self->cur_it], self->lengths[self->cur_it]);

  safe_xgboost(XGProxyDMatrixSetDataDense(self->_proxy, self->_array));
  safe_xgboost(XGDMatrixSetDenseInfo(self->_proxy, "label",
                                     self->labels[self->cur_it],
                                     self->lengths[self->cur_it], 1));
  self->cur_it++;
  return 1; /* Continue. */
}

/* Reset-callback invoked by XGBoost: rewind the iterator to the first batch. */
void DataIterator_Reset(DataIterHandle handle) {
  ((DataIter *)handle)->cur_it = 0;
}

/**
 * Train a regression model on the given DMatrix for a fixed number of rounds,
 * printing the training evaluation each round, and save the result to
 * "model.json" in the working directory.
 */
void TrainModel(DMatrix Xy) {
  Booster booster;
  DMatrix cache[] = {Xy};
  /* Create booster for training. */
  safe_xgboost(XGBoosterCreate(cache, 1, &booster));
  /* Use approx for external memory training. */
  safe_xgboost(XGBoosterSetParam(booster, "tree_method", "approx"));
  safe_xgboost(XGBoosterSetParam(booster, "objective", "reg:squarederror"));

  const char *validation_names[1] = {"train"};
  const char *validation_result = NULL;
  size_t n_rounds = 10;
  for (size_t round = 0; round < n_rounds; ++round) {
    /* One boosting iteration, then evaluate on the training data itself. */
    safe_xgboost(XGBoosterUpdateOneIter(booster, round, Xy));
    safe_xgboost(XGBoosterEvalOneIter(booster, round, cache, validation_names,
                                      1, &validation_result));
    printf("%s\n", validation_result);
  }

  /* Save the model to a JSON file. */
  safe_xgboost(XGBoosterSaveModel(booster, "model.json"));

  safe_xgboost(XGBoosterFree(booster));
}

/* Entry point: build an iterator-backed DMatrix and train on it. */
int main() {
  DataIter iter;
  DataIterator_Init(&iter, BATCH_LEN, N_BATCHS);

  /* Build the DMatrix from the iterator callbacks.  During training some
   * cache files prefixed with "cache-" are generated in the current
   * directory. */
  char config[] = "{\"missing\": NaN, \"cache_prefix\": \"cache\"}";
  DMatrix Xy;
  safe_xgboost(XGDMatrixCreateFromCallback(&iter, iter._proxy,
                                           DataIterator_Reset,
                                           DataIterator_Next, config, &Xy));

  TrainModel(Xy);

  safe_xgboost(XGDMatrixFree(Xy));
  DataIterator_Free(&iter);
  return 0;
}
import os
import xgboost as xgb
import numpy as np

## simple example for using external memory version

# The only difference with the in-memory version: append a `#` followed by a
# cache prefix to the URI.  Several cache files with that prefix will be
# generated.  Currently only loading from libsvm files is supported.
CURRENT_DIR = os.path.dirname(__file__)
dtrain = xgb.DMatrix(
    os.path.join(CURRENT_DIR, "../data/agaricus.txt.train?indexing_mode=1#dtrain.cache")
)
assert dtrain.num_col() == 126
dtest = xgb.DMatrix(
    os.path.join(CURRENT_DIR, "../data/agaricus.txt.test?indexing_mode=1#dtest.cache")
)
assert dtest.num_col() == 126

# performance notice: set nthread to be the number of your real cpu some cpu offer two
# threads per core, for example, a 4 core cpu with 8 threads, in such case set nthread=4
# param['nthread'] = num_real_cpu
param = {
    "max_depth": 2,
    "eta": 1,
    "objective": "binary:logistic",
    "tree_method": "approx",
}

# specify validations set to watch performance
watchlist = [(dtest, "eval"), (dtrain, "train")]
num_round = 2
from_ext = xgb.train(param, dtrain, num_round, watchlist)
predt_from_ext = from_ext.predict(dtrain)

# Train again on fully in-memory DMatrix objects for comparison.
dtrain = xgb.DMatrix(os.path.join(CURRENT_DIR, "../data/agaricus.txt.train"))
dtest = xgb.DMatrix(os.path.join(CURRENT_DIR, "../data/agaricus.txt.test"))

watchlist = [(dtest, "eval"), (dtrain, "train")]
num_round = 2
from_blob = xgb.train(param, dtrain, num_round, watchlist)

predt_from_blob = from_blob.predict(dtrain)

# External-memory and in-memory training should agree on the predictions.
np.testing.assert_allclose(predt_from_ext, predt_from_blob)

0 comments on commit 54b4610

Please sign in to comment.