forked from dmlc/xgboost
-
Notifications
You must be signed in to change notification settings - Fork 0
/
engine.h
250 lines (246 loc) · 10.7 KB
/
engine.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
/*!
* Copyright (c) 2014 by Contributors
* \file engine.h
* \brief This file defines the core interface of rabit library
* \author Tianqi Chen, Nacho, Tianyi
*/
#ifndef RABIT_INTERNAL_ENGINE_H_
#define RABIT_INTERNAL_ENGINE_H_
#include <string>
#include "rabit/serializable.h"
namespace MPI { // NOLINT
/*! \brief MPI data type just to be compatible with MPI reduce function*/
class Datatype;
}
/*! \brief namespace of rabit */
namespace rabit {
/*! \brief core interface of the engine */
namespace engine {
/*! \brief interface of core Allreduce engine */
class IEngine {
public:
/*!
* \brief Preprocessing function, that is called before AllReduce,
* used to prepare the data used by AllReduce
* \param arg additional possible argument used to invoke the preprocessor
*/
typedef void (PreprocFunction) (void *arg); // NOLINT
/*!
* \brief reduce function, the same form of MPI reduce function is used,
* to be compatible with MPI interface
* In all the functions, the memory is ensured to aligned to 64-bit
* which means it is OK to cast src,dst to double* int* etc
* \param src pointer to source space
* \param dst pointer to destination reduction
* \param count total number of elements to be reduced (note this is total number of elements instead of bytes)
* the definition of the reduce function should be type aware
* \param dtype the data type object, to be compatible with MPI reduce
*/
typedef void (ReduceFunction) (const void *src, // NOLINT
void *dst, int count,
const MPI::Datatype &dtype);
/*! \brief virtual destructor */
~IEngine() = default;
/*!
* \brief Allgather function, each node have a segment of data in the ring of sendrecvbuf,
* the data provided by current node k is [slice_begin, slice_end),
* the next node's segment must start with slice_end
* after the call of Allgather, sendrecvbuf_ contains all the contents including all segments
* use a ring based algorithm
*
* \param sendrecvbuf_ buffer for both sending and receiving data, it is a ring conceptually
* \param total_size total size of data to be gathered
* \param slice_begin beginning of the current slice
* \param slice_end end of the current slice
* \param size_prev_slice size of the previous slice i.e. slice of node (rank - 1) % world_size
*/
virtual void Allgather(void *sendrecvbuf,
size_t total_size,
size_t slice_begin,
size_t slice_end,
size_t size_prev_slice) = 0;
/*!
* \brief performs in-place Allreduce, on sendrecvbuf
* this function is NOT thread-safe
* \param sendrecvbuf_ buffer for both sending and receiving data
* \param type_nbytes the number of bytes the type has
* \param count number of elements to be reduced
* \param reducer reduce function
* \param prepare_func Lazy preprocessing function, if it is not NULL, prepare_fun(prepare_arg)
* will be called by the function before performing Allreduce in order to initialize the data in sendrecvbuf.
* If the result of Allreduce can be recovered directly, then prepare_func will NOT be called
* \param prepare_arg argument used to pass into the lazy preprocessing function
*/
virtual void Allreduce(void *sendrecvbuf_,
size_t type_nbytes,
size_t count,
ReduceFunction reducer,
PreprocFunction prepare_fun = nullptr,
void *prepare_arg = nullptr) = 0;
/*!
* \brief broadcasts data from root to every other node
* \param sendrecvbuf_ buffer for both sending and receiving data
* \param size the size of the data to be broadcasted
* \param root the root worker id to broadcast the data
*/
virtual void Broadcast(void *sendrecvbuf_, size_t size, int root) = 0;
/*!
* \brief loads the latest check point
* \param global_model pointer to the globally shared model/state
* when calling this function, the caller needs to guarantee that the global_model
* is the same in all nodes
* \param local_model pointer to the local model that is specific to current node/rank
* this can be NULL when no local model is needed
*
* \return the version number of the model loaded
* if returned version == 0, this means no model has been CheckPointed
* the p_model is not touched, users should do necessary initialization by themselves
*
* Common usage example:
* int iter = rabit::LoadCheckPoint(&model);
* if (iter == 0) model.InitParameters();
* for (i = iter; i < max_iter; ++i) {
* do many things, include allreduce
* rabit::CheckPoint(model);
* }
*
* \sa CheckPoint, VersionNumber
*/
virtual int LoadCheckPoint(Serializable *global_model,
Serializable *local_model = nullptr) = 0;
/*!
* \brief checkpoints the model, meaning a stage of execution was finished
* every time we call check point, a version number increases by ones
*
* \param global_model pointer to the globally shared model/state
* when calling this function, the caller needs to guarantee that the global_model
* is the same in every node
* \param local_model pointer to the local model that is specific to current node/rank
* this can be NULL when no local state is needed
*
* NOTE: local_model requires explicit replication of the model for fault-tolerance, which will
* bring replication cost in CheckPoint function. global_model does not need explicit replication.
* So, only CheckPoint with global_model if possible
*
* \sa LoadCheckPoint, VersionNumber
*/
virtual void CheckPoint(const Serializable *global_model,
const Serializable *local_model = nullptr) = 0;
/*!
* \brief This function can be used to replace CheckPoint for global_model only,
* when certain condition is met (see detailed explanation).
*
* This is a "lazy" checkpoint such that only the pointer to global_model is
* remembered and no memory copy is taken. To use this function, the user MUST ensure that:
* The global_model must remain unchanged until the last call of Allreduce/Broadcast in the current version finishes.
* In other words, global_model can be changed only between the last call of
* Allreduce/Broadcast and LazyCheckPoint in the current version
*
* For example, suppose the calling sequence is:
* LazyCheckPoint, code1, Allreduce, code2, Broadcast, code3, LazyCheckPoint
*
* If the user can only change global_model in code3, then LazyCheckPoint can be used to
* improve the efficiency of the program.
* \param global_model pointer to the globally shared model/state
* when calling this function, the caller needs to guarantee that global_model
* is the same in every node
* \sa LoadCheckPoint, CheckPoint, VersionNumber
*/
virtual void LazyCheckPoint(const Serializable *global_model) = 0;
/*!
* \return version number of the current stored model,
* which means how many calls to CheckPoint we made so far
* \sa LoadCheckPoint, CheckPoint
*/
virtual int VersionNumber() const = 0;
/*! \brief gets rank of previous node in ring topology */
virtual int GetRingPrevRank() const = 0;
/*! \brief gets rank of current node */
virtual int GetRank() const = 0;
/*! \brief gets total number of nodes */
virtual int GetWorldSize() const = 0;
/*! \brief whether we run in distribted mode */
virtual bool IsDistributed() const = 0;
/*! \brief gets the host name of the current node */
virtual std::string GetHost() const = 0;
/*!
* \brief prints the msg in the tracker,
* this function can be used to communicate progress information to
* the user who monitors the tracker
* \param msg message to be printed in the tracker
*/
virtual void TrackerPrint(const std::string &msg) = 0;
};
/*! \brief initializes the engine module */
bool Init(int argc, char *argv[]);
/*! \brief finalizes the engine module */
bool Finalize();
/*! \brief singleton method to get engine */
IEngine *GetEngine();
/*! \brief namespace that contains stubs to be compatible with MPI */
namespace mpi {
/*!\brief enum of all operators */
enum OpType {
kMax = 0,
kMin = 1,
kSum = 2,
kBitwiseOR = 3
};
/*!\brief enum of supported data types */
enum DataType {
kChar = 0,
kUChar = 1,
kInt = 2,
kUInt = 3,
kLong = 4,
kULong = 5,
kFloat = 6,
kDouble = 7,
kLongLong = 8,
kULongLong = 9
};
} // namespace mpi
/*!
* \brief Allgather function, each node have a segment of data in the ring of sendrecvbuf,
* the data provided by current node k is [slice_begin, slice_end),
* the next node's segment must start with slice_end
* after the call of Allgather, sendrecvbuf_ contains all the contents including all segments
* use a ring based algorithm
*
* \param sendrecvbuf buffer for both sending and receiving data, it is a ring conceptually
* \param total_size total size of data to be gathered
* \param slice_begin beginning of the current slice
* \param slice_end end of the current slice
* \param size_prev_slice size of the previous slice i.e. slice of node (rank - 1) % world_size
*/
void Allgather(void* sendrecvbuf,
size_t total_size,
size_t slice_begin,
size_t slice_end,
size_t size_prev_slice);
/*!
* \brief perform in-place Allreduce, on sendrecvbuf
* this is an internal function used by rabit to be able to compile with MPI
* do not use this function directly
* \param sendrecvbuf buffer for both sending and receiving data
* \param type_nbytes the number of bytes the type has
* \param count number of elements to be reduced
* \param reducer reduce function
* \param dtype the data type
* \param op the reduce operator type
* \param prepare_func Lazy preprocessing function, lazy prepare_fun(prepare_arg)
* will be called by the function before performing Allreduce, to initialize the data in sendrecvbuf_.
* If the result of Allreduce can be recovered directly, then prepare_func will NOT be called
* \param prepare_arg argument used to pass into the lazy preprocessing function.
*/
void Allreduce_(void *sendrecvbuf, // NOLINT
size_t type_nbytes,
size_t count,
IEngine::ReduceFunction red,
mpi::DataType dtype,
mpi::OpType op,
IEngine::PreprocFunction prepare_fun = nullptr,
void *prepare_arg = nullptr);
} // namespace engine
} // namespace rabit
#endif // RABIT_INTERNAL_ENGINE_H_