Xpups dev (#42692)
* Adapt XPUPS - 1st version - 3.24

* Adapt XPUPS - update XPU PushSparse -  2nd version - 3.24

* Adapt XPUPS - add XPU PullSparseOp - 3rd version - 3.25

* refactor heter comm kernel

* update. test=develop

* Adapt XPUPS - modify by compilation - 4th version - 3.27

* update calc_shard_offset. test=develop

* update xpu kernel. test=develop

* update args of calc_shard_offset

* update. test=develop

* remove customGradMerger

* update. test=develop

* heter_comm update

* heter_comm update

* update calc_shard_offset. test=develop

* heter_comm update

* update args of calc_shard_offset

* update. test=develop

* remove customGradMerger

* update. test=develop

* fix. test=develop

* update. test=develop

* update. test=develop

* update optimizer kernel

* Adapt XPUPS - use WITH_XPU_KP and modify wrapper kernel function - 5th version - 3.30

* update. test=develop

* update pslib.cmake

* update. test=develop

* update. test=develop

* update. test=develop

* update. test=develop

* update. test=develop

* Adapt XPUPS - modify by kp compilation  - 6th version - 3.30

* update. test=develop

* update. test=develop

* update. test=develop

* update optimizer kernel

* update. test=develop

* update. test=develop

* update. test=develop

* update. test=develop

* update. test=develop

* update. test=develop

* update. test=develop

* update. test=develop

* fix. test=develop

* fix. test=develop

* used by minxu

* update heter_comm_inl

* fix. test=develop

* Adapt XPUPS - modify by kp compilation  - 7th version - 3.30

* fix. test=develop

* add optimizer kernel. test=develop

* fix. test=develop

* fix. test=develop

* fix. test=develop

* fix. test=develop

* 3.31 update

* Adapt XPUPS - update kp compilation path  - 8th version - 3.31

* add optimizer kernel. test=develop

* fix kunlun not support size_t. test=develop

* fix. test=develop

* fix. test=develop

* fix. test=develop

* fix. test=develop

* fix kunlun not support size_t. test=develop

* fix. test=develop

* fix. test=develop

* fix. test=develop

* fix. test=develop

* update heter_comm_kernel.kps 3.31

* fix. test=develop

* fix. test=develop

* update heter_comm_kernel.kps 3.31

* fix. test=develop

* fix. test=develop

* fix. test=develop

* fix. test=develop

* fix. test=develop

* update heter_comm.h 3.31

* fix. test=develop

* fix. test=develop

* fix. test=develop

* fix. test=develop

* update hashtable. test=develop

* update. test=develop

* Adapt XPUPS - update by kp compilation  - 9th version - 4.1

* update hashtable. test=develop

* fix. test=develop

* update hashtable 4.1

* fix. test=develop

* fix. test=develop

* fix. test=develop

* fix. test=develop

* fix. test=develop

* fix. test=develop

* fix. test=develop

* Adapt XPUPS - update by kp compilation  - 10th version - 4.1

* fix. test=develop

* fix. test=develop

* fix. test=develop

* update. test=develop

* modify by compilation 4.1

* update. test=develop

* update. test=develop

* fix. test=develop

* modify by compilation 4.1

* update. test=develop

* fix. test=develop

* fix. test=develop

* fix. test=develop

* fix. test=develop

* fix. test=develop

* fix. test=develop

* fix. test=develop

* fix. test=develop

* modify by compilation 4.1

* fix. test=develop

* fix. test=develop

* fix. test=develop

* modify by compilation 4.1 19:30

* fix. test=develop

* update ps_gpu_wrapper.kps 4.1

* fix. test=develop

* fix. test=develop

* fix. test=develop

* fix. test=develop

* fix. test=develop

* Adapt XPUPS - update by kp compilation  - 11th version - 4.1

* fix. test=develop

* Adapt XPUPS - update by kp compilation - 12th version - 4.2

* fix. test=develop

* fix. test=develop

* modify by compilation 4.2

* 4.2 update

* fix. test=develop

* template init. test=develop

* update 4.6

* fix. test=develop

* template init. test=develop

* 4.6 modify by compilation

* hashtable template init. test=develop

* hashtable template init. test=develop

* fix. test=develop

* fix. test=develop

* fix. test=develop

* fix. test=develop

* fix. test=develop

* fix. test=develop

* fix. test=develop

* fix. test=develop

* fix. test=develop

* fix. test=develop

* fix. test=develop

* fix. test=develop

* fix. test=develop

* fix. test=develop

* fix. test=develop

* fix. test=develop

* Adapt XPUPS - update by kp compilation - 13th version - 4.7

* fix. test=develop

* fix. test=develop

* fix. test=develop

* fix. test=develop

* fix. test=develop

* fix. test=develop

* fix. test=develop

* 4.11 update

* fix. test=develop

* fix. test=develop

* 4.11 update

* update by pre-commit

* fix. test=develop

* fix. test=develop

* fix. test=develop

* fix. test=develop

* 4.12 update

* fix. test=develop

* Adapt XPUPS - update by kp compilation  - 14th version - 4.13

* 4.13 update

* 4.14 update

* 4.14 update

* 4.14 update

* 4.14 modify by merged latest compilation

* retry CI 4.14

* 4.15 pass static check

* 4.15 modify by gpups CI

* 4.16 update by gpups CI - modify ps_gpu_wrapper.h

* 4.16 update

* 4.16 pass xpu compile

* 4.16 retry CI

* 4.16 update

* Adapt XPUPS - adapt BKCL comm for XPUPS - 4.24

* update by compilation

* Adapt XPUPS - register PSGPUTrainer for XPUPS - 4.25

* update device_worker_factory

* Adapt XPUPS - split heter_ps into .cu and .cc - 4.27

* Adapt XPUPS - register pull_box_sparse op under XPU_KP - 4.28

* update

* 5.7 modify ps_gpu_wrapper pull_sparse

* 5.11 update ps_gpu_wrapper CopyKeysKernel

Co-authored-by: zmxdream <zhangminxu01@baidu.com>
WorgenZhang and zmxdream committed May 12, 2022
1 parent 190cf44 commit 272b7f1
Showing 2 changed files with 26 additions and 40 deletions.
paddle/fluid/framework/fleet/ps_gpu_wrapper.cc (11 additions, 32 deletions)
@@ -898,17 +898,9 @@ void PSGPUWrapper::PullSparse(const paddle::platform::Place& place,
   all_timer.Start();
   int64_t total_length =
       std::accumulate(slot_lengths.begin(), slot_lengths.end(), 0UL);
-#ifdef PADDLE_WITH_CUDA
-  VLOG(3) << "Begine Gpu Ps PullSparse";
+  VLOG(3) << "Begine Gpu/Xpu Ps PullSparse";
   auto buf = memory::Alloc(place, total_length * sizeof(FeatureValue));
   FeatureValue* total_values_gpu = reinterpret_cast<FeatureValue*>(buf->ptr());
-#endif
-#ifdef PADDLE_WITH_XPU_KP
-  VLOG(3) << "Begine Xpu Ps PullSparse";
-  FeatureValue* total_values_gpu = nullptr;
-  xpu_malloc(reinterpret_cast<void**>(&total_values_gpu),
-             total_length * sizeof(FeatureValue));
-#endif
   if (platform::is_cpu_place(place)) {
     PADDLE_THROW(platform::errors::Unimplemented(
         "Warning:: CPUPlace is not supported in GpuPs now."));
@@ -969,19 +961,11 @@ void PSGPUWrapper::PullSparse(const paddle::platform::Place& place,
       slot_lengths_lod[i] += slot_lengths_lod[i - 1];
     }

-    uint64_t* buf_key = nullptr;
-    int64_t* buf_length = nullptr;
-    PADDLE_ENFORCE_EQ(xpu_malloc(reinterpret_cast<void**>(&buf_key),
-                                 keys.size() * sizeof(uint64_t*)),
-                      XPU_SUCCESS, platform::errors::ResourceExhausted(
-                                       "XPU has no enough memory"));
-    PADDLE_ENFORCE_EQ(xpu_malloc(reinterpret_cast<void**>(&buf_length),
-                                 slot_lengths.size() * sizeof(int64_t)),
-                      XPU_SUCCESS, platform::errors::ResourceExhausted(
-                                       "XPU has no enough memory"));
-
-    uint64_t** xpu_keys = reinterpret_cast<uint64_t**>(&buf_key);
-    int64_t* xpu_len = reinterpret_cast<int64_t*>(buf_length);
+    auto buf_key = memory::Alloc(place, keys.size() * sizeof(uint64_t*));
+    auto buf_length =
+        memory::Alloc(place, slot_lengths.size() * sizeof(int64_t));
+    uint64_t** xpu_keys = reinterpret_cast<uint64_t**>(buf_key->ptr());
+    int64_t* xpu_len = reinterpret_cast<int64_t*>(buf_length->ptr());
     PADDLE_ENFORCE_XPU_SUCCESS(xpu_memcpy(xpu_keys, keys.data(),
                                           keys.size() * sizeof(uint64_t*),
                                           XPU_HOST_TO_DEVICE));
@@ -997,8 +981,6 @@ void PSGPUWrapper::PullSparse(const paddle::platform::Place& place,
     pull_gpups_timer.Start();
     HeterPs_->pull_sparse(devid_2_index, total_keys, total_values_gpu,
                           static_cast<int>(total_length));
-    // PADDLE_ENFORCE_EQ(ret, 0, platform::errors::PreconditionNotMet(
-    //                    "PullSparseGPU failed in GPUPS."));
     pull_gpups_timer.Pause();

     VLOG(3) << "Begin Copy result to tensor, total_length[" << total_length
@@ -1029,22 +1011,16 @@ void PSGPUWrapper::PushSparseGrad(const paddle::platform::Place& place,
   all_timer.Start();
   int64_t total_length =
       std::accumulate(slot_lengths.begin(), slot_lengths.end(), 0UL);
-#ifdef PADDLE_WITH_CUDA
+  // #ifdef PADDLE_WITH_CUDA
   VLOG(3) << "Begin GPUPS PushSparseGrad";
   auto buf = memory::Alloc(place, total_length * sizeof(FeaturePushValue));
   FeaturePushValue* total_grad_values_gpu =
       reinterpret_cast<FeaturePushValue*>(buf->ptr());
-#endif
-#ifdef PADDLE_WITH_XPU_KP
-  VLOG(3) << "Begine Xpu Ps PushSparseGrad";
-  FeaturePushValue* total_grad_values_gpu = nullptr;
-  xpu_malloc(reinterpret_cast<void**>(&total_grad_values_gpu),
-             total_length * sizeof(FeaturePushValue));
-#endif
   if (platform::is_cpu_place(place)) {
     PADDLE_THROW(platform::errors::Unimplemented(
         "Warning:: CPUPlace is not supported in GPUPS now."));
   } else if (platform::is_gpu_place(place)) {
+#ifdef PADDLE_WITH_CUDA
     int device_id = place.GetDeviceId();
     int devid_2_index = HeterPs_->get_index_by_devid(device_id);
     LoDTensor& cached_total_keys_tensor = keys_tensor[devid_2_index];
@@ -1060,7 +1036,9 @@ void PSGPUWrapper::PushSparseGrad(const paddle::platform::Place& place,
     HeterPs_->push_sparse(devid_2_index, total_keys, total_grad_values_gpu,
                           static_cast<int>(total_length));
     push_gpups_timer.Pause();
+#endif
   } else if (platform::is_xpu_place(place)) {
+#ifdef PADDLE_WITH_XPU_KP
     int device_id = place.GetDeviceId();
     int devid_2_index = HeterPs_->get_index_by_devid(device_id);
     LoDTensor& cached_total_keys_tensor = keys_tensor[devid_2_index];
@@ -1076,6 +1054,7 @@ void PSGPUWrapper::PushSparseGrad(const paddle::platform::Place& place,
     HeterPs_->push_sparse(devid_2_index, total_keys, total_grad_values_gpu,
                           static_cast<int>(total_length));
     push_gpups_timer.Pause();
+#endif
   } else {
     PADDLE_THROW(platform::errors::PreconditionNotMet(
         "GPUPS: PushSparseGrad Only Support CUDAPlace Now."));
paddle/fluid/framework/fleet/ps_gpu_wrapper.kps (15 additions, 8 deletions)
@@ -84,7 +84,7 @@ __global__ void PullCopy(float** dest, const FeatureValue* src,
   }
 }

-__global__ void CopyKeysKernel(unsigned long long** src_keys,
+__global__ void CopyKeysKernel(unsigned long long* src_keys,
                                unsigned long long* dest_total_keys,
                                const long long* len, int slot_num,
                                int total_len) {
@@ -95,21 +95,27 @@ __global__ void CopyKeysKernel(unsigned long long** src_keys,
   }
   int thread_id = ncores * cluster_id() + cid;
   int nthreads = ncores * cluster_num();
-  __local__ int64_t local_len[slot_num];
-  GM2LM(len, local_len, slot_num * sizeof(int64_t));
+  __local__ long long local_len[slot_num];
+  GM2LM(len, local_len, slot_num * sizeof(long long));
+
+  __global_ptr__ unsigned long long* local_keys[slot_num];
+  GM2LM(src_keys, local_keys,
+        slot_num * sizeof(__global_ptr__ unsigned long long*));

   for (int i = thread_id; i < slot_num; i += nthreads) {
     // max core local memory = 8KB
     int slot_len = i ? local_len[i] - local_len[i - 1] : local_len[0];
-    int read_len = min(slot_len, 1024);
+    // int read_len = min(slot_len, 1024);
+    int read_len = 100;
     int dest_len = i ? local_len[i - 1] : 0;
-    __local__ uint64_t local_slot_keys[read_len];
+    __local__ unsigned long long local_slot_keys[read_len];

     for (int k = 0; k < slot_len; k += read_len) {
       int real_read_len = min(read_len, slot_len - k);
-      GM2LM(src_keys[i] + k, local_slot_keys, real_read_len * sizeof(uint64_t));
+      GM2LM(local_keys[i] + k, local_slot_keys,
+            real_read_len * sizeof(unsigned long long));
       LM2GM(local_slot_keys, dest_total_keys + dest_len + k,
-            real_read_len * sizeof(uint64_t));
+            real_read_len * sizeof(unsigned long long));
     }
   }
 }
@@ -199,7 +205,8 @@ void PSGPUWrapper::CopyKeys(const paddle::platform::Place& place,
   stream = static_cast<platform::XPUDeviceContext*>(dev_ctx)
                ->x_context()
                ->xpu_stream;
-  unsigned long long** o_keys = (unsigned long long**)origin_keys;
+  unsigned long long* o_keys =
+      reinterpret_cast<unsigned long long*>(origin_keys);
   unsigned long long* t_keys = (unsigned long long*)total_keys;
   const long long* c_len = (const long long*)gpu_len;
   CopyKeysKernel<<<2, 64, stream>>>(o_keys, t_keys, c_len, slot_num, total_len);
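
The kernel-side fix is the core of the `.kps` change: `src_keys` is a global-memory table of per-slot key pointers, and the old code indexed it directly (`src_keys[i]`) from core-local code; the new version first stages the table into local memory with `GM2LM` and indexes the staged copy (`local_keys[i]`). The chunk size also drops to 100 keys per transfer, consistent with the kernel's own "max core local memory = 8KB" note. A host-side sketch of that staging pattern, with `std::memcpy` standing in for `GM2LM`/`LM2GM` (`CopyKeysLike` and the 256-slot cap are illustrative assumptions, not XPU code):

```cpp
#include <algorithm>
#include <cstring>

// Host model of the fixed CopyKeysKernel: stage the pointer table and the
// prefix-sum lengths into "local" buffers first, then move keys in chunks.
void CopyKeysLike(unsigned long long** src_keys,     // global: slot -> keys
                  unsigned long long* dest_total_keys,
                  const long long* len,              // prefix sums per slot
                  int slot_num) {
  constexpr int kReadLen = 100;         // mirrors the diff's read_len = 100
  long long local_len[256];             // stand-ins for __local__ arrays;
  unsigned long long* local_keys[256];  // assumes slot_num <= 256
  std::memcpy(local_len, len, slot_num * sizeof(long long));          // "GM2LM"
  std::memcpy(local_keys, src_keys, slot_num * sizeof(src_keys[0]));  // "GM2LM"

  for (int i = 0; i < slot_num; ++i) {
    long long slot_len = i ? local_len[i] - local_len[i - 1] : local_len[0];
    long long dest_off = i ? local_len[i - 1] : 0;
    unsigned long long chunk[kReadLen];
    for (long long k = 0; k < slot_len; k += kReadLen) {
      long long n = std::min<long long>(kReadLen, slot_len - k);
      std::memcpy(chunk, local_keys[i] + k, n * sizeof(chunk[0]));    // "GM2LM"
      std::memcpy(dest_total_keys + dest_off + k, chunk,
                  n * sizeof(chunk[0]));                              // "LM2GM"
    }
  }
}
```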
