[GPUPS]Optimize hbm for dymf (#43863)
* fix merge_grad&push_sparse

* change typo

* fix code format. test=develop

* fix code format. test=develop

* fix code format. test=develop

* fix debug info

* optimize hbm

* fix size_t

* fix size_t
zmxdream committed Jun 28, 2022
1 parent 6aeb60a commit 1dc2117
Showing 1 changed file with 90 additions and 23 deletions.
113 changes: 90 additions & 23 deletions paddle/fluid/framework/fleet/ps_gpu_wrapper.cc
@@ -669,10 +669,10 @@ void PSGPUWrapper::BuildGPUTask(std::shared_ptr<HeterContext> gpu_task) {
#ifdef PADDLE_WITH_CUDA
HeterPs_->set_nccl_comm_and_size(inner_comms_, inter_comms_, node_size_);
#endif
auto build_dynamic_mf_func = [this, &gpu_task](int i, int j) {

auto build_dymf_mem_pool = [this, &gpu_task](int i, int j) {
this->HeterPs_->set_multi_mf_dim(multi_mf_dim_, max_mf_dim_);
int mf_dim = this->index_dim_vec_[j];
VLOG(0) << "building table: " << i << "with mf dim: " << mf_dim;
size_t feature_value_size =
TYPEALIGN(8, sizeof(FeatureValue) + ((mf_dim + 1) * sizeof(float)));
auto& device_dim_keys = gpu_task->device_dim_keys_[i][j];
@@ -681,8 +681,68 @@ void PSGPUWrapper::BuildGPUTask(std::shared_ptr<HeterContext> gpu_task) {
CHECK(len == device_dim_ptrs.size());
this->mem_pools_[i * this->multi_mf_dim_ + j] =
new MemoryPool(len, feature_value_size);
};
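
The per-slot value size above is the fixed FeatureValue header plus (mf_dim + 1) floats, rounded up to an 8-byte boundary by TYPEALIGN. A minimal sketch of that arithmetic follows, assuming TYPEALIGN(a, n) rounds n up to a multiple of a; the 40-byte header and the type_align helper are illustrative stand-ins, not taken from the patch.

// Illustrative sketch only; type_align stands in for Paddle's TYPEALIGN macro and
// the 40-byte fixed part is a hypothetical sizeof(FeatureValue).
#include <cstdint>
#include <cstdio>

static uint64_t type_align(uint64_t alignval, uint64_t len) {
  return (len + alignval - 1) & ~(alignval - 1);  // round len up to a multiple of alignval
}

int main() {
  const uint64_t fixed_part = 40;  // hypothetical sizeof(FeatureValue)
  for (int mf_dim : {8, 64}) {
    uint64_t raw = fixed_part + (mf_dim + 1) * sizeof(float);
    std::printf("mf_dim=%d raw=%llu bytes aligned=%llu bytes\n", mf_dim,
                static_cast<unsigned long long>(raw),
                static_cast<unsigned long long>(type_align(8, raw)));
  }
  return 0;
}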
auto build_dymf_hbm_pool = [this, &gpu_task](int i, int j) {
auto& device_dim_keys = gpu_task->device_dim_keys_[i][j];
size_t len = device_dim_keys.size();
int mf_dim = this->index_dim_vec_[j];
size_t feature_value_size =
TYPEALIGN(8, sizeof(FeatureValue) + ((mf_dim + 1) * sizeof(float)));

auto& mem_pool = this->mem_pools_[i * this->multi_mf_dim_ + j];
platform::CUDADeviceGuard guard(resource_->dev_id(i));
this->hbm_pools_[i * this->multi_mf_dim_ + j] = new HBMMemoryPool(mem_pool);
auto& cur_pool = this->hbm_pools_[i * this->multi_mf_dim_ + j];

this->HeterPs_->build_ps(i,
device_dim_keys.data(),
cur_pool->mem(),
len,
feature_value_size,
500000,
2);
if (device_dim_keys.size() > 0) {
VLOG(3) << "show table: " << i
<< " table kv size: " << device_dim_keys.size()
<< "dim: " << mf_dim << " len: " << len;
HeterPs_->show_one_table(i);
}
delete mem_pool;
};
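
Both mem_pools_ and hbm_pools_ are addressed with the flat index i * multi_mf_dim_ + j, so each (device, embedding-dim slot) pair owns exactly one pool. A small sketch of that layout, with made-up device_num and multi_mf_dim values:

// Sketch of the flat (device, dim-slot) indexing used above; sizes are made up.
#include <cstdio>
#include <vector>

int main() {
  const int device_num = 8, multi_mf_dim = 2;
  std::vector<int> pools(device_num * multi_mf_dim);
  for (int i = 0; i < device_num; ++i)
    for (int j = 0; j < multi_mf_dim; ++j)
      pools[i * multi_mf_dim + j] = i * 100 + j;  // one pool per (device, dim slot)
  std::printf("device 3, dim slot 1 -> pool %d\n", pools[3 * multi_mf_dim + 1]);
  return 0;
}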
int thread_num = 16;
auto build_dynamic_mf_func = [this, &gpu_task, thread_num](
int i, int j, int z) {
// this->HeterPs_->set_multi_mf_dim(multi_mf_dim_, max_mf_dim_);
int mf_dim = this->index_dim_vec_[j];
VLOG(0) << "building table: " << i << "with mf dim: " << mf_dim;
// size_t feature_value_size =
// TYPEALIGN(8, sizeof(FeatureValue) + ((mf_dim + 1) * sizeof(float)));
auto& device_dim_keys = gpu_task->device_dim_keys_[i][j];
auto& device_dim_ptrs = gpu_task->device_dim_ptr_[i][j];
size_t len = device_dim_keys.size();
CHECK(len == device_dim_ptrs.size());
// this->mem_pools_[i * this->multi_mf_dim_ + j] =
// new MemoryPool(len, feature_value_size);
auto& mem_pool = this->mem_pools_[i * this->multi_mf_dim_ + j];
for (size_t k = 0; k < len; k++) {

// ============ add for multi-thread ================
size_t len_per_thread = len / thread_num;
size_t remain = len % thread_num;
size_t left = 0, right = 0;

size_t real_len = len_per_thread;
if ((size_t)z < remain) real_len++;

if ((size_t)z < remain) {
left = z * (len_per_thread + 1);
right = left + real_len;
} else {
left = remain * (len_per_thread + 1) + (z - remain) * len_per_thread;
right = left + real_len;
}
// ============ add for multi-thread ================

for (size_t k = left; k < right; k++) {
FeatureValue* val = (FeatureValue*)(mem_pool->mem_address(k));
float* ptr_val = device_dim_ptrs[k]->data();
size_t dim = device_dim_ptrs[k]->size();
@@ -734,38 +794,45 @@ void PSGPUWrapper::BuildGPUTask(std::shared_ptr<HeterContext> gpu_task) {
}
}
}
};
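
The left/right computation above splits the len keys of one (device, dim slot) pair across thread_num workers, giving the first len % thread_num workers one extra key. A standalone sketch of the same split, with hypothetical len and thread_num values:

// Standalone sketch of the key-range split above; len and thread_num are made up.
#include <cstddef>
#include <cstdio>

int main() {
  const size_t len = 10;
  const int thread_num = 4;
  const size_t len_per_thread = len / thread_num;
  const size_t remain = len % thread_num;
  for (int z = 0; z < thread_num; ++z) {
    size_t real_len = len_per_thread + (static_cast<size_t>(z) < remain ? 1 : 0);
    size_t left = (static_cast<size_t>(z) < remain)
                      ? z * (len_per_thread + 1)
                      : remain * (len_per_thread + 1) + (z - remain) * len_per_thread;
    std::printf("worker %d handles keys [%zu, %zu)\n", z, left, left + real_len);
  }
  return 0;
}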

platform::CUDADeviceGuard guard(resource_->dev_id(i));

this->hbm_pools_[i * this->multi_mf_dim_ + j] = new HBMMemoryPool(mem_pool);
auto& cur_pool = this->hbm_pools_[i * this->multi_mf_dim_ + j];
threads.resize(device_num * multi_mf_dim_);
for (int i = 0; i < device_num; i++) {
for (int j = 0; j < multi_mf_dim_; j++) {
threads[i + j * device_num] = std::thread(build_dymf_mem_pool, i, j);
}
}

this->HeterPs_->build_ps(i,
device_dim_keys.data(),
cur_pool->mem(),
len,
feature_value_size,
500000,
2);
for (std::thread& t : threads) {
t.join();
}
threads.clear();

if (device_dim_keys.size() > 0) {
VLOG(0) << "show ptr table: " << i
<< " table kv size: " << device_dim_keys.size()
<< "dim: " << mf_dim << " len: " << len;
this->HeterPs_->show_one_table(i);
// multi-thread process
threads.resize(device_num * multi_mf_dim_ * thread_num);
for (int i = 0; i < device_num; i++) {
for (int j = 0; j < multi_mf_dim_; j++) {
for (int k = 0; k < thread_num; k++) {
threads[(i + j * device_num) * thread_num + k] =
std::thread(build_dynamic_mf_func, i, j, k);
}
}
delete mem_pool;
};
}
for (std::thread& t : threads) {
t.join();
}
threads.clear();
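
Each fill worker gets the flat slot (i + j * device_num) * thread_num + k in threads, which maps every (device, dim slot, worker) triple to a unique index in the resized vector. A tiny check of that mapping with made-up sizes:

// Tiny check that the flat thread index above is unique per (i, j, k); sizes are made up.
#include <cstdio>

int main() {
  const int device_num = 2, multi_mf_dim = 2, thread_num = 3;
  for (int i = 0; i < device_num; ++i)
    for (int j = 0; j < multi_mf_dim; ++j)
      for (int k = 0; k < thread_num; ++k)
        std::printf("(i=%d, j=%d, k=%d) -> slot %d of %d\n", i, j, k,
                    (i + j * device_num) * thread_num + k,
                    device_num * multi_mf_dim * thread_num);
  return 0;
}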
threads.resize(device_num * multi_mf_dim_);
for (int i = 0; i < device_num; i++) {
for (int j = 0; j < multi_mf_dim_; j++) {
threads[i + j * device_num] = std::thread(build_dynamic_mf_func, i, j);
threads[i + j * device_num] = std::thread(build_dymf_hbm_pool, i, j);
}
}

for (std::thread& t : threads) {
t.join();
}
threads.clear();

timeline.Pause();
VLOG(0) << "GpuPs build table total costs: " << timeline.ElapsedSec()
<< " s.";
