Skip to content

Commit

Permalink
Merge pull request #61632 from buptzyb:multistream-streammerge
Browse files Browse the repository at this point in the history
PiperOrigin-RevId: 630175692
  • Loading branch information
tensorflower-gardener committed May 2, 2024
2 parents 94b8e98 + 5aabb58 commit 7084c9f
Show file tree
Hide file tree
Showing 6 changed files with 168 additions and 46 deletions.
4 changes: 3 additions & 1 deletion tensorflow/core/common_runtime/gpu/gpu_device.cc
Expand Up @@ -459,7 +459,9 @@ BaseGPUDevice::BaseGPUDevice(const SessionOptions& options, const string& name,
cpu_allocator_(cpu_allocator),
scoped_allocator_mgr_(new ScopedAllocatorMgr(name)),
tf_device_id_(tf_device_id),
sync_every_op_(sync_every_op) {
sync_every_op_(sync_every_op),
stream_merge_options_(
options.config.gpu_options().experimental().stream_merge_options()) {
// XLA device IDs for GPUs are arbitrary but must be unique, so we hash device
// names (which include a replica index even for multi-client).
set_xla_global_id(Fingerprint32(name) % std::numeric_limits<int32_t>::max());
Expand Down
13 changes: 13 additions & 0 deletions tensorflow/core/common_runtime/gpu/gpu_device.h
Expand Up @@ -181,6 +181,18 @@ class BaseGPUDevice : public LocalDevice {
// for the GPU or vGPU.
static std::optional<tsl::TfDeviceId> FindTfDeviceId(se::Stream* compute);

// Overrides Device::merge_host_to_device_stream(). Returns the value of
// GPUOptions.Experimental.StreamMergeOptions.merge_host_to_device_stream
// captured from the SessionOptions at device construction; when true,
// host->device copies are issued on the compute stream instead of a
// dedicated host_to_device copy stream.
bool merge_host_to_device_stream() const override {
return stream_merge_options_.merge_host_to_device_stream();
}

// Overrides Device::merge_device_to_host_stream(). Returns the value of
// GPUOptions.Experimental.StreamMergeOptions.merge_device_to_host_stream
// captured from the SessionOptions at device construction; when true,
// device->host copies are issued on the compute stream instead of a
// dedicated device_to_host copy stream.
bool merge_device_to_host_stream() const override {
return stream_merge_options_.merge_device_to_host_stream();
}

// Overrides Device::merge_device_to_device_stream(). Returns the value of
// GPUOptions.Experimental.StreamMergeOptions.merge_device_to_device_stream
// captured from the SessionOptions at device construction; when true,
// device->device copies are issued on the sender's compute stream instead
// of a dedicated device_to_device copy stream.
bool merge_device_to_device_stream() const override {
return stream_merge_options_.merge_device_to_device_stream();
}

protected:
Allocator* gpu_allocator_; // not owned
Allocator* cpu_allocator_; // not owned
Expand All @@ -207,6 +219,7 @@ class BaseGPUDevice : public LocalDevice {
int32 pending_cap_ = 0;
bool timestamped_allocator_ = false;
NodeFileWriter* node_file_writer_ = nullptr; // not owned
const GPUOptions::Experimental::StreamMergeOptions stream_merge_options_;

// Initialize scratch buffers used by Eigen.
Status InitScratchBuffers();
Expand Down
128 changes: 83 additions & 45 deletions tensorflow/core/common_runtime/gpu/gpu_util.cc
Expand Up @@ -227,19 +227,24 @@ void GPUUtil::DeviceToDeviceCopy(
done(s);
return;
}
auto send_device_to_device_stream =
static_cast<const GPUDeviceContext*>(send_dev_context)
->device_to_device_stream(dev_to_dev_stream_index);
if (send_device_to_device_stream == nullptr) {
done(errors::Internal("No send gpu copy-out-stream is available."));
return;
}
// Wait for the main stream on the sender to make sure the result is
// available.
s = send_device_to_device_stream->WaitFor(send_stream);
if (!s.ok()) {
done(s);
return;
se::Stream* send_device_to_device_stream = nullptr;
if (src->merge_device_to_device_stream()) {
send_device_to_device_stream = send_stream;
} else {
send_device_to_device_stream =
static_cast<const GPUDeviceContext*>(send_dev_context)
->device_to_device_stream(dev_to_dev_stream_index);
if (send_device_to_device_stream == nullptr) {
done(absl::AbortedError("No send gpu copy-out-stream is available."));
return;
}
// Wait for the main stream on the sender to make sure the result is
// available.
s = send_device_to_device_stream->WaitFor(send_stream);
if (!s.ok()) {
done(s);
return;
}
}

const int64_t total_bytes = input->TotalBytes();
Expand All @@ -264,10 +269,12 @@ void GPUUtil::DeviceToDeviceCopy(
// truly free.
// TODO(zhengxq): remove this dependency when we switch to a better way
// to make sure the memory is free.
s = send_device_to_device_stream->WaitFor(recv_stream);
if (!s.ok()) {
done(s);
return;
if (send_device_to_device_stream != recv_stream) {
s = send_device_to_device_stream->WaitFor(recv_stream);
if (!s.ok()) {
done(s);
return;
}
}

VLOG(2) << "src_ptr " << src_ptr << " dst_ptr " << dst_ptr;
Expand Down Expand Up @@ -322,18 +329,23 @@ void GPUUtil::CopyGPUTensorToCPU(Device* gpu_device,
return;
}

auto send_device_to_host_stream =
static_cast<const GPUDeviceContext*>(device_context)
->device_to_host_stream();
if (send_device_to_host_stream == nullptr) {
done(absl::InternalError("No send gpu copy-out-stream is available."));
return;
}
// Wait for the sender's main stream to make sure the data are available.
s = send_device_to_host_stream->WaitFor(send_stream);
if (!s.ok()) {
done(s);
return;
se::Stream* send_device_to_host_stream = nullptr;
if (gpu_device->merge_device_to_host_stream()) {
send_device_to_host_stream = send_stream;
} else {
send_device_to_host_stream =
static_cast<const GPUDeviceContext*>(device_context)
->device_to_host_stream();
if (send_device_to_host_stream == nullptr) {
done(absl::InternalError("No send gpu copy-out-stream is available."));
return;
}
// Wait for the sender's main stream to make sure the data are available.
s = send_device_to_host_stream->WaitFor(send_stream);
if (!s.ok()) {
done(s);
return;
}
}

#ifdef TF_GPU_USE_PJRT
Expand Down Expand Up @@ -398,20 +410,27 @@ void GPUUtil::CopyCPUTensorToGPU(const Tensor* cpu_tensor,
return;
}

auto recv_host_to_device_stream =
static_cast<const GPUDeviceContext*>(device_context)
->host_to_device_stream();
if (recv_host_to_device_stream == nullptr) {
done(errors::Internal("No send gpu copy-out-stream is available."));
return;
}
// Wait for the recv-stream to make sure the buffer is truly available.
if (sync_dst_compute) {
s = recv_host_to_device_stream->WaitFor(recv_stream);
if (!s.ok()) {
done(s);
const bool merge_host_to_device_stream =
gpu_device->merge_host_to_device_stream();
se::Stream* recv_host_to_device_stream = nullptr;
if (merge_host_to_device_stream) {
recv_host_to_device_stream = recv_stream;
} else {
recv_host_to_device_stream =
static_cast<const GPUDeviceContext*>(device_context)
->host_to_device_stream();
if (recv_host_to_device_stream == nullptr) {
done(absl::AbortedError("No send gpu copy-out-stream is available."));
return;
}
// Wait for the recv-stream to make sure the buffer is truly available.
if (sync_dst_compute) {
s = recv_host_to_device_stream->WaitFor(recv_stream);
if (!s.ok()) {
done(s);
return;
}
}
}

const int64_t total_bytes = cpu_tensor->TotalBytes();
Expand Down Expand Up @@ -457,19 +476,38 @@ void GPUUtil::CopyCPUTensorToGPU(const Tensor* cpu_tensor,
}
}

if (merge_host_to_device_stream) {
// It brings acceleration by moving these lines ahead of the event_mgr
// callback, because they mark the completion of this data copy, so that
// subsequent ops can be scheduled, without needing to wait for the
// callback. This is safe, because:
// 1) For `recv_host_to_device_stream->ok()`, it checks `Stream::status_`,
// which will not be modified by the event_mgr.
// 2) For `done(absl::OkStatus())`, it leads to the scheduling of
// subsequent ops. If one op needs to access the transferred data, it
// must be queued in the same stream as the copy, so there is a
// CUDA-promised dependency: the operations will not be executed until the
// copy is really finished.
if (!recv_host_to_device_stream->ok()) {
LOG(FATAL) << "CPU->GPU Memcpy failed"; // Crash OK
}
done(absl::OkStatus());
}
dev_info->event_mgr->ThenExecute(
recv_host_to_device_stream,
[recv_host_to_device_stream, done, input_ref, do_staging, staging_buffer,
host_memory_allocator]() {
host_memory_allocator, merge_host_to_device_stream]() {
if (do_staging) {
host_memory_allocator->DeallocateRaw(staging_buffer);
} else {
input_ref.Unref();
}
if (!recv_host_to_device_stream->ok()) {
LOG(FATAL) << "CPU->GPU Memcpy failed";
if (!merge_host_to_device_stream) {
if (!recv_host_to_device_stream->ok()) {
LOG(FATAL) << "CPU->GPU Memcpy failed"; // Crash OK
}
done(absl::OkStatus());
}
done(absl::OkStatus());
});
}

Expand Down
12 changes: 12 additions & 0 deletions tensorflow/core/framework/device.h
Expand Up @@ -193,6 +193,18 @@ class Device : public DeviceBase {
// Informs if this Device can be used as a caller in RemoteCall operation.
virtual bool IsRemoteCallAllowed() const;

// Whether to merge the host_to_device copy stream with the compute stream.
// Only useful for GPU devices. Defaults to false (separate copy stream);
// subclasses that support stream merging override this.
virtual bool merge_host_to_device_stream() const { return false; }

// Whether to merge the device_to_host copy stream with the compute stream.
// Only useful for GPU devices. Defaults to false (separate copy stream);
// subclasses that support stream merging override this.
virtual bool merge_device_to_host_stream() const { return false; }

// Whether to merge the device_to_device copy streams with the compute stream.
// Only useful for GPU devices. Defaults to false (separate copy streams);
// subclasses that support stream merging override this.
virtual bool merge_device_to_device_stream() const { return false; }

protected:
void DeleteResourceMgr() {
delete rmgr_;
Expand Down
29 changes: 29 additions & 0 deletions tensorflow/core/protobuf/config.proto
Expand Up @@ -273,6 +273,35 @@ message GPUOptions {
// node_id for use when creating a PjRt GPU client with remote devices,
// which enumerates jobs*tasks from a ServerDef.
int32 node_id = 18;

// Whether to merge data transfer streams into the compute stream in the
// same stream group. Stream merging helps reduce the overhead caused by
// stream synchronization, especially when data transfers are frequent. For
// example, setting "merge_host_to_device_stream = true" will make the
// compute stream responsible for both computation and host to device memory
// copy.
message StreamMergeOptions {
// If true, the compute stream will be used for host_to_device copy as
// well. It's no longer necessary to record an event before the copy to
// let the copy stream wait for the compute stream to finish. There is
// also no need to wait for the copy to complete before executing the
// callback function.
bool merge_host_to_device_stream = 1;

// If true, the compute stream will be used for device_to_host copy as
// well. It's no longer necessary to record an event before the copy to
// let the copy stream wait for the compute stream to finish.
bool merge_device_to_host_stream = 2;

// If true, the compute stream will be used for device_to_device copy as
// well. It's no longer necessary to record an event before the copy to
// let the copy stream wait for the compute stream of the sending device
// to finish. There is also no need to wait for the compute stream of the
// receiving device to finish if the copy is within the same device.
bool merge_device_to_device_stream = 3;
}

// Per-transfer-direction stream merging controls. Fields left false keep
// the corresponding dedicated copy streams separate from the compute
// stream.
StreamMergeOptions stream_merge_options = 19;
}

// Everything inside experimental is subject to change and is not subject
Expand Down
28 changes: 28 additions & 0 deletions tensorflow/tools/api/golden/v1/tensorflow.-g-p-u-options.pbtxt
Expand Up @@ -162,6 +162,13 @@ tf_proto {
label: LABEL_OPTIONAL
type: TYPE_INT32
}
field {
name: "stream_merge_options"
number: 19
label: LABEL_OPTIONAL
type: TYPE_MESSAGE
type_name: ".tensorflow.GPUOptions.Experimental.StreamMergeOptions"
}
nested_type {
name: "VirtualDevices"
field {
Expand All @@ -183,6 +190,27 @@ tf_proto {
type: TYPE_INT32
}
}
nested_type {
name: "StreamMergeOptions"
field {
name: "merge_host_to_device_stream"
number: 1
label: LABEL_OPTIONAL
type: TYPE_BOOL
}
field {
name: "merge_device_to_host_stream"
number: 2
label: LABEL_OPTIONAL
type: TYPE_BOOL
}
field {
name: "merge_device_to_device_stream"
number: 3
label: LABEL_OPTIONAL
type: TYPE_BOOL
}
}
}
}
}

0 comments on commit 7084c9f

Please sign in to comment.