forked from dmlc/xgboost
/
data_iterator.py
111 lines (80 loc) · 3.25 KB
/
data_iterator.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
'''A demo for defining a data iterator.
.. versionadded:: 1.2.0
This demo defines a customized iterator for passing batches of data into
`xgboost.DeviceQuantileDMatrix` and uses this `DeviceQuantileDMatrix` for
training. The feature is primarily designed to reduce the GPU memory required
for training in a distributed environment.
After going through the demo, one might ask why we don't use a more native
Python iterator. That's because XGBoost requires a `reset` function, while
using `itertools.tee` might incur significant memory usage according to:
https://docs.python.org/3/library/itertools.html#itertools.tee.
'''
import xgboost
import cupy
import numpy
# Shape of the synthetic data set generated by `IterForDMatrixDemo`.
COLS = 64  # number of feature columns in every batch
ROWS_PER_BATCH = 1000  # number of rows in every batch; the data is split by rows
BATCHES = 32  # number of batches the iterator hands out
class IterForDMatrixDemo(xgboost.core.DataIter):
    '''Batch-by-batch data source used to build an XGBoost DMatrix.

    Only `reset` and `next` are required of a data iterator; every other
    method is a helper that exists for the purpose of this demo.
    '''

    def __init__(self):
        '''Create the random demo batches and rewind the cursor.

        One feature matrix, one label vector and one weight vector are drawn
        from a seeded cupy generator.  Each list then holds that same array
        `BATCHES` times, so all batches refer to identical values.  Real data
        can be anything that XGBoost currently supports.
        '''
        self.rows = ROWS_PER_BATCH
        self.cols = COLS
        rng = cupy.random.RandomState(1994)
        # Draw in this exact order (features, labels, weights): the order of
        # the calls determines which values the seeded generator produces.
        features = rng.randn(self.rows, self.cols)
        targets = rng.randn(self.rows)
        sample_weights = rng.uniform(size=self.rows)
        self._data = [features] * BATCHES
        self._labels = [targets] * BATCHES
        self._weights = [sample_weights] * BATCHES
        self.it = 0  # index of the batch that `next` will hand out
        super().__init__()

    def as_array(self):
        '''Return all feature batches concatenated into a single array.'''
        stacked = cupy.concatenate(self._data)
        return stacked

    def as_array_labels(self):
        '''Return all label batches concatenated into a single array.'''
        stacked = cupy.concatenate(self._labels)
        return stacked

    def as_array_weights(self):
        '''Return all weight batches concatenated into a single array.'''
        stacked = cupy.concatenate(self._weights)
        return stacked

    def data(self):
        '''Return the feature batch at the current cursor position.'''
        batch_index = self.it
        return self._data[batch_index]

    def labels(self):
        '''Return the label batch at the current cursor position.'''
        batch_index = self.it
        return self._labels[batch_index]

    def weights(self):
        '''Return the weight batch at the current cursor position.'''
        batch_index = self.it
        return self._weights[batch_index]

    def reset(self):
        '''Move the cursor back to the first batch.'''
        self.it = 0

    def next(self, input_data):
        '''Pass the current batch to the `input_data` callback.

        The callback receives the batch as the keyword arguments `data`,
        `label` and `weight`, after which the cursor advances by one.

        Returns 1 when a batch was passed on, or 0 when there is no more
        batch left.
        '''
        if self.it != len(self._data):
            input_data(data=self.data(), label=self.labels(),
                       weight=self.weights())
            self.it += 1
            return 1
        # Every batch has been consumed.
        return 0
def main():
    '''Train from an iterator and from in-memory arrays, then compare.

    A `DeviceQuantileDMatrix` is built from `IterForDMatrixDemo`, and a
    regular `DMatrix` is built from the same batches concatenated together.
    One `gpu_hist` booster is trained on each matrix, and the predictions of
    the two boosters on their own training matrix are checked against each
    other.

    Raises
    ------
    AssertionError
        If the two matrices differ in their number of rows or columns, or if
        the two sets of predictions differ by more than the tolerance.
    '''
    rounds = 100
    it = IterForDMatrixDemo()

    # Use iterator, must be `DeviceQuantileDMatrix`
    m_with_it = xgboost.DeviceQuantileDMatrix(it)

    # Use regular DMatrix.
    m = xgboost.DMatrix(it.as_array(), it.as_array_labels(),
                        weight=it.as_array_weights())

    assert m_with_it.num_col() == m.num_col()
    assert m_with_it.num_row() == m.num_row()

    reg_with_it = xgboost.train({'tree_method': 'gpu_hist'}, m_with_it,
                                num_boost_round=rounds)
    predict_with_it = reg_with_it.predict(m_with_it)

    reg = xgboost.train({'tree_method': 'gpu_hist'}, m,
                        num_boost_round=rounds)
    predict = reg.predict(m)

    # The tolerance used to be `rtol=1e6`, i.e. a relative error of one
    # million, which made this check pass for virtually any finite output.
    # NOTE(review): 1e-6 assumes both ingestion paths yield near-identical
    # models for this data; relax it if that turns out not to hold.
    numpy.testing.assert_allclose(predict_with_it, predict,
                                  rtol=1e-6)
# Run the demo only when this file is executed as a script, not on import.
if __name__ == '__main__':
    main()