Merge pull request #34532 from ROCmSoftwarePlatform/r1.15-rccl-upstream-patch

[ROCm] r1.15 rccl upstream patch
mihaimaruseac committed Jan 21, 2020
2 parents cc1d12c + c954406 commit 360b2e3
Showing 21 changed files with 292 additions and 94 deletions.
25 changes: 23 additions & 2 deletions tensorflow/core/common_runtime/gpu/gpu_device.cc
@@ -249,6 +249,23 @@ class BaseGPUDevice::StreamGroupFactory {
VLOG(2) << "Created stream[" << stream_group_within_gpu
<< "] = " << group->compute;

#if TENSORFLOW_USE_ROCM
// ROCm streams are lightweight and will not necessarily trigger device
// queue init until they are first used. For optimal performance,
// compute and nccl streams must be immediate siblings.
group->nccl = new se::Stream(executor);
group->nccl->Init();
VLOG(2) << "Created nccl_stream[" << stream_group_within_gpu
<< "] = " << group->nccl;

// Force underlying resource creation now.
group->compute->ThenWaitFor(group->nccl);
group->nccl->ThenWaitFor(group->compute);
#endif

group->host_to_device = new se::Stream(executor);
group->host_to_device->Init();
VLOG(2) << "Created host_to_device_stream[" << stream_group_within_gpu
@@ -371,8 +388,12 @@ Status BaseGPUDevice::Init(const SessionOptions& options) {
streams_.push_back(StreamGroupFactory::Global().GetOrCreate(
tf_gpu_id_, i, executor_, options.config.gpu_options()));
device_contexts_.push_back(new GPUDeviceContext(
i, streams_.back()->compute, streams_.back()->host_to_device,
streams_.back()->device_to_host, streams_.back()->device_to_device));
i, streams_.back()->compute,
#if TENSORFLOW_USE_ROCM
streams_.back()->nccl,
#endif
streams_.back()->host_to_device, streams_.back()->device_to_host,
streams_.back()->device_to_device));
}

em_ = EventMgrFactory::Singleton()->GetEventMgr(executor_,
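The cross-waits above exist only to force HIP queue creation up front; a minimal sketch of that pattern, assuming the se::Stream API used in this hunk (Init, ThenWaitFor); the free-standing helper and its name are illustrative, not part of the patch:

#include <utility>

#include "tensorflow/stream_executor/stream.h"
#include "tensorflow/stream_executor/stream_executor.h"

namespace se = stream_executor;

// Create a compute/nccl stream pair and force the HIP runtime to materialize
// their underlying device queues now, rather than lazily on first use, by
// making each stream wait on the other once.
std::pair<se::Stream*, se::Stream*> CreateSiblingStreams(
    se::StreamExecutor* executor) {
  auto* compute = new se::Stream(executor);
  compute->Init();
  auto* nccl = new se::Stream(executor);
  nccl->Init();
  compute->ThenWaitFor(nccl);  // touches both streams, triggering queue init
  nccl->ThenWaitFor(compute);  // symmetric wait keeps the pair as siblings
  return {compute, nccl};
}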
3 changes: 3 additions & 0 deletions tensorflow/core/common_runtime/gpu/gpu_device.h
@@ -137,6 +137,9 @@ class BaseGPUDevice : public LocalDevice {
friend class GPUDeviceTestHelper;
struct StreamGroup {
se::Stream* compute = nullptr;
#if TENSORFLOW_USE_ROCM
se::Stream* nccl = nullptr;
#endif
se::Stream* host_to_device = nullptr;
se::Stream* device_to_host = nullptr;
gtl::InlinedVector<se::Stream*, 4> device_to_device;
16 changes: 15 additions & 1 deletion tensorflow/core/common_runtime/gpu_device_context.h
@@ -30,18 +30,28 @@ class GPUDeviceContext : public DeviceContext {
public:
// Does not take ownership of streams.
GPUDeviceContext(int stream_id, se::Stream* stream,
#if TENSORFLOW_USE_ROCM
se::Stream* nccl_stream,
#endif
se::Stream* host_to_device_stream,
se::Stream* device_to_host_stream,
gtl::InlinedVector<se::Stream*, 4> device_to_device_stream)
: stream_id_(stream_id),
stream_(stream),
#if TENSORFLOW_USE_ROCM
nccl_stream_(nccl_stream),
#endif
host_to_device_stream_(host_to_device_stream),
device_to_host_stream_(device_to_host_stream),
device_to_device_stream_(device_to_device_stream) {}
device_to_device_stream_(device_to_device_stream) {
}

~GPUDeviceContext() override {}

se::Stream* stream() const override { return stream_; }
#if TENSORFLOW_USE_ROCM
se::Stream* nccl_stream() const { return nccl_stream_; }
#endif
se::Stream* host_to_device_stream() const { return host_to_device_stream_; }
se::Stream* device_to_host_stream() const { return device_to_host_stream_; }
se::Stream* device_to_device_stream(int index) const {
@@ -72,6 +82,10 @@ class GPUDeviceContext : public DeviceContext {
// The default primary stream to use for this context.
// All the memory belongs to this stream.
se::Stream* stream_;
#if TENSORFLOW_USE_ROCM
// The stream to use for nccl operations.
se::Stream* nccl_stream_;
#endif
// The stream to use for copying data from host into GPU.
se::Stream* host_to_device_stream_;
// The stream to use for copying data from GPU to host.
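As context for the new accessor, a hypothetical helper (not part of this change) showing how a caller might pick the stream for collective launches, given the accessors declared above:

#include "tensorflow/core/common_runtime/gpu_device_context.h"

namespace tensorflow {

// Choose the stream on which NCCL/RCCL collective work is enqueued: the
// dedicated nccl stream on ROCm builds, the main compute stream otherwise.
inline se::Stream* CollectiveStream(const GPUDeviceContext* ctx) {
#if TENSORFLOW_USE_ROCM
  return ctx->nccl_stream();
#else
  return ctx->stream();
#endif
}

}  // namespace tensorflow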
19 changes: 17 additions & 2 deletions tensorflow/core/kernels/BUILD
@@ -194,6 +194,17 @@ tf_cc_test(
],
)

# Virtual targets, since nested select() statements are not possible in Bazel;
# the CUDA and ROCm collective deps are wrapped in these intermediate targets.
tf_kernel_library(
name = "virtual_nccl",
deps = if_cuda(["@local_config_nccl//:nccl"]),
)

tf_kernel_library(
name = "virtual_rccl",
deps = if_rocm(["@local_config_rocm//rocm:rccl"]),
)

tf_kernel_library(
name = "collective_ops",
srcs = if_nccl([
@@ -213,7 +224,8 @@ tf_kernel_library(
"//tensorflow/core:protos_all_cc",
"//tensorflow/core/profiler/lib:traceme",
] + if_nccl([
"@local_config_nccl//:nccl",
":virtual_nccl",
":virtual_rccl",
"//tensorflow/core/nccl:nccl_lib",
]),
)
@@ -382,11 +394,14 @@ cc_library(

tf_kernel_library(
name = "nccl_kernels",
srcs = if_cuda([
srcs = if_cuda_or_rocm([
"nccl_ops.cc",
]),
deps = if_cuda([
"@local_config_nccl//:nccl",
]) + if_rocm([
"@local_config_rocm//rocm:rccl",
]) + if_cuda_or_rocm([
"//tensorflow/core/nccl:nccl_lib",
"//tensorflow/core:framework",
"//tensorflow/core:gpu_headers_lib",
4 changes: 2 additions & 2 deletions tensorflow/core/kernels/collective_nccl.cc
@@ -14,7 +14,7 @@ limitations under the License.
==============================================================================*/
#include "tensorflow/core/kernels/collective_nccl.h"

#ifdef GOOGLE_CUDA
#if GOOGLE_CUDA || TENSORFLOW_USE_ROCM

#include "tensorflow/core/common_runtime/collective_util.h"
#include "tensorflow/core/nccl/nccl_manager.h"
@@ -79,4 +79,4 @@ const string NcclBase::NcclCollectiveKey(const string& exec_key, int step_id) {

} // namespace tensorflow

#endif // GOOGLE_CUDA
#endif // GOOGLE_CUDA || TENSORFLOW_USE_ROCM
4 changes: 2 additions & 2 deletions tensorflow/core/kernels/collective_nccl.h
@@ -18,7 +18,7 @@ limitations under the License.
#include "tensorflow/core/framework/collective.h"

namespace tensorflow {
#ifdef GOOGLE_CUDA
#if GOOGLE_CUDA || TENSORFLOW_USE_ROCM

class NcclBase : public CollectiveImplementationInterface {
public:
@@ -44,7 +44,7 @@ class NcclBase : public CollectiveImplementationInterface {
const CollectiveParams* col_params_; // Not owned
};

#endif // GOOGLE_CUDA
#endif // GOOGLE_CUDA || TENSORFLOW_USE_ROCM
} // namespace tensorflow

#endif // TENSORFLOW_CORE_KERNELS_COLLECTIVE_NCCL_H_
9 changes: 4 additions & 5 deletions tensorflow/core/kernels/collective_nccl_broadcaster.cc
@@ -14,7 +14,7 @@ limitations under the License.
==============================================================================*/
#include "tensorflow/core/kernels/collective_nccl_broadcaster.h"

#ifdef GOOGLE_CUDA
#if GOOGLE_CUDA || TENSORFLOW_USE_ROCM

#include "tensorflow/core/common_runtime/collective_util.h"
#include "tensorflow/core/nccl/nccl_manager.h"
@@ -32,9 +32,8 @@ void NcclBroadcaster::Run(StatusCallback done) {
string nccl_collective_key =
NcclCollectiveKey(col_ctx_->exec_key, col_ctx_->step_id);
auto participant = absl::make_unique<NcclManager::Participant>(
compute_stream->parent(), compute_stream, gpu_info->event_mgr,
gpu_info->gpu_id, col_ctx_->input, col_ctx_->output,
col_params_->default_rank, std::move(done));
compute_stream->parent(), compute_stream, gpu_info, col_ctx_->input,
col_ctx_->output, col_params_->default_rank, std::move(done));
VLOG(1)
<< "NcclBroadcast calling NcclManager::AddBroadcastSend/Recv num_tasks "
<< col_params_->group.num_tasks << " current task "
@@ -80,4 +79,4 @@ REGISTER_COLLECTIVE(NcclBroadcast, NcclBroadcaster);

} // namespace tensorflow

#endif // GOOGLE_CUDA
#endif // GOOGLE_CUDA || TENSORFLOW_USE_ROCM
4 changes: 2 additions & 2 deletions tensorflow/core/kernels/collective_nccl_broadcaster.h
@@ -18,7 +18,7 @@ limitations under the License.
#include "tensorflow/core/kernels/collective_nccl.h"

namespace tensorflow {
#ifdef GOOGLE_CUDA
#if GOOGLE_CUDA || TENSORFLOW_USE_ROCM

class NcclBroadcaster : public NcclBase {
public:
@@ -29,7 +29,7 @@ class NcclBroadcaster : public NcclBase {
void Run(StatusCallback done) override;
};

#endif // GOOGLE_CUDA
#endif // GOOGLE_CUDA || TENSORFLOW_USE_ROCM
} // namespace tensorflow

#endif // TENSORFLOW_CORE_KERNELS_COLLECTIVE_NCCL_BROADCASTER_H_
9 changes: 4 additions & 5 deletions tensorflow/core/kernels/collective_nccl_gatherer.cc
@@ -14,7 +14,7 @@ limitations under the License.
==============================================================================*/
#include "tensorflow/core/kernels/collective_nccl_gatherer.h"

#ifdef GOOGLE_CUDA
#if GOOGLE_CUDA || TENSORFLOW_USE_ROCM

#include "tensorflow/core/common_runtime/collective_util.h"
#include "tensorflow/core/nccl/nccl_manager.h"
@@ -32,9 +32,8 @@ void NcclGatherer::Run(StatusCallback done) {
string nccl_collective_key =
NcclCollectiveKey(col_ctx_->exec_key, col_ctx_->step_id);
auto participant = absl::make_unique<NcclManager::Participant>(
compute_stream->parent(), compute_stream, gpu_info->event_mgr,
gpu_info->gpu_id, col_ctx_->input, col_ctx_->output,
col_params_->default_rank, std::move(done));
compute_stream->parent(), compute_stream, gpu_info, col_ctx_->input,
col_ctx_->output, col_params_->default_rank, std::move(done));
VLOG(1) << "NcclGatherer calling NcclManager::AddToAllGather num_tasks "
<< col_params_->group.num_tasks << " current task "
<< col_params_->instance.task_names[col_params_->default_rank]
@@ -70,4 +69,4 @@ REGISTER_COLLECTIVE(NcclGather, NcclGatherer);

} // namespace tensorflow

#endif // GOOGLE_CUDA
#endif // GOOGLE_CUDA || TENSORFLOW_USE_ROCM
4 changes: 2 additions & 2 deletions tensorflow/core/kernels/collective_nccl_gatherer.h
@@ -18,7 +18,7 @@ limitations under the License.
#include "tensorflow/core/kernels/collective_nccl.h"

namespace tensorflow {
#ifdef GOOGLE_CUDA
#if GOOGLE_CUDA || TENSORFLOW_USE_ROCM

class NcclGatherer : public NcclBase {
public:
@@ -29,7 +29,7 @@ class NcclGatherer : public NcclBase {
void Run(StatusCallback done) override;
};

#endif // GOOGLE_CUDA
#endif // GOOGLE_CUDA || TENSORFLOW_USE_ROCM
} // namespace tensorflow

#endif // TENSORFLOW_CORE_KERNELS_COLLECTIVE_NCCL_GATHERER_H_
8 changes: 4 additions & 4 deletions tensorflow/core/kernels/collective_nccl_reducer.cc
@@ -14,7 +14,7 @@ limitations under the License.
==============================================================================*/
#include "tensorflow/core/kernels/collective_nccl_reducer.h"

#ifdef GOOGLE_CUDA
#if GOOGLE_CUDA || TENSORFLOW_USE_ROCM

#include "tensorflow/core/common_runtime/collective_util.h"
#include "tensorflow/core/nccl/nccl_manager.h"
@@ -109,8 +109,8 @@ void NcclReducer::Run(StatusCallback done) {
nccl_done.Notify();
};
auto participant = absl::make_unique<NcclManager::Participant>(
compute_stream->parent(), compute_stream, gpu_info->event_mgr,
gpu_info->gpu_id, col_ctx_->input, col_ctx_->output,
compute_stream->parent(), compute_stream, gpu_info,
col_ctx_->input, col_ctx_->output,
col_params_->default_rank, std::move(done_callback));
VLOG(1) << "NcclReducer calling NcclManager::AddToAllReduce num_tasks "
<< col_params_->group.num_tasks << " current task "
@@ -182,4 +182,4 @@ REGISTER_COLLECTIVE(NcclReduce, NcclReducer);

} // namespace tensorflow

#endif // GOOGLE_CUDA
#endif // GOOGLE_CUDA || TENSORFLOW_USE_ROCM
4 changes: 2 additions & 2 deletions tensorflow/core/kernels/collective_nccl_reducer.h
@@ -18,7 +18,7 @@ limitations under the License.
#include "tensorflow/core/kernels/collective_nccl.h"

namespace tensorflow {
#ifdef GOOGLE_CUDA
#if GOOGLE_CUDA || TENSORFLOW_USE_ROCM

class NcclReducer : public NcclBase {
public:
@@ -29,7 +29,7 @@ class NcclReducer : public NcclBase {
void Run(StatusCallback done) override;
};

#endif // GOOGLE_CUDA
#endif // GOOGLE_CUDA || TENSORFLOW_USE_ROCM
} // namespace tensorflow

#endif // TENSORFLOW_CORE_KERNELS_COLLECTIVE_NCCL_REDUCER_H_
28 changes: 16 additions & 12 deletions tensorflow/core/kernels/nccl_ops.cc
@@ -13,11 +13,15 @@ See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/

#if GOOGLE_CUDA
#if GOOGLE_CUDA || TENSORFLOW_USE_ROCM

#include <vector>

#if GOOGLE_CUDA
#include "third_party/nccl/nccl.h"
#elif TENSORFLOW_USE_ROCM
#include "rocm/include/rccl/rccl.h"
#endif
#include "tensorflow/core/framework/op_kernel.h"
#include "tensorflow/core/nccl/nccl_manager.h"

@@ -104,8 +108,8 @@ class NcclAllReduceOpKernel : public NcclReduceOpBase {
auto* compute_stream = c->op_device_context()->stream();
auto* gpu_info = c->device()->tensorflow_gpu_device_info();
auto participant = absl::make_unique<NcclManager::Participant>(
compute_stream->parent(), compute_stream, gpu_info->event_mgr,
gpu_info->gpu_id, input, output, /*global_rank=*/-1,
compute_stream->parent(), compute_stream, gpu_info,
input, output, /*global_rank=*/-1,
std::move(actual_done));
NcclManager::instance()->AddToAllReduce(
std::move(participant),
@@ -136,8 +140,8 @@ class NcclReduceSendKernel : public NcclReduceOpBase {
auto* compute_stream = c->op_device_context()->stream();
auto* gpu_info = c->device()->tensorflow_gpu_device_info();
auto participant = absl::make_unique<NcclManager::Participant>(
compute_stream->parent(), compute_stream, gpu_info->event_mgr,
gpu_info->gpu_id, &c->input(0), /*output=*/nullptr, /*global_rank=*/-1,
compute_stream->parent(), compute_stream, gpu_info,
&c->input(0), /*output=*/nullptr, /*global_rank=*/-1,
std::move(actual_done));
NcclManager::instance()->AddReduceSend(
std::move(participant),
@@ -173,8 +177,8 @@ class NcclReduceRecvKernel : public NcclReduceOpBase {
auto* compute_stream = c->op_device_context()->stream();
auto* gpu_info = c->device()->tensorflow_gpu_device_info();
auto participant = absl::make_unique<NcclManager::Participant>(
compute_stream->parent(), compute_stream, gpu_info->event_mgr,
gpu_info->gpu_id, input, output, /*global_rank=*/-1,
compute_stream->parent(), compute_stream, gpu_info,
input, output, /*global_rank=*/-1,
std::move(actual_done));
NcclManager::instance()->AddReduceRecv(
std::move(participant),
@@ -208,8 +212,8 @@ class NcclBroadcastSendKernel : public NcclAsyncOpBase {
auto* compute_stream = c->op_device_context()->stream();
auto* gpu_info = c->device()->tensorflow_gpu_device_info();
auto participant = absl::make_unique<NcclManager::Participant>(
compute_stream->parent(), compute_stream, gpu_info->event_mgr,
gpu_info->gpu_id, &c->input(0), /*output=*/nullptr, /*global_rank=*/-1,
compute_stream->parent(), compute_stream, gpu_info,
&c->input(0), /*output=*/nullptr, /*global_rank=*/-1,
std::move(actual_done));
NcclManager::instance()->AddBroadcastSend(
std::move(participant), {GetCollectiveKey(c),
@@ -245,8 +249,8 @@ class NcclBroadcastRecvKernel : public NcclAsyncOpBase {
auto* compute_stream = c->op_device_context()->stream();
auto* gpu_info = c->device()->tensorflow_gpu_device_info();
auto participant = absl::make_unique<NcclManager::Participant>(
compute_stream->parent(), compute_stream, gpu_info->event_mgr,
gpu_info->gpu_id, /*input=*/nullptr, output, /*global_rank=*/-1,
compute_stream->parent(), compute_stream, gpu_info,
/*input=*/nullptr, output, /*global_rank=*/-1,
std::move(actual_done));
NcclManager::instance()->AddBroadcastRecv(
std::move(participant), {GetCollectiveKey(c),
@@ -276,4 +280,4 @@ REGISTER_KERNEL_BUILDER(Name("NcclReduce").Device(DEVICE_GPU), NcclStubKernel);
} // namespace
} // namespace tensorflow

#endif // GOOGLE_CUDA
#endif // GOOGLE_CUDA || TENSORFLOW_USE_ROCM
