support send_partial, recv_partial and allgather_partial in ProcessGroupNCCL (PaddlePaddle#44444)
haohongxiang authored and Aurelius84 committed Jul 29, 2022
1 parent e0ba939 commit 91a7488
Showing 8 changed files with 292 additions and 93 deletions.
9 changes: 9 additions & 0 deletions paddle/fluid/distributed/collective/ProcessGroup.h
@@ -137,6 +137,15 @@ class ProcessGroup {
"ProcessGroup%s does not support AllGather", GetBackendName()));
}

virtual std::shared_ptr<ProcessGroup::Task> AllGather_Partial(
std::vector<phi::DenseTensor>& in_tensors, // NOLINT
std::vector<phi::DenseTensor>& out_tensors, // NOLINT
int offset,
int length) { // NOLINT
PADDLE_THROW(platform::errors::InvalidArgument(
"ProcessGroup%s does not support AllGather_Partial", GetBackendName()));
}

virtual std::shared_ptr<ProcessGroup::Task> AllToAll(
std::vector<phi::DenseTensor>&, // NOLINT
std::vector<phi::DenseTensor>&) { // NOLINT
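For context, a minimal caller-side sketch of the new virtual (hedged: the group setup, tensor names, and the GetRank()/GetSize() accessors are assumptions, not part of this hunk). Each rank contributes the length elements of its input starting at offset, and the concatenated slices from all ranks land in the output:

// Sketch only: pg is a concrete ProcessGroup (e.g. ProcessGroupNCCL) created
// elsewhere; input/output are GPU DenseTensors prepared by the caller.
int length = 1024;                                 // elements per rank (illustrative)
std::vector<phi::DenseTensor> in_tensors{input};   // this rank's full input
std::vector<phi::DenseTensor> out_tensors{output}; // room for GetSize() * length elements
int offset = length * pg->GetRank();               // element offset, not bytes
auto task = pg->AllGather_Partial(in_tensors, out_tensors, offset, length);
task->Wait();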
48 changes: 40 additions & 8 deletions paddle/fluid/distributed/collective/ProcessGroupNCCL.cc
@@ -85,26 +85,27 @@ bool ProcessGroupNCCL::NCCLTask::IsCompleted() {
return true;
}

void ProcessGroupNCCL::CheckSplitSizes(std::vector<int64_t>& split_sizes,
void ProcessGroupNCCL::CheckSplitSizes(std::vector<int64_t>* split_sizes,
std::vector<int64_t> tensor_shape) {
int64_t len_size = split_sizes.size();
int64_t len_size = (*split_sizes).size();
if (len_size == 0) {
PADDLE_ENFORCE_EQ(tensor_shape[0] % size_ == 0,
true,
platform::errors::InvalidArgument(
"Tensor's dim[0] must be divisible by group size "
"when split_sizes not given."));
split_sizes.insert(split_sizes.end(),
size_,
static_cast<int64_t>(tensor_shape[0] / size_));
(*split_sizes)
.insert((*split_sizes).end(),
size_,
static_cast<int64_t>(tensor_shape[0] / size_));
} else {
PADDLE_ENFORCE_EQ(
len_size == size_,
true,
platform::errors::InvalidArgument(
"The length of split_sizes must be equal to group size."));
auto sum_size = std::accumulate(
split_sizes.begin(), split_sizes.end(), static_cast<int64_t>(0));
(*split_sizes).begin(), (*split_sizes).end(), static_cast<int64_t>(0));
PADDLE_ENFORCE_EQ(
sum_size == tensor_shape[0],
true,
@@ -626,6 +627,37 @@ void* GetPointerByOffset(void* raw_pointer,
return nullptr;
}

std::shared_ptr<ProcessGroup::Task> ProcessGroupNCCL::AllGather_Partial(
std::vector<phi::DenseTensor>& in_tensors,
std::vector<phi::DenseTensor>& out_tensors,
int offset,
int length) {
PADDLE_ENFORCE_EQ(
CheckTensorsInCudaPlace(in_tensors),
true,
platform::errors::InvalidArgument("All inputs should be in CudaPlace."));
PADDLE_ENFORCE_EQ(
CheckTensorsInCudaPlace(out_tensors),
true,
platform::errors::InvalidArgument("All outputs should be in CudaPlace."));
return Collective(
in_tensors,
out_tensors,
[&](phi::DenseTensor& input,
phi::DenseTensor& output,
ncclComm_t comm,
const gpuStream_t& stream) {
return platform::dynload::ncclAllGather(
GetPointerByOffset(input.data(), offset, input.dtype()),
output.data(),
length,
platform::ToNCCLDataType(input.dtype()),
comm,
stream);
},
CommType::ALLGATHER);
}

std::shared_ptr<ProcessGroup::Task> ProcessGroupNCCL::AllToAll(
std::vector<phi::DenseTensor>& in_tensors,
std::vector<phi::DenseTensor>& out_tensors) {
@@ -695,8 +727,8 @@ std::shared_ptr<ProcessGroup::Task> ProcessGroupNCCL::AllToAll_Single(

std::vector<int64_t> in_dims = phi::vectorize(input.dims());
std::vector<int64_t> out_dims = phi::vectorize(output.dims());
CheckSplitSizes(in_sizes, in_dims);
CheckSplitSizes(out_sizes, out_dims);
CheckSplitSizes(&in_sizes, in_dims);
CheckSplitSizes(&out_sizes, out_dims);

size_t in_offset = 0, out_offset = 0;
size_t in_length = 0, out_length = 0;
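AllGather_Partial above converts its element offset into a typed pointer via GetPointerByOffset before handing the buffer to ncclAllGather. A minimal sketch of what such a dtype-aware helper looks like (hedged: the real helper in this file covers more dtypes, and the enum and function names here are illustrative assumptions):

// Sketch only: advance raw_pointer by `offset` elements, where the element
// width depends on the tensor's dtype.
void* GetPointerByOffsetSketch(void* raw_pointer,
                               size_t offset,
                               experimental::DataType type) {
  if (type == experimental::DataType::FLOAT32) {
    return reinterpret_cast<void*>(reinterpret_cast<float*>(raw_pointer) + offset);
  } else if (type == experimental::DataType::FLOAT64) {
    return reinterpret_cast<void*>(reinterpret_cast<double*>(raw_pointer) + offset);
  } else if (type == experimental::DataType::INT32) {
    return reinterpret_cast<void*>(reinterpret_cast<int32_t*>(raw_pointer) + offset);
  }
  return nullptr;  // mirrors the fall-through of the helper above
}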
8 changes: 7 additions & 1 deletion paddle/fluid/distributed/collective/ProcessGroupNCCL.h
@@ -125,6 +125,12 @@ class ProcessGroupNCCL : public ProcessGroup {
std::vector<phi::DenseTensor>& in_tensors,
std::vector<phi::DenseTensor>& out_tensors) override;

std::shared_ptr<ProcessGroup::Task> AllGather_Partial(
std::vector<phi::DenseTensor>& in_tensors,
std::vector<phi::DenseTensor>& out_tensors,
int offset,
int length) override;

std::shared_ptr<ProcessGroup::Task> AllToAll(
std::vector<phi::DenseTensor>& in,
std::vector<phi::DenseTensor>& out) override;
@@ -206,7 +212,7 @@ class ProcessGroupNCCL : public ProcessGroup {
void CreateNCCLManagerCache(const std::string& places_key,
const std::vector<Place>& places);

void CheckSplitSizes(std::vector<int64_t>& split_sizes,
void CheckSplitSizes(std::vector<int64_t>* split_sizes,
std::vector<int64_t> tensor_shape);
};

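The CheckSplitSizes declaration above now takes split_sizes by pointer, making the in/out mutation explicit at call sites. A short usage sketch, following the checks in the .cc hunk (hedged: the shapes and a group size of 4 are illustrative):

// Sketch only: an empty split_sizes asks for the default even split.
std::vector<int64_t> split_sizes;             // empty on entry
std::vector<int64_t> tensor_shape = {8, 16};  // dim[0] must be divisible by size_
CheckSplitSizes(&split_sizes, tensor_shape);  // fills {2, 2, 2, 2} when size_ == 4
// A non-empty split_sizes must instead hold size_ entries summing to dim[0].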
45 changes: 30 additions & 15 deletions paddle/fluid/operators/collective/partial_allgather_op.cu.cc
@@ -15,6 +15,7 @@ limitations under the License. */
#include "paddle/fluid/operators/collective/partial_allgather_op.h"

#if defined(PADDLE_WITH_NCCL) || defined(PADDLE_WITH_RCCL)
#include "paddle/fluid/distributed/collective/ProcessGroup.h"
#include "paddle/fluid/platform/collective_helper.h"
#include "paddle/fluid/platform/device/gpu/nccl_helper.h"
#endif
@@ -61,24 +62,38 @@ class PartialAllGatherOpCUDAKernel : public framework::OpKernel<T> {

int64_t send_numel = numel / nranks;
int offset = send_numel * rank;
const T* send_buff = in->data<T>() + offset;
T* recv_buff = out->data<T>();

gpuStream_t stream = nullptr;
if (ctx.Attr<bool>("use_calc_stream")) {
auto dev_ctx = platform::DeviceContextPool::Instance().Get(place);
stream = static_cast<platform::CUDADeviceContext*>(dev_ctx)->stream();
auto map = distributed::ProcessGroupMapFromGid::getInstance();
if (map->has(rid)) {
// Use ProcessGroup
distributed::ProcessGroup* pg = map->get(rid);
std::vector<phi::DenseTensor> in_tensors;
std::vector<phi::DenseTensor> out_tensors;
in_tensors.push_back(*in);
out_tensors.push_back(*out);
auto task =
pg->AllGather_Partial(in_tensors, out_tensors, offset, send_numel);
task->Wait();
} else {
stream = comm->stream();
const T* send_buff = in->data<T>() + offset;
T* recv_buff = out->data<T>();

gpuStream_t stream = nullptr;
if (ctx.Attr<bool>("use_calc_stream")) {
auto dev_ctx = platform::DeviceContextPool::Instance().Get(place);
stream = static_cast<platform::CUDADeviceContext*>(dev_ctx)->stream();
} else {
stream = comm->stream();
}

PADDLE_ENFORCE_GPU_SUCCESS(
platform::dynload::ncclAllGather(send_buff,
recv_buff,
send_numel,
static_cast<ncclDataType_t>(dtype),
comm->comm(),
stream));
}

PADDLE_ENFORCE_GPU_SUCCESS(
platform::dynload::ncclAllGather(send_buff,
recv_buff,
send_numel,
static_cast<ncclDataType_t>(dtype),
comm->comm(),
stream));
#else
PADDLE_THROW(platform::errors::PreconditionNotMet(
"PaddlePaddle should compile with GPU."));
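As a concrete check of the slicing arithmetic in this kernel (values illustrative): with nranks = 4 and numel = 8, each rank sends send_numel = 2 elements from element offset 2 * rank, so the gathered output is the full tensor reassembled in rank order:

// Sketch only: the per-rank offsets the kernel above computes.
const int nranks = 4;
const int64_t numel = 8;
const int64_t send_numel = numel / nranks;  // 2 elements per rank
for (int rank = 0; rank < nranks; ++rank) {
  const int offset = send_numel * rank;     // 0, 2, 4, 6
  // rank `rank` contributes in[offset, offset + send_numel) to the output
}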
60 changes: 34 additions & 26 deletions paddle/fluid/operators/collective/partial_recv_op.cu.cc
@@ -15,6 +15,7 @@ limitations under the License. */
#include "paddle/fluid/operators/collective/partial_recv_op.h"

#if defined(PADDLE_WITH_NCCL) || defined(PADDLE_WITH_RCCL)
#include "paddle/fluid/distributed/collective/ProcessGroup.h"
#include "paddle/fluid/platform/collective_helper.h"
#include "paddle/fluid/platform/device/gpu/nccl_helper.h"
#endif
@@ -65,37 +66,44 @@ class PartialRecvOpCUDAKernel : public framework::OpKernel<T> {
platform::errors::InvalidArgument(
"The input numel (%d) must be divisible by num(%d)", numel, num));

gpuStream_t stream = nullptr;
auto place = ctx.GetPlace();
auto comm = platform::NCCLCommContext::Instance().Get(rid, place);
if (ctx.Attr<bool>("use_calc_stream")) {
auto dev_ctx = platform::DeviceContextPool::Instance().Get(place);
stream = static_cast<platform::CUDADeviceContext *>(dev_ctx)->stream();
} else {
stream = comm->stream();
}
PADDLE_ENFORCE_LT(
peer,
comm->nranks(),
platform::errors::InvalidArgument("The value of peer (%d) you set must "
"be less than comm->nranks (%d).",
peer,
comm->nranks()));

out->mutable_data<T>(out_dims, place);
ncclDataType_t dtype = platform::ToNCCLDataType(type);
int recv_numel = numel / num;
int offset = recv_numel * id;

PADDLE_ENFORCE_GPU_SUCCESS(
platform::dynload::ncclRecv(out->data<T>() + offset,
recv_numel,
dtype,
peer,
comm->comm(),
stream));
VLOG(3) << "rank " << comm->rank() << " recv " << recv_numel
<< " from offset[" << offset << "] from " << peer;
auto map = distributed::ProcessGroupMapFromGid::getInstance();
if (map->has(rid)) {
// Use ProcessGroup
distributed::ProcessGroup *pg = map->get(rid);
auto task = pg->Recv_Partial(*out, peer, offset, recv_numel);
task->Wait();
} else {
gpuStream_t stream = nullptr;
auto comm = platform::NCCLCommContext::Instance().Get(rid, place);
if (ctx.Attr<bool>("use_calc_stream")) {
auto dev_ctx = platform::DeviceContextPool::Instance().Get(place);
stream = static_cast<platform::CUDADeviceContext *>(dev_ctx)->stream();
} else {
stream = comm->stream();
}
PADDLE_ENFORCE_LT(peer,
comm->nranks(),
platform::errors::InvalidArgument(
"The value of peer (%d) you set must "
"be less than comm->nranks (%d).",
peer,
comm->nranks()));
ncclDataType_t dtype = platform::ToNCCLDataType(type);
PADDLE_ENFORCE_GPU_SUCCESS(
platform::dynload::ncclRecv(out->data<T>() + offset,
recv_numel,
dtype,
peer,
comm->comm(),
stream));
VLOG(3) << "rank " << comm->rank() << " recv " << recv_numel
<< " from offset[" << offset << "] from " << peer;
}
#else
PADDLE_THROW(platform::errors::Unavailable(
"PaddlePaddle should be compiled with NCCL and "
62 changes: 39 additions & 23 deletions paddle/fluid/operators/collective/partial_send_op.cu.cc
@@ -15,6 +15,7 @@ limitations under the License. */
#include "paddle/fluid/operators/collective/partial_send_op.h"

#if defined(PADDLE_WITH_NCCL) || defined(PADDLE_WITH_RCCL)
#include "paddle/fluid/distributed/collective/ProcessGroup.h"
#include "paddle/fluid/platform/collective_helper.h"
#include "paddle/fluid/platform/device/gpu/nccl_helper.h"
#endif
@@ -61,32 +62,47 @@ class PartialSendCUDAKernel : public framework::OpKernel<T> {
platform::errors::InvalidArgument(
"The input numel (%d) must be divisible by num(%d)", numel, num));

gpuStream_t stream = nullptr;
auto place = ctx.GetPlace();
auto comm = platform::NCCLCommContext::Instance().Get(rid, place);
if (ctx.Attr<bool>("use_calc_stream")) {
auto dev_ctx = platform::DeviceContextPool::Instance().Get(place);
stream = static_cast<platform::CUDADeviceContext*>(dev_ctx)->stream();
} else {
stream = comm->stream();
}
PADDLE_ENFORCE_LT(
peer,
comm->nranks(),
platform::errors::InvalidArgument("The value of peer (%d) you set must "
"be less than comm->nranks (%d).",
peer,
comm->nranks()));

ncclDataType_t dtype =
platform::ToNCCLDataType(framework::TransToProtoVarType(x->dtype()));
int send_numel = numel / num;
int offset = send_numel * id;

PADDLE_ENFORCE_GPU_SUCCESS(platform::dynload::ncclSend(
x->data<T>() + offset, send_numel, dtype, peer, comm->comm(), stream));
VLOG(3) << "rank " << comm->rank() << " send " << send_numel
<< " from offset[" << offset << "] to " << peer;
auto map = distributed::ProcessGroupMapFromGid::getInstance();
if (map->has(rid)) {
// Use ProcessGroup
distributed::ProcessGroup* pg = map->get(rid);
phi::DenseTensor tmp = *x;
auto task = pg->Send_Partial(tmp, peer, offset, send_numel);
task->Wait();
} else {
gpuStream_t stream = nullptr;
auto place = ctx.GetPlace();
auto comm = platform::NCCLCommContext::Instance().Get(rid, place);
if (ctx.Attr<bool>("use_calc_stream")) {
auto dev_ctx = platform::DeviceContextPool::Instance().Get(place);
stream = static_cast<platform::CUDADeviceContext*>(dev_ctx)->stream();
} else {
stream = comm->stream();
}
PADDLE_ENFORCE_LT(peer,
comm->nranks(),
platform::errors::InvalidArgument(
"The value of peer (%d) you set must "
"be less than comm->nranks (%d).",
peer,
comm->nranks()));

ncclDataType_t dtype =
platform::ToNCCLDataType(framework::TransToProtoVarType(x->dtype()));

PADDLE_ENFORCE_GPU_SUCCESS(
platform::dynload::ncclSend(x->data<T>() + offset,
send_numel,
dtype,
peer,
comm->comm(),
stream));
VLOG(3) << "rank " << comm->rank() << " send " << send_numel
<< " from offset[" << offset << "] to " << peer;
}
#else
PADDLE_THROW(platform::errors::Unavailable(
"PaddlePaddle should be compiled with NCCL "
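Taken together, the two kernels above dispatch to Send_Partial and Recv_Partial, which pair up as in this sketch of a two-rank exchange (hedged: the group setup, tensor t, and slice bounds are assumptions; the argument order follows the calls made in the kernels):

// Sketch only: rank 0 sends elements [offset, offset + n) of t to rank 1,
// which receives them into the same slice of its own tensor.
const int offset = 0, n = 1024;
if (pg->GetRank() == 0) {
  auto task = pg->Send_Partial(t, /*dst_rank=*/1, offset, n);
  task->Wait();
} else if (pg->GetRank() == 1) {
  auto task = pg->Recv_Partial(t, /*src_rank=*/0, offset, n);
  task->Wait();
}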
