[ROCm] r1.15 rccl upstream patch #34532

Merged
3 commits merged on Jan 21, 2020
Changes from 2 commits
25 changes: 23 additions & 2 deletions tensorflow/core/common_runtime/gpu/gpu_device.cc
@@ -249,6 +249,23 @@ class BaseGPUDevice::StreamGroupFactory {
VLOG(2) << "Created stream[" << stream_group_within_gpu
<< "] = " << group->compute;

#if TENSORFLOW_USE_ROCM
// ROCm streams are lightweight and will not necessarily trigger device
// queue init until they are first used. For optimal performance,
// compute and nccl streams must be immediate siblings.
group->nccl = new se::Stream(executor);
group->nccl->Init();
VLOG(2) << "Created nccl_stream[" << stream_group_within_gpu
<< "] = " << group->nccl;

// Force underlying resource creation now.
group->compute->ThenWaitFor(group->nccl);
group->nccl->ThenWaitFor(group->compute);
#endif

group->host_to_device = new se::Stream(executor);
group->host_to_device->Init();
VLOG(2) << "Created host_to_device_stream[" << stream_group_within_gpu
@@ -371,8 +388,12 @@ Status BaseGPUDevice::Init(const SessionOptions& options) {
streams_.push_back(StreamGroupFactory::Global().GetOrCreate(
tf_gpu_id_, i, executor_, options.config.gpu_options()));
device_contexts_.push_back(new GPUDeviceContext(
i, streams_.back()->compute, streams_.back()->host_to_device,
streams_.back()->device_to_host, streams_.back()->device_to_device));
i, streams_.back()->compute,
#if TENSORFLOW_USE_ROCM
streams_.back()->nccl,
#endif
streams_.back()->host_to_device, streams_.back()->device_to_host,
streams_.back()->device_to_device));
}

em_ = EventMgrFactory::Singleton()->GetEventMgr(executor_,
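
The hunk above gives ROCm builds a dedicated `nccl` stream and immediately records a wait in each direction so that both underlying HIP queues are created while the two streams are still adjacent. A minimal sketch of that setup, using only the `se::Stream` calls that appear in the diff (the free-function wrapper is illustrative, not part of the patch):

```c++
#if TENSORFLOW_USE_ROCM
// Illustrative helper, not part of the patch: assemble the ROCm-only
// sibling stream the same way the StreamGroupFactory hunk above does.
void AddNcclSiblingStream(se::StreamExecutor* executor,
                          BaseGPUDevice::StreamGroup* group) {
  // Dedicated stream for RCCL collectives, created right after compute.
  group->nccl = new se::Stream(executor);
  group->nccl->Init();

  // ROCm streams are lazy; the device queue may not exist until first use.
  // Cross-waiting forces both queues into existence now, so the compute and
  // nccl queues end up as immediate siblings.
  group->compute->ThenWaitFor(group->nccl);
  group->nccl->ThenWaitFor(group->compute);
}
#endif  // TENSORFLOW_USE_ROCM
```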
3 changes: 3 additions & 0 deletions tensorflow/core/common_runtime/gpu/gpu_device.h
@@ -137,6 +137,9 @@ class BaseGPUDevice : public LocalDevice {
friend class GPUDeviceTestHelper;
struct StreamGroup {
se::Stream* compute = nullptr;
#if TENSORFLOW_USE_ROCM
se::Stream* nccl = nullptr;
#endif
se::Stream* host_to_device = nullptr;
se::Stream* device_to_host = nullptr;
gtl::InlinedVector<se::Stream*, 4> device_to_device;
16 changes: 15 additions & 1 deletion tensorflow/core/common_runtime/gpu_device_context.h
@@ -30,18 +30,28 @@ class GPUDeviceContext : public DeviceContext {
public:
// Does not take ownership of streams.
GPUDeviceContext(int stream_id, se::Stream* stream,
#if TENSORFLOW_USE_ROCM
se::Stream* nccl_stream,
#endif
se::Stream* host_to_device_stream,
se::Stream* device_to_host_stream,
gtl::InlinedVector<se::Stream*, 4> device_to_device_stream)
: stream_id_(stream_id),
stream_(stream),
#if TENSORFLOW_USE_ROCM
nccl_stream_(nccl_stream),
#endif
host_to_device_stream_(host_to_device_stream),
device_to_host_stream_(device_to_host_stream),
device_to_device_stream_(device_to_device_stream) {}
device_to_device_stream_(device_to_device_stream) {
}

~GPUDeviceContext() override {}

se::Stream* stream() const override { return stream_; }
#if TENSORFLOW_USE_ROCM
se::Stream* nccl_stream() const { return nccl_stream_; }
#endif
se::Stream* host_to_device_stream() const { return host_to_device_stream_; }
se::Stream* device_to_host_stream() const { return device_to_host_stream_; }
se::Stream* device_to_device_stream(int index) const {
@@ -72,6 +82,10 @@ class GPUDeviceContext : public DeviceContext {
// The default primary stream to use for this context.
// All the memory belongs to this stream.
se::Stream* stream_;
#if TENSORFLOW_USE_ROCM
// The stream to use for nccl operations.
se::Stream* nccl_stream_;
#endif
// The stream to use for copying data from host into GPU.
se::Stream* host_to_device_stream_;
// The stream to use for copying data from GPU to host.
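
With the extra member in place, `GPUDeviceContext` exposes the stream through `nccl_stream()` on ROCm builds only. A hedged sketch of a hypothetical call site (not part of this patch) that picks the collective launch stream per platform, assuming only the accessors declared above:

```c++
// Hypothetical helper: ROCm builds launch collectives on the dedicated
// stream, while CUDA builds keep using the compute stream.
se::Stream* CollectiveLaunchStream(const GPUDeviceContext* ctx) {
#if TENSORFLOW_USE_ROCM
  return ctx->nccl_stream();
#else
  return ctx->stream();
#endif
}
```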
19 changes: 17 additions & 2 deletions tensorflow/core/kernels/BUILD
@@ -194,6 +194,17 @@ tf_cc_test(
],
)

# Virtual targets, since nested select statements are not possible.
tf_kernel_library(
name = "virtual_nccl",
deps = if_cuda(["@local_config_nccl//:nccl"]),
)

tf_kernel_library(
name = "virtual_rccl",
deps = if_rocm(["@local_config_rocm//rocm:rccl"]),
)

tf_kernel_library(
name = "collective_ops",
srcs = if_nccl([
@@ -213,7 +224,8 @@ tf_kernel_library(
"//tensorflow/core:protos_all_cc",
"//tensorflow/core/profiler/lib:traceme",
] + if_nccl([
"@local_config_nccl//:nccl",
":virtual_nccl",
":virtual_rccl",
"//tensorflow/core/nccl:nccl_lib",
]),
)
@@ -382,11 +394,14 @@ cc_library(

tf_kernel_library(
name = "nccl_kernels",
srcs = if_cuda([
srcs = if_cuda_or_rocm([
"nccl_ops.cc",
]),
deps = if_cuda([
"@local_config_nccl//:nccl",
]) + if_rocm([
"@local_config_rocm//rocm:rccl",
]) + if_cuda_or_rocm([
"//tensorflow/core/nccl:nccl_lib",
"//tensorflow/core:framework",
"//tensorflow/core:gpu_headers_lib",
4 changes: 2 additions & 2 deletions tensorflow/core/kernels/collective_nccl.cc
@@ -14,7 +14,7 @@ limitations under the License.
==============================================================================*/
#include "tensorflow/core/kernels/collective_nccl.h"

#ifdef GOOGLE_CUDA
#if GOOGLE_CUDA || TENSORFLOW_USE_ROCM

#include "tensorflow/core/common_runtime/collective_util.h"
#include "tensorflow/core/nccl/nccl_manager.h"
@@ -79,4 +79,4 @@ const string NcclBase::NcclCollectiveKey(const string& exec_key, int step_id) {

} // namespace tensorflow

#endif // GOOGLE_CUDA
#endif // GOOGLE_CUDA || TENSORFLOW_USE_ROCM
4 changes: 2 additions & 2 deletions tensorflow/core/kernels/collective_nccl.h
@@ -18,7 +18,7 @@ limitations under the License.
#include "tensorflow/core/framework/collective.h"

namespace tensorflow {
#ifdef GOOGLE_CUDA
#if GOOGLE_CUDA || TENSORFLOW_USE_ROCM

class NcclBase : public CollectiveImplementationInterface {
public:
@@ -44,7 +44,7 @@ class NcclBase : public CollectiveImplementationInterface {
const CollectiveParams* col_params_; // Not owned
};

#endif // GOOGLE_CUDA
#endif // GOOGLE_CUDA || TENSORFLOW_USE_ROCM
} // namespace tensorflow

#endif // TENSORFLOW_CORE_KERNELS_COLLECTIVE_NCCL_H_
9 changes: 4 additions & 5 deletions tensorflow/core/kernels/collective_nccl_broadcaster.cc
@@ -14,7 +14,7 @@ limitations under the License.
==============================================================================*/
#include "tensorflow/core/kernels/collective_nccl_broadcaster.h"

#ifdef GOOGLE_CUDA
#if GOOGLE_CUDA || TENSORFLOW_USE_ROCM

#include "tensorflow/core/common_runtime/collective_util.h"
#include "tensorflow/core/nccl/nccl_manager.h"
@@ -32,9 +32,8 @@ void NcclBroadcaster::Run(StatusCallback done) {
string nccl_collective_key =
NcclCollectiveKey(col_ctx_->exec_key, col_ctx_->step_id);
auto participant = absl::make_unique<NcclManager::Participant>(
compute_stream->parent(), compute_stream, gpu_info->event_mgr,
gpu_info->gpu_id, col_ctx_->input, col_ctx_->output,
col_params_->default_rank, std::move(done));
compute_stream->parent(), compute_stream, gpu_info, col_ctx_->input,
col_ctx_->output, col_params_->default_rank, std::move(done));
VLOG(1)
<< "NcclBroadcast calling NcclManager::AddBroadcastSend/Recv num_tasks "
<< col_params_->group.num_tasks << " current task "
@@ -80,4 +79,4 @@ REGISTER_COLLECTIVE(NcclBroadcast, NcclBroadcaster);

} // namespace tensorflow

#endif // GOOGLE_CUDA
#endif // GOOGLE_CUDA || TENSORFLOW_USE_ROCM
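
The same `NcclManager::Participant` change repeats in the gatherer, reducer, and the `nccl_ops.cc` kernels below: the call sites stop passing `gpu_info->event_mgr` and `gpu_info->gpu_id` separately and hand over `gpu_info` whole, which is what lets `NcclManager` reach the ROCm-only nccl stream. A sketch of the updated call shape, with the constructor signature assumed from these call sites (it is defined in `nccl_manager.h`, which is not part of this diff):

```c++
// Updated participant construction used by the collective kernels in this
// patch; the variables are the ones already in scope in NcclBroadcaster::Run.
auto participant = absl::make_unique<NcclManager::Participant>(
    compute_stream->parent(),  // se::StreamExecutor* for this device
    compute_stream,            // stream the collective synchronizes with
    gpu_info,                  // whole device-info struct, replacing the
                               // separate event_mgr and gpu_id arguments
    col_ctx_->input, col_ctx_->output,
    col_params_->default_rank, std::move(done));
```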
4 changes: 2 additions & 2 deletions tensorflow/core/kernels/collective_nccl_broadcaster.h
@@ -18,7 +18,7 @@ limitations under the License.
#include "tensorflow/core/kernels/collective_nccl.h"

namespace tensorflow {
#ifdef GOOGLE_CUDA
#if GOOGLE_CUDA || TENSORFLOW_USE_ROCM

class NcclBroadcaster : public NcclBase {
public:
@@ -29,7 +29,7 @@ class NcclBroadcaster : public NcclBase {
void Run(StatusCallback done) override;
};

#endif // GOOGLE_CUDA
#endif // GOOGLE_CUDA || TENSORFLOW_USE_ROCM
} // namespace tensorflow

#endif // TENSORFLOW_CORE_KERNELS_COLLECTIVE_NCCL_BROADCASTER_H_
9 changes: 4 additions & 5 deletions tensorflow/core/kernels/collective_nccl_gatherer.cc
@@ -14,7 +14,7 @@ limitations under the License.
==============================================================================*/
#include "tensorflow/core/kernels/collective_nccl_gatherer.h"

#ifdef GOOGLE_CUDA
#if GOOGLE_CUDA || TENSORFLOW_USE_ROCM

#include "tensorflow/core/common_runtime/collective_util.h"
#include "tensorflow/core/nccl/nccl_manager.h"
@@ -32,9 +32,8 @@ void NcclGatherer::Run(StatusCallback done) {
string nccl_collective_key =
NcclCollectiveKey(col_ctx_->exec_key, col_ctx_->step_id);
auto participant = absl::make_unique<NcclManager::Participant>(
compute_stream->parent(), compute_stream, gpu_info->event_mgr,
gpu_info->gpu_id, col_ctx_->input, col_ctx_->output,
col_params_->default_rank, std::move(done));
compute_stream->parent(), compute_stream, gpu_info, col_ctx_->input,
col_ctx_->output, col_params_->default_rank, std::move(done));
VLOG(1) << "NcclGatherer calling NcclManager::AddToAllGather num_tasks "
<< col_params_->group.num_tasks << " current task "
<< col_params_->instance.task_names[col_params_->default_rank]
@@ -70,4 +69,4 @@ REGISTER_COLLECTIVE(NcclGather, NcclGatherer);

} // namespace tensorflow

#endif // GOOGLE_CUDA
#endif // GOOGLE_CUDA || TENSORFLOW_USE_ROCM
4 changes: 2 additions & 2 deletions tensorflow/core/kernels/collective_nccl_gatherer.h
@@ -18,7 +18,7 @@ limitations under the License.
#include "tensorflow/core/kernels/collective_nccl.h"

namespace tensorflow {
#ifdef GOOGLE_CUDA
#if GOOGLE_CUDA || TENSORFLOW_USE_ROCM

class NcclGatherer : public NcclBase {
public:
@@ -29,7 +29,7 @@ class NcclGatherer : public NcclBase {
void Run(StatusCallback done) override;
};

#endif // GOOGLE_CUDA
#endif // GOOGLE_CUDA || TENSORFLOW_USE_ROCM
} // namespace tensorflow

#endif // TENSORFLOW_CORE_KERNELS_COLLECTIVE_NCCL_GATHERER_H_
8 changes: 4 additions & 4 deletions tensorflow/core/kernels/collective_nccl_reducer.cc
@@ -14,7 +14,7 @@ limitations under the License.
==============================================================================*/
#include "tensorflow/core/kernels/collective_nccl_reducer.h"

#ifdef GOOGLE_CUDA
#if GOOGLE_CUDA || TENSORFLOW_USE_ROCM

#include "tensorflow/core/common_runtime/collective_util.h"
#include "tensorflow/core/nccl/nccl_manager.h"
@@ -109,8 +109,8 @@ void NcclReducer::Run(StatusCallback done) {
nccl_done.Notify();
};
auto participant = absl::make_unique<NcclManager::Participant>(
compute_stream->parent(), compute_stream, gpu_info->event_mgr,
gpu_info->gpu_id, col_ctx_->input, col_ctx_->output,
compute_stream->parent(), compute_stream, gpu_info,
col_ctx_->input, col_ctx_->output,
col_params_->default_rank, std::move(done_callback));
VLOG(1) << "NcclReducer calling NcclManager::AddToAllReduce num_tasks "
<< col_params_->group.num_tasks << " current task "
@@ -182,4 +182,4 @@ REGISTER_COLLECTIVE(NcclReduce, NcclReducer);

} // namespace tensorflow

#endif // GOOGLE_CUDA
#endif // GOOGLE_CUDA || TENSORFLOW_USE_ROCM
4 changes: 2 additions & 2 deletions tensorflow/core/kernels/collective_nccl_reducer.h
@@ -18,7 +18,7 @@ limitations under the License.
#include "tensorflow/core/kernels/collective_nccl.h"

namespace tensorflow {
#ifdef GOOGLE_CUDA
#if GOOGLE_CUDA || TENSORFLOW_USE_ROCM

class NcclReducer : public NcclBase {
public:
@@ -29,7 +29,7 @@ class NcclReducer : public NcclBase {
void Run(StatusCallback done) override;
};

#endif // GOOGLE_CUDA
#endif // GOOGLE_CUDA || TENSORFLOW_USE_ROCM
} // namespace tensorflow

#endif // TENSORFLOW_CORE_KERNELS_COLLECTIVE_NCCL_REDUCER_H_
28 changes: 16 additions & 12 deletions tensorflow/core/kernels/nccl_ops.cc
@@ -13,11 +13,15 @@ See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/

#if GOOGLE_CUDA
#if GOOGLE_CUDA || TENSORFLOW_USE_ROCM

#include <vector>

#if GOOGLE_CUDA
#include "third_party/nccl/nccl.h"
#elif TENSORFLOW_USE_ROCM
#include "rocm/include/rccl/rccl.h"
#endif
#include "tensorflow/core/framework/op_kernel.h"
#include "tensorflow/core/nccl/nccl_manager.h"

@@ -104,8 +108,8 @@ class NcclAllReduceOpKernel : public NcclReduceOpBase {
auto* compute_stream = c->op_device_context()->stream();
auto* gpu_info = c->device()->tensorflow_gpu_device_info();
auto participant = absl::make_unique<NcclManager::Participant>(
compute_stream->parent(), compute_stream, gpu_info->event_mgr,
gpu_info->gpu_id, input, output, /*global_rank=*/-1,
compute_stream->parent(), compute_stream, gpu_info,
input, output, /*global_rank=*/-1,
std::move(actual_done));
NcclManager::instance()->AddToAllReduce(
std::move(participant),
@@ -136,8 +140,8 @@ class NcclReduceSendKernel : public NcclReduceOpBase {
auto* compute_stream = c->op_device_context()->stream();
auto* gpu_info = c->device()->tensorflow_gpu_device_info();
auto participant = absl::make_unique<NcclManager::Participant>(
compute_stream->parent(), compute_stream, gpu_info->event_mgr,
gpu_info->gpu_id, &c->input(0), /*output=*/nullptr, /*global_rank=*/-1,
compute_stream->parent(), compute_stream, gpu_info,
&c->input(0), /*output=*/nullptr, /*global_rank=*/-1,
std::move(actual_done));
NcclManager::instance()->AddReduceSend(
std::move(participant),
@@ -173,8 +177,8 @@ class NcclReduceRecvKernel : public NcclReduceOpBase {
auto* compute_stream = c->op_device_context()->stream();
auto* gpu_info = c->device()->tensorflow_gpu_device_info();
auto participant = absl::make_unique<NcclManager::Participant>(
compute_stream->parent(), compute_stream, gpu_info->event_mgr,
gpu_info->gpu_id, input, output, /*global_rank=*/-1,
compute_stream->parent(), compute_stream, gpu_info,
input, output, /*global_rank=*/-1,
std::move(actual_done));
NcclManager::instance()->AddReduceRecv(
std::move(participant),
@@ -208,8 +212,8 @@ class NcclBroadcastSendKernel : public NcclAsyncOpBase {
auto* compute_stream = c->op_device_context()->stream();
auto* gpu_info = c->device()->tensorflow_gpu_device_info();
auto participant = absl::make_unique<NcclManager::Participant>(
compute_stream->parent(), compute_stream, gpu_info->event_mgr,
gpu_info->gpu_id, &c->input(0), /*output=*/nullptr, /*global_rank=*/-1,
compute_stream->parent(), compute_stream, gpu_info,
&c->input(0), /*output=*/nullptr, /*global_rank=*/-1,
std::move(actual_done));
NcclManager::instance()->AddBroadcastSend(
std::move(participant), {GetCollectiveKey(c),
@@ -245,8 +249,8 @@ class NcclBroadcastRecvKernel : public NcclAsyncOpBase {
auto* compute_stream = c->op_device_context()->stream();
auto* gpu_info = c->device()->tensorflow_gpu_device_info();
auto participant = absl::make_unique<NcclManager::Participant>(
compute_stream->parent(), compute_stream, gpu_info->event_mgr,
gpu_info->gpu_id, /*input=*/nullptr, output, /*global_rank=*/-1,
compute_stream->parent(), compute_stream, gpu_info,
/*input=*/nullptr, output, /*global_rank=*/-1,
std::move(actual_done));
NcclManager::instance()->AddBroadcastRecv(
std::move(participant), {GetCollectiveKey(c),
@@ -276,4 +280,4 @@ REGISTER_KERNEL_BUILDER(Name("NcclReduce").Device(DEVICE_GPU), NcclStubKernel);
} // namespace
} // namespace tensorflow

#endif // GOOGLE_CUDA
#endif // GOOGLE_CUDA || TENSORFLOW_USE_ROCM
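
Taken together, the kernel-side edits follow one pattern: every `#ifdef GOOGLE_CUDA` guard is widened to `#if GOOGLE_CUDA || TENSORFLOW_USE_ROCM`, and the NCCL header is swapped for the RCCL one on ROCm, which exposes the same `nccl*` entry points. A condensed sketch of that pattern on a hypothetical source file (not one of the files above):

```c++
#if GOOGLE_CUDA || TENSORFLOW_USE_ROCM   // widened from #ifdef GOOGLE_CUDA

#if GOOGLE_CUDA
#include "third_party/nccl/nccl.h"       // NVIDIA NCCL
#elif TENSORFLOW_USE_ROCM
#include "rocm/include/rccl/rccl.h"      // AMD RCCL, same nccl* API surface
#endif

// ... collective kernels shared by both backends ...

#endif  // GOOGLE_CUDA || TENSORFLOW_USE_ROCM
```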