
Merge branch 'PaddlePaddle:develop' into softmax2d
Asthestarsfalll committed Apr 8, 2022
2 parents 59cab23 + 33abfbe commit d0484ba
Showing 707 changed files with 26,176 additions and 11,275 deletions.
2 changes: 1 addition & 1 deletion cmake/external/libmct.cmake
@@ -45,7 +45,7 @@ ExternalProject_Add(
PREFIX ${LIBMCT_PREFIX_DIR}
DOWNLOAD_DIR ${LIBMCT_DOWNLOAD_DIR}
DOWNLOAD_COMMAND wget --no-check-certificate ${LIBMCT_URL} -c -q -O ${LIBMCT_NAME}.tar.gz
&& tar zxvf ${LIBMCT_NAME}.tar.gz
&& tar --no-same-owner -zxvf ${LIBMCT_NAME}.tar.gz
DOWNLOAD_NO_PROGRESS 1
UPDATE_COMMAND ""
CMAKE_ARGS -DCMAKE_INSTALL_PREFIX=${LIBMCT_INSTALL_ROOT}
2 changes: 1 addition & 1 deletion cmake/external/xpu.cmake
@@ -36,7 +36,7 @@ ENDIF()

if(NOT DEFINED XPU_BASE_URL)
SET(XPU_BASE_URL_WITHOUT_DATE "https://baidu-kunlun-product.cdn.bcebos.com/KL-SDK/klsdk-dev")
SET(XPU_BASE_URL "${XPU_BASE_URL_WITHOUT_DATE}/20220331")
SET(XPU_BASE_URL "${XPU_BASE_URL_WITHOUT_DATE}/20220408")
else()
SET(XPU_BASE_URL "${XPU_BASE_URL}")
endif()
4 changes: 4 additions & 0 deletions cmake/flags.cmake
@@ -244,3 +244,7 @@ if(WITH_ROCM)
string (REPLACE "-Werror" "-Wno-error" CMAKE_C_FLAGS ${CMAKE_C_FLAGS})
endif()

if(WITH_PSCORE OR WITH_PSLIB)
string (REPLACE "-Wnon-virtual-dtor" "-Wno-non-virtual-dtor" CMAKE_CXX_FLAGS ${CMAKE_CXX_FLAGS})
string (REPLACE "-Wnon-virtual-dtor" "-Wno-non-virtual-dtor" CMAKE_C_FLAGS ${CMAKE_C_FLAGS})
endif()
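The two `string(REPLACE ...)` lines above downgrade `-Wnon-virtual-dtor` to `-Wno-non-virtual-dtor`, i.e. they switch the diagnostic off for builds with WITH_PSCORE or WITH_PSLIB. As a reminder of what that warning catches, here is a minimal standalone C++ sketch (illustrative only, not code from this commit):

```cpp
// What -Wnon-virtual-dtor diagnoses: a class with virtual functions but a
// non-virtual destructor, making delete-through-base undefined behavior.
#include <memory>

struct Base {
  virtual void Run() {}
  ~Base() {}  // -Wnon-virtual-dtor fires here
};

struct Derived : Base {
  int* buf = new int[16];
  ~Derived() { delete[] buf; }  // skipped in the scenario below: the buffer leaks
};

int main() {
  std::unique_ptr<Base> p(new Derived());  // destroyed via Base::~Base() only
  return 0;
}
```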
7 changes: 0 additions & 7 deletions cmake/inference_lib.cmake
@@ -199,13 +199,6 @@ IF(WITH_XPU)
DSTS ${dst_dir} ${dst_dir})
ENDIF()

IF(WITH_IPU)
set(dst_dir "${PADDLE_INFERENCE_INSTALL_DIR}/third_party/install/ipu")
copy(inference_lib_dist
SRCS ${CMAKE_BINARY_DIR}/paddle/fluid/platform/device/ipu/libpaddle_ipu.so
DSTS ${dst_dir})
ENDIF()

# CMakeCache Info
copy(inference_lib_dist
SRCS ${CMAKE_CURRENT_BINARY_DIR}/CMakeCache.txt
4 changes: 2 additions & 2 deletions paddle/fluid/distributed/collective/CMakeLists.txt
@@ -7,14 +7,14 @@ endif()

if(WITH_NCCL)
cc_library(processgroup_nccl SRCS ProcessGroupNCCL.cc NCCLTools.cc Common.cc DEPS place cuda_stream enforce collective_helper device_context phi phi_api eager_api)
if (WITH_DISTRIBUTE)
if (WITH_DISTRIBUTE AND WITH_PSCORE)
cc_library(processgroup_heter SRCS ProcessGroupHeter.cc NCCLTools.cc Common.cc DEPS place cuda_stream enforce collective_helper device_context phi phi_api eager_api)
endif()
endif()

if(WITH_ASCEND_CL)
cc_library(processgroup_hccl SRCS ProcessGroupHCCL.cc HCCLTools.cc Common.cc DEPS place npu_stream enforce collective_helper device_context phi phi_api eager_api)
if (WITH_DISTRIBUTE)
if (WITH_DISTRIBUTE AND WITH_PSCORE)
cc_library(processgroup_heter SRCS ProcessGroupHeter.cc HCCLTools.cc Common.cc DEPS place npu_stream enforce collective_helper device_context phi phi_api eager_api)
endif()
endif()
4 changes: 2 additions & 2 deletions paddle/fluid/distributed/collective/ProcessGroup.cc
@@ -35,10 +35,10 @@ bool ProcessGroup::Task::Wait(std::chrono::milliseconds timeout) {
void ProcessGroup::Task::Synchronize() {}

ProcessGroup::ProcessGroup(int rank, int size, int gid)
: rank_(rank), size_(size) {
: rank_(rank), size_(size), gid_(gid) {
if (gid != IGNORE_ID) {
auto map = ProcessGroupMapFromGid::getInstance();
map->insert(gid, this);
map->insert(gid_, this);
}
}

18 changes: 11 additions & 7 deletions paddle/fluid/distributed/collective/ProcessGroup.h
Expand Up @@ -93,8 +93,8 @@ class ProcessGroup {
}

virtual void Broadcast(const phi::DenseTensor* in, phi::DenseTensor* out) {
PADDLE_THROW(platform::errors::InvalidArgument(
"ProcessGroup%s does not support broadcast for static",
PADDLE_THROW(platform::errors::Fatal(
"ProcessGroup%s does not support broadcast for static mode runtime",
GetBackendName()));
}

@@ -148,6 +148,7 @@ class ProcessGroup {
protected:
const int rank_;
const int size_;
const int gid_;
};

class ProcessGroupMapFromGid {
@@ -158,17 +159,20 @@ class ProcessGroupMapFromGid {
}

void insert(int gid, ProcessGroup* pg) {
// TODO(sandyhouse): address ut and uncomment the following codes
// PADDLE_ENFORCE_EQ(has(gid), false,
// platform::errors::PreconditionNotMet(
// "The process group with id %d does exist.", gid));
// platform::errors::PreconditionNotMet(
// "The process group with id %d doesnot exist.",
// gid));
map_[gid] = pg;
}

ProcessGroup* get(int gid) {
// TODO(sandyhouse): address ut and uncomment the following codes
// PADDLE_ENFORCE_EQ(has(gid), true,
// platform::errors::PreconditionNotMet(
// "The process group with id %d doesnot exist.",
// gid));
// platform::errors::PreconditionNotMet(
// "The process group with id %d doesnot exist.",
// gid));
return map_.find(gid)->second;
}

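Read together with the ProcessGroup.cc change above (which initializes the new gid_ member and registers this under it), this header implements a process-wide gid -> ProcessGroup* registry: code that only carries an integer group id can recover the group object. A standalone sketch of that pattern, with stand-in types rather than the Paddle implementation:

```cpp
#include <cassert>
#include <unordered_map>

struct ProcessGroup;  // stand-in for the real class

class ProcessGroupMapFromGid {
 public:
  static ProcessGroupMapFromGid* getInstance() {
    static ProcessGroupMapFromGid s;  // Meyers singleton
    return &s;
  }
  bool has(int gid) const { return map_.count(gid) != 0; }
  void insert(int gid, ProcessGroup* pg) { map_[gid] = pg; }
  ProcessGroup* get(int gid) {
    auto it = map_.find(gid);
    return it == map_.end() ? nullptr : it->second;  // guarded lookup
  }

 private:
  std::unordered_map<int, ProcessGroup*> map_;
};

int main() {
  static int dummy;  // any stable address works for the sketch
  auto* pg = reinterpret_cast<ProcessGroup*>(&dummy);
  auto* reg = ProcessGroupMapFromGid::getInstance();
  reg->insert(/*gid=*/7, pg);
  assert(reg->get(7) == pg);
  assert(reg->get(8) == nullptr);  // unknown gid
  return 0;
}
```

One deliberate difference: the sketch returns nullptr for an unknown gid, while the real get() dereferences map_.find(gid)->second unguarded; that is presumably why the commented-out PADDLE_ENFORCE_EQ checks are slated to return once the unit tests are addressed (per the TODO).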
6 changes: 0 additions & 6 deletions paddle/fluid/distributed/collective/ProcessGroupHCCL.cc
@@ -30,12 +30,6 @@ constexpr int64_t kWaitBlockTImeout = 10;
namespace paddle {
namespace distributed {

// bool CheckTensorsInNPUPlace(const std::vector<Tensor>& tensors) {
// return std::all_of(tensors.cbegin(), tensors.cend(), [&](const Tensor& t) {
// return t.place() == platform::DeviceType::NPU;
// });
// }

void SyncDefaultStream(
const std::vector<Place>& places,
std::vector<NPUEventManager>& hcclEvents, // NOLINT
188 changes: 156 additions & 32 deletions paddle/fluid/distributed/collective/ProcessGroupHeter.cc
@@ -56,29 +56,35 @@ ProcessGroupHeter::ProcessGroupHeter(const std::shared_ptr<Store>& store,
local_size_(local_size),
gloo_rank_(gloo_rank),
gloo_size_(gloo_size),
with_switch_(with_switch) {
with_switch_(with_switch),
switch_endpoint_(switch_endpoint) {
#if defined(PADDLE_WITH_NCCL)
inner_pg_ = std::make_shared<ProcessGroupNCCL>(store, local_rank, local_size,
IGNORE_ID);
#elif defined(PADDLE_WITH_ASCEND_CL)
inner_pg_ = std::make_shared<ProcessGroupHCCL>(store, local_rank, local_size,
IGNORE_ID);
#else
PADDLE_THROW(platform::errors::InvalidArgument(
PADDLE_THROW(platform::errors::Fatal(
"ProcessGroupHeter only supports NCCL and HCCL now.");
#endif
if (with_switch_) {
// TODO(sandyhouse) starts a client to connect the cloud switch module
// std::shared_ptr<HeterClient> client_ =
// HeterClient::GetInstance({switch_endpoint}, {}, 0);
} else if (local_rank_ == 0) {
if (local_rank_ == 0 && !with_switch_) {
auto opts = ProcessGroupGloo::GlooOptions::create();
opts->device = ProcessGroupGloo::createDefaultDevice();
inter_pg_ = std::make_shared<ProcessGroupGloo>(store, gloo_rank_,
gloo_size_, IGNORE_ID, opts);
}
}
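A behavioral note on this constructor: the Gloo inter-cluster group is now created only when local_rank_ == 0 and no switch is configured; with with_switch set, rank 0 instead reaches the switch module through HeterClient inside each collective call. A hedged sketch of that channel choice, using illustrative stand-in types rather than Paddle APIs:

```cpp
#include <memory>
#include <string>

// Illustrative stand-ins for the two inter-cluster transports.
struct InterChannel { virtual ~InterChannel() = default; };
struct SwitchChannel : InterChannel {  // rank 0 <-> cloud switch module
  explicit SwitchChannel(std::string ep) : endpoint(std::move(ep)) {}
  std::string endpoint;
};
struct GlooChannel : InterChannel {};  // allreduce across cluster rank 0s

// Mirrors the logic above: only local rank 0 goes inter-cluster, and a
// configured switch replaces the Gloo group entirely.
std::unique_ptr<InterChannel> MakeInterChannel(bool with_switch, int local_rank,
                                               const std::string& endpoint) {
  if (local_rank != 0) return nullptr;
  if (with_switch) return std::make_unique<SwitchChannel>(endpoint);
  return std::make_unique<GlooChannel>();
}

int main() {
  // "127.0.0.1:6170" is a hypothetical endpoint for illustration only.
  auto ch = MakeInterChannel(true, 0, "127.0.0.1:6170");  // switch path
  return ch != nullptr ? 0 : 1;
}
```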

template <typename T>
static void _do_add(T* dst, T* src, size_t size) {
for (size_t i = 0; i < size; i++) {
*dst += *src;
dst++;
src++;
}
}
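_do_add is the CPU-side reduction used on the switch path of AllReduce below: after rank 0 receives a buffer back from the switch, it accumulates that buffer into its own CPU tensor element by element. A quick standalone usage example:

```cpp
#include <cstdio>

// Same shape as the helper above: in-place element-wise dst += src.
template <typename T>
static void _do_add(T* dst, T* src, size_t size) {
  for (size_t i = 0; i < size; i++) {
    *dst += *src;
    dst++;
    src++;
  }
}

int main() {
  float dst[4] = {1.f, 2.f, 3.f, 4.f};
  float src[4] = {10.f, 20.f, 30.f, 40.f};
  _do_add<float>(dst, src, 4);
  for (float v : dst) std::printf("%g ", v);  // prints: 11 22 33 44
  std::printf("\n");
  return 0;
}
```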

std::shared_ptr<ProcessGroup::Task> ProcessGroupHeter::AllReduce(
std::vector<Tensor>& tensors, const AllreduceOptions& opts) {
#if defined(PADDLE_WITH_NCCL)
@@ -93,33 +99,92 @@ std::shared_ptr<ProcessGroup::Task> ProcessGroupHeter::AllReduce(

// Step2: copy tensors to CPU
if (local_rank_ == 0) {
std::vector<Tensor> cpu_tensors(tensors.size());
std::vector<Tensor> cpu_tensors;
cpu_tensors.reserve(tensors.size());
for (size_t i = 0; i < tensors.size(); i++) {
auto dense_gpu_tensor =
std::dynamic_pointer_cast<phi::DenseTensor>(tensors[i].impl());
auto dense_cpu_tensor =
std::dynamic_pointer_cast<phi::DenseTensor>(cpu_tensors[i].impl());
dense_cpu_tensor->Resize(tensors[i].dims());
phi::DenseTensorMeta meta = phi::DenseTensorMeta(
dense_gpu_tensor->dtype(), dense_gpu_tensor->dims());
std::shared_ptr<phi::DenseTensor> dense_cpu_tensor =
std::make_shared<phi::DenseTensor>(
std::make_unique<paddle::experimental::DefaultAllocator>(
paddle::platform::CPUPlace())
.get(),
meta);
dense_cpu_tensor->ResizeAndAllocate(dense_gpu_tensor->dims());
cpu_tensors[i] = paddle::experimental::Tensor(dense_cpu_tensor);
framework::TensorCopySync(*dense_gpu_tensor, platform::CPUPlace(),
dense_cpu_tensor.get());
}
// Step3: do inter cluster allreduce
if (with_switch_) {
// TODO(sandyhouse) send to and recv from switch, and do add
if (local_rank_ == 0) {
HeterClient* client_ =
HeterClient::GetInstance({switch_endpoint_}, {}, 0).get();
auto dense_cpu_tensor =
std::dynamic_pointer_cast<phi::DenseTensor>(cpu_tensors[0].impl());
std::vector<int> send_size;
send_size.push_back(dense_cpu_tensor->numel());
int ret = client_->Send(
gid_, {dense_cpu_tensor->name()}, send_size,
dense_cpu_tensor->data(),
dense_cpu_tensor->numel() *
framework::DataTypeSize(dense_cpu_tensor->dtype()));
PADDLE_ENFORCE_EQ(ret, 0, platform::errors::PreconditionNotMet(
"Send to the switch module error."));
phi::DenseTensorMeta meta = phi::DenseTensorMeta(
dense_cpu_tensor->dtype(), dense_cpu_tensor->dims());
std::shared_ptr<phi::DenseTensor> dense_cpu_tensor2 =
std::make_shared<phi::DenseTensor>(
std::make_unique<paddle::experimental::DefaultAllocator>(
paddle::platform::CPUPlace())
.get(),
meta);
dense_cpu_tensor2->ResizeAndAllocate(dense_cpu_tensor->dims());
Tensor cpu_tensor_temp =
paddle::experimental::Tensor(dense_cpu_tensor2);
ret = client_->Recv(
gid_, {dense_cpu_tensor->name()}, dense_cpu_tensor2->data(),
dense_cpu_tensor2->numel() *
framework::DataTypeSize(dense_cpu_tensor2->dtype()));
PADDLE_ENFORCE_EQ(ret, 0, platform::errors::PreconditionNotMet(
"Recv from the switch module error."));

switch (dense_cpu_tensor->dtype()) {
case DataType::FLOAT32:
_do_add<float>(reinterpret_cast<float*>(dense_cpu_tensor->data()),
reinterpret_cast<float*>(dense_cpu_tensor2->data()),
dense_cpu_tensor->numel());
break;
case DataType::FLOAT64:
_do_add<double>(
reinterpret_cast<double*>(dense_cpu_tensor->data()),
reinterpret_cast<double*>(dense_cpu_tensor2->data()),
dense_cpu_tensor->numel());
break;
case DataType::INT32:
_do_add<int>(reinterpret_cast<int*>(dense_cpu_tensor->data()),
reinterpret_cast<int*>(dense_cpu_tensor2->data()),
dense_cpu_tensor->numel());
break;
default:
PADDLE_THROW(platform::errors::PreconditionNotMet(
"Unsupported data type (%s) to do add.",
framework::DataType2String(dense_cpu_tensor->dtype())));
}
}
} else {
auto gloo_task = inter_pg_->AllReduce(cpu_tensors, opts);
gloo_task->Wait();
}
// Step4: copy cpu tensors to gpu
// TODO(sandyhouse)
// copy cpu tensors to gpu
for (size_t i = 0; i < tensors.size(); i++) {
auto dense_gpu_tensor =
std::dynamic_pointer_cast<phi::DenseTensor>(tensors[i].impl());
auto dense_cpu_tensor =
std::dynamic_pointer_cast<phi::DenseTensor>(cpu_tensors[i].impl());
// framework::TensorCopySync(*dense_cpu_tensor, tensors[i].place(),
// dense_gpu_tensor.get());
framework::TensorCopySync(*dense_cpu_tensor, dense_cpu_tensor->place(),
dense_gpu_tensor.get());
}
@@ -147,18 +212,57 @@ std::shared_ptr<ProcessGroup::Task> ProcessGroupHeter::Broadcast(
inner_pg_->Broadcast(tensors, b_opts);

if (local_rank_ == 0) {
std::vector<Tensor> cpu_tensors(tensors.size());
std::vector<Tensor> cpu_tensors;
cpu_tensors.reserve(tensors.size());
for (size_t i = 0; i < tensors.size(); i++) {
auto dense_gpu_tensor =
std::dynamic_pointer_cast<phi::DenseTensor>(tensors[i].impl());
auto dense_cpu_tensor =
std::dynamic_pointer_cast<phi::DenseTensor>(cpu_tensors[i].impl());
dense_cpu_tensor->Resize(tensors[i].dims());
phi::DenseTensorMeta meta = phi::DenseTensorMeta(
dense_gpu_tensor->dtype(), dense_gpu_tensor->dims());
std::shared_ptr<phi::DenseTensor> dense_cpu_tensor =
std::make_shared<phi::DenseTensor>(
std::make_unique<paddle::experimental::DefaultAllocator>(
paddle::platform::CPUPlace())
.get(),
meta);
dense_cpu_tensor->ResizeAndAllocate(dense_gpu_tensor->dims());
cpu_tensors[i] = paddle::experimental::Tensor(dense_cpu_tensor);
framework::TensorCopySync(*dense_gpu_tensor, platform::CPUPlace(),
dense_cpu_tensor.get());
}
if (with_switch_) {
// TODO(sandyhouse) send to and recv
if (local_rank_ == 0) {
HeterClient* client_ =
HeterClient::GetInstance({switch_endpoint_}, {}, 0).get();
auto dense_cpu_tensor =
std::dynamic_pointer_cast<phi::DenseTensor>(cpu_tensors[0].impl());
if (gloo_rank_ == 0) {
std::vector<int> send_size;
send_size.push_back(dense_cpu_tensor->numel());
int ret = client_->Send(
gid_, {dense_cpu_tensor->name()}, send_size,
dense_cpu_tensor->data(),
dense_cpu_tensor->numel() *
framework::DataTypeSize(dense_cpu_tensor->dtype()));
PADDLE_ENFORCE_EQ(ret, 0, platform::errors::PreconditionNotMet(
"Send to the switch module error."));
} else {
int ret = client_->Recv(
gid_, {dense_cpu_tensor->name()}, dense_cpu_tensor->data(),
dense_cpu_tensor->numel() *
framework::DataTypeSize(dense_cpu_tensor->dtype()));
PADDLE_ENFORCE_EQ(ret, 0,
platform::errors::PreconditionNotMet(
"Receive from the switch module error."));
ret = client_->Recv(
gid_, {dense_cpu_tensor->name()}, dense_cpu_tensor->data(),
dense_cpu_tensor->numel() *
framework::DataTypeSize(dense_cpu_tensor->dtype()));
PADDLE_ENFORCE_EQ(ret, 0,
platform::errors::PreconditionNotMet(
"Receive from the switch module error."));
}
}
} else {
auto gloo_task = inter_pg_->Broadcast(cpu_tensors, opts);
gloo_task->Wait();
@@ -168,8 +272,6 @@ std::shared_ptr<ProcessGroup::Task> ProcessGroupHeter::Broadcast(
std::dynamic_pointer_cast<phi::DenseTensor>(tensors[i].impl());
auto dense_cpu_tensor =
std::dynamic_pointer_cast<phi::DenseTensor>(cpu_tensors[i].impl());
// framework::TensorCopySync(*dense_cpu_tensor, tensors[i].place(),
// dense_gpu_tensor.get());
framework::TensorCopySync(*dense_cpu_tensor, dense_cpu_tensor->place(),
dense_gpu_tensor.get());
}
@@ -185,22 +287,44 @@ void ProcessGroupHeter::Broadcast(const phi::DenseTensor* in,
inner_pg_->Broadcast(in, out);

if (local_rank_ == 0) {
Tensor cpu_tensor;
auto dense_cpu_tensor =
std::dynamic_pointer_cast<phi::DenseTensor>(cpu_tensor.impl());
dense_cpu_tensor->Resize(in->dims());
phi::DenseTensorMeta meta = phi::DenseTensorMeta(in->dtype(), in->dims());
std::shared_ptr<phi::DenseTensor> dense_cpu_tensor =
std::make_shared<phi::DenseTensor>(
std::make_unique<paddle::experimental::DefaultAllocator>(
paddle::platform::CPUPlace())
.get(),
meta);
dense_cpu_tensor->ResizeAndAllocate(in->dims());
Tensor cpu_tensor = paddle::experimental::Tensor(dense_cpu_tensor);
framework::TensorCopySync(*in, platform::CPUPlace(),
dense_cpu_tensor.get());
if (with_switch_) {
// TODO(sandyhouse) send to and recv
if (local_rank_ == 0) {
HeterClient* client_ =
HeterClient::GetInstance({switch_endpoint_}, {}, 0).get();
if (gloo_rank_ == 0) {
std::vector<int> send_size;
send_size.push_back(in->numel());
int ret = client_->Send(
gid_, {in->name()}, send_size, dense_cpu_tensor->data(),
in->numel() * framework::DataTypeSize(in->dtype()));
PADDLE_ENFORCE_EQ(ret, 0, platform::errors::PreconditionNotMet(
"Send to the switch module error."));
} else {
int ret =
client_->Recv(gid_, {in->name()}, dense_cpu_tensor->data(),
in->numel() * framework::DataTypeSize(in->dtype()));
PADDLE_ENFORCE_EQ(ret, 0,
platform::errors::PreconditionNotMet(
"Receive from the switch module error."));
}
}
} else {
std::vector<Tensor> cpu_tensors = {cpu_tensor};
// auto gloo_task = inter_pg_->Broadcast(cpu_tensors);
// gloo_task->Wait();
inter_pg_->Broadcast(cpu_tensors);
auto gloo_task = inter_pg_->Broadcast(cpu_tensors);
gloo_task->Wait();
}
framework::TensorCopySync(*dense_cpu_tensor, dense_cpu_tensor->place(),
out);
framework::TensorCopySync(*dense_cpu_tensor, out->place(), out);
}
inner_pg_->Broadcast(out, out);
}
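Two small but real fixes are visible at the end of this function: the Gloo broadcast's task is now waited on (gloo_task->Wait() was previously commented out), and the final copy targets out->place() instead of the CPU tensor's own place. The wait matters because the inter-cluster broadcast completes asynchronously; a generic sketch of the hazard it closes:

```cpp
#include <future>
#include <vector>

// The copy-out must not start before the asynchronous broadcast has filled
// the staging buffer; Wait() provides exactly that ordering.
int main() {
  std::vector<float> cpu_buf(4, 0.f);
  auto task = std::async(std::launch::async, [&] {
    for (auto& v : cpu_buf) v = 42.f;  // stand-in for the Gloo broadcast
  });
  task.wait();                       // the equivalent of gloo_task->Wait()
  std::vector<float> out = cpu_buf;  // safe: the broadcast has finished
  return 0;
}
```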
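Stepping back, the visible hunks give the heterogeneous AllReduce its overall shape: (1) allreduce inside the cluster via the inner NCCL/HCCL group, (2) local rank 0 stages the result on the CPU, (3) an inter-cluster reduction, either Send/Recv through the switch followed by _do_add or a Gloo allreduce across cluster rank 0s, and (4) the result is copied back (the tails of these functions are collapsed in this view). A condensed control-flow sketch with stand-in types and stubbed transport, not the Paddle API:

```cpp
#include <cstddef>
#include <vector>

struct CpuBuf { std::vector<float> data; };

// Stand-in for ProcessGroupHeter; every transport call is a stub so the
// sketch compiles and runs as-is.
struct HeterGroup {
  bool with_switch = false;
  int local_rank = 0;

  void InnerAllReduce(std::vector<float>&) {}             // NCCL/HCCL (stub)
  CpuBuf CopyToCpu(const std::vector<float>& t) { return {t}; }
  void SwitchSendRecv(const CpuBuf& mine, CpuBuf& theirs) {
    theirs.data.assign(mine.data.size(), 0.f);            // switch I/O (stub)
  }
  void GlooAllReduce(CpuBuf&) {}                          // Gloo (stub)
  void CopyBack(const CpuBuf& c, std::vector<float>& t) { t = c.data; }

  void AllReduce(std::vector<float>& tensor) {
    InnerAllReduce(tensor);              // Step 1: reduce inside the cluster
    if (local_rank != 0) return;         // only rank 0 crosses clusters
    CpuBuf cpu = CopyToCpu(tensor);      // Step 2: stage on host
    if (with_switch) {                   // Step 3a: switch path
      CpuBuf remote;
      SwitchSendRecv(cpu, remote);       // send ours, receive the peer buffer
      for (size_t i = 0; i < cpu.data.size(); ++i)
        cpu.data[i] += remote.data[i];   // the _do_add step
    } else {
      GlooAllReduce(cpu);                // Step 3b: Gloo across rank 0s
    }
    CopyBack(cpu, tensor);               // Step 4: back toward the device
  }
};

int main() {
  HeterGroup g{/*with_switch=*/true, /*local_rank=*/0};
  std::vector<float> t = {1.f, 2.f, 3.f};
  g.AllReduce(t);  // exercises the switch path with stubbed transport
  return 0;
}
```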

1 comment on commit d0484ba

@paddle-bot-old


Congratulations! Your pull request passed all required CI. You can ask the reviewer(s) to approve and merge. 🎉
