Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PR #61632: Allow merging compute-copy streams #66555

Merged
Merged 12 commits on May 2, 2024
4 changes: 3 additions & 1 deletion tensorflow/core/common_runtime/gpu/gpu_device.cc
Expand Up @@ -459,7 +459,9 @@ BaseGPUDevice::BaseGPUDevice(const SessionOptions& options, const string& name,
cpu_allocator_(cpu_allocator),
scoped_allocator_mgr_(new ScopedAllocatorMgr(name)),
tf_device_id_(tf_device_id),
sync_every_op_(sync_every_op) {
sync_every_op_(sync_every_op),
stream_merge_options_(
options.config.gpu_options().experimental().stream_merge_options()) {
// XLA device IDs for GPUs are arbitrary but must be unique, so we hash device
// names (which include a replica index even for multi-client).
set_xla_global_id(Fingerprint32(name) % std::numeric_limits<int32_t>::max());
Expand Down
13 changes: 13 additions & 0 deletions tensorflow/core/common_runtime/gpu/gpu_device.h
Expand Up @@ -181,6 +181,18 @@ class BaseGPUDevice : public LocalDevice {
// for the GPU or vGPU.
static std::optional<tsl::TfDeviceId> FindTfDeviceId(se::Stream* compute);

// True iff host-to-device copies should be issued on the compute stream
// rather than a dedicated copy stream; configured via
// GPUOptions.Experimental.StreamMergeOptions.
bool merge_host_to_device_stream() const override {
return stream_merge_options_.merge_host_to_device_stream();
}

// True iff device-to-host copies should be issued on the compute stream
// rather than a dedicated copy stream; configured via
// GPUOptions.Experimental.StreamMergeOptions.
bool merge_device_to_host_stream() const override {
return stream_merge_options_.merge_device_to_host_stream();
}

// True iff device-to-device copies should be issued on the sender's compute
// stream rather than a dedicated copy stream; configured via
// GPUOptions.Experimental.StreamMergeOptions.
bool merge_device_to_device_stream() const override {
return stream_merge_options_.merge_device_to_device_stream();
}

protected:
Allocator* gpu_allocator_; // not owned
Allocator* cpu_allocator_; // not owned
Expand All @@ -207,6 +219,7 @@ class BaseGPUDevice : public LocalDevice {
int32 pending_cap_ = 0;
bool timestamped_allocator_ = false;
NodeFileWriter* node_file_writer_ = nullptr; // not owned
const GPUOptions::Experimental::StreamMergeOptions stream_merge_options_;

// Initialize scratch buffers used by Eigen.
Status InitScratchBuffers();
Expand Down
128 changes: 83 additions & 45 deletions tensorflow/core/common_runtime/gpu/gpu_util.cc
Expand Up @@ -227,19 +227,24 @@ void GPUUtil::DeviceToDeviceCopy(
done(s);
return;
}
auto send_device_to_device_stream =
static_cast<const GPUDeviceContext*>(send_dev_context)
->device_to_device_stream(dev_to_dev_stream_index);
if (send_device_to_device_stream == nullptr) {
done(errors::Internal("No send gpu copy-out-stream is available."));
return;
}
// Wait for the main stream on the sender to make sure the result is
// available.
s = send_device_to_device_stream->WaitFor(send_stream);
if (!s.ok()) {
done(s);
return;
se::Stream* send_device_to_device_stream = nullptr;
if (src->merge_device_to_device_stream()) {
send_device_to_device_stream = send_stream;
} else {
send_device_to_device_stream =
static_cast<const GPUDeviceContext*>(send_dev_context)
->device_to_device_stream(dev_to_dev_stream_index);
if (send_device_to_device_stream == nullptr) {
done(absl::AbortedError("No send gpu copy-out-stream is available."));
return;
}
// Wait for the main stream on the sender to make sure the result is
// available.
s = send_device_to_device_stream->WaitFor(send_stream);
if (!s.ok()) {
done(s);
return;
}
}

const int64_t total_bytes = input->TotalBytes();
Expand All @@ -264,10 +269,12 @@ void GPUUtil::DeviceToDeviceCopy(
// truly free.
// TODO(zhengxq): remove this dependency when we switch to a better way
// to make sure the memory is free.
s = send_device_to_device_stream->WaitFor(recv_stream);
if (!s.ok()) {
done(s);
return;
if (send_device_to_device_stream != recv_stream) {
s = send_device_to_device_stream->WaitFor(recv_stream);
if (!s.ok()) {
done(s);
return;
}
}

VLOG(2) << "src_ptr " << src_ptr << " dst_ptr " << dst_ptr;
Expand Down Expand Up @@ -322,18 +329,23 @@ void GPUUtil::CopyGPUTensorToCPU(Device* gpu_device,
return;
}

auto send_device_to_host_stream =
static_cast<const GPUDeviceContext*>(device_context)
->device_to_host_stream();
if (send_device_to_host_stream == nullptr) {
done(absl::InternalError("No send gpu copy-out-stream is available."));
return;
}
// Wait for the sender's main stream to make sure the data are available.
s = send_device_to_host_stream->WaitFor(send_stream);
if (!s.ok()) {
done(s);
return;
se::Stream* send_device_to_host_stream = nullptr;
if (gpu_device->merge_device_to_host_stream()) {
send_device_to_host_stream = send_stream;
} else {
send_device_to_host_stream =
static_cast<const GPUDeviceContext*>(device_context)
->device_to_host_stream();
if (send_device_to_host_stream == nullptr) {
done(absl::InternalError("No send gpu copy-out-stream is available."));
return;
}
// Wait for the sender's main stream to make sure the data are available.
s = send_device_to_host_stream->WaitFor(send_stream);
if (!s.ok()) {
done(s);
return;
}
}

#ifdef TF_GPU_USE_PJRT
Expand Down Expand Up @@ -398,20 +410,27 @@ void GPUUtil::CopyCPUTensorToGPU(const Tensor* cpu_tensor,
return;
}

auto recv_host_to_device_stream =
static_cast<const GPUDeviceContext*>(device_context)
->host_to_device_stream();
if (recv_host_to_device_stream == nullptr) {
done(errors::Internal("No send gpu copy-out-stream is available."));
return;
}
// Wait for the recv-stream to make sure the buffer is truly available.
if (sync_dst_compute) {
s = recv_host_to_device_stream->WaitFor(recv_stream);
if (!s.ok()) {
done(s);
const bool merge_host_to_device_stream =
gpu_device->merge_host_to_device_stream();
se::Stream* recv_host_to_device_stream = nullptr;
if (merge_host_to_device_stream) {
recv_host_to_device_stream = recv_stream;
} else {
recv_host_to_device_stream =
static_cast<const GPUDeviceContext*>(device_context)
->host_to_device_stream();
if (recv_host_to_device_stream == nullptr) {
done(absl::AbortedError("No send gpu copy-out-stream is available."));
return;
}
// Wait for the recv-stream to make sure the buffer is truly available.
if (sync_dst_compute) {
s = recv_host_to_device_stream->WaitFor(recv_stream);
if (!s.ok()) {
done(s);
return;
}
}
}

const int64_t total_bytes = cpu_tensor->TotalBytes();
Expand Down Expand Up @@ -457,19 +476,38 @@ void GPUUtil::CopyCPUTensorToGPU(const Tensor* cpu_tensor,
}
}

if (merge_host_to_device_stream) {
// It brings acceleration by moving these lines ahead of the event_mgr
// callback, because they mark the completion of this data copy, so that
// subsequent ops can be scheduled, without needing to wait for the
// callback. This is safe, because:
// 1) For `recv_host_to_device_stream->ok()`, it checks `Stream::status_`,
// which will not be modified by the event_mgr.
// 2) For `done(absl::OkStatus())`, it leads to the scheduling of
// subsequent ops. If one op needs to access the transferred data, it
// must be queued in the same stream as the copy, so there is a
// CUDA-promised dependency: the operations will not be executed until the
// copy is really finished.
if (!recv_host_to_device_stream->ok()) {
LOG(FATAL) << "CPU->GPU Memcpy failed"; // Crash OK
}
done(absl::OkStatus());
}
dev_info->event_mgr->ThenExecute(
recv_host_to_device_stream,
[recv_host_to_device_stream, done, input_ref, do_staging, staging_buffer,
host_memory_allocator]() {
host_memory_allocator, merge_host_to_device_stream]() {
if (do_staging) {
host_memory_allocator->DeallocateRaw(staging_buffer);
} else {
input_ref.Unref();
}
if (!recv_host_to_device_stream->ok()) {
LOG(FATAL) << "CPU->GPU Memcpy failed";
if (!merge_host_to_device_stream) {
if (!recv_host_to_device_stream->ok()) {
LOG(FATAL) << "CPU->GPU Memcpy failed"; // Crash OK
}
done(absl::OkStatus());
}
done(absl::OkStatus());
});
}

Expand Down
12 changes: 12 additions & 0 deletions tensorflow/core/framework/device.h
Expand Up @@ -193,6 +193,18 @@ class Device : public DeviceBase {
// Informs if this Device can be used as a caller in RemoteCall operation.
virtual bool IsRemoteCallAllowed() const;

// Whether to merge the host_to_device copy stream with the compute stream.
// Only useful for GPU devices. Defaults to false; GPU device subclasses
// override this to reflect GPUOptions.Experimental.StreamMergeOptions.
virtual bool merge_host_to_device_stream() const { return false; }

// Whether to merge the device_to_host copy stream with the compute stream.
// Only useful for GPU devices. Defaults to false; GPU device subclasses
// override this to reflect GPUOptions.Experimental.StreamMergeOptions.
virtual bool merge_device_to_host_stream() const { return false; }

// Whether to merge the device_to_device copy streams with the compute stream.
// Only useful for GPU devices. Defaults to false; GPU device subclasses
// override this to reflect GPUOptions.Experimental.StreamMergeOptions.
virtual bool merge_device_to_device_stream() const { return false; }

protected:
void DeleteResourceMgr() {
delete rmgr_;
Expand Down
29 changes: 29 additions & 0 deletions tensorflow/core/protobuf/config.proto
Expand Up @@ -273,6 +273,35 @@ message GPUOptions {
// node_id for use when creating a PjRt GPU client with remote devices,
// which enumerates jobs*tasks from a ServerDef.
int32 node_id = 18;

// Whether to merge data transfer streams into the compute stream in the
// same stream group. Stream merging helps reduce the overhead caused by
// stream synchronization, especially when data transfers are frequent. For
// example, setting "merge_host_to_device_stream = true" will make the
// compute stream responsible for both computation and host to device memory
// copy.
message StreamMergeOptions {
// If true, the compute stream will be used for host_to_device copy as
// well. It's no longer necessary to record an event before the copy to
// let the copy stream wait for the compute stream to finish. There is
// also no need to wait for the copy to complete before executing the
// callback function. Default is false (use a dedicated copy stream).
bool merge_host_to_device_stream = 1;

// If true, the compute stream will be used for device_to_host copy as
// well. It's no longer necessary to record an event before the copy to
// let the copy stream wait for the compute stream to finish. Default is
// false (use a dedicated copy stream).
bool merge_device_to_host_stream = 2;

// If true, the compute stream will be used for device_to_device copy as
// well. It's no longer necessary to record an event before the copy to
// let the copy stream wait for the compute stream of the sending device
// to finish. There is also no need to wait for the compute stream of the
// receiving device to finish if the copy is within the same device.
// Default is false (use a dedicated copy stream).
bool merge_device_to_device_stream = 3;
}

StreamMergeOptions stream_merge_options = 19;
}

// Everything inside experimental is subject to change and is not subject
Expand Down
28 changes: 28 additions & 0 deletions tensorflow/tools/api/golden/v1/tensorflow.-g-p-u-options.pbtxt
Expand Up @@ -162,6 +162,13 @@ tf_proto {
label: LABEL_OPTIONAL
type: TYPE_INT32
}
field {
name: "stream_merge_options"
number: 19
label: LABEL_OPTIONAL
type: TYPE_MESSAGE
type_name: ".tensorflow.GPUOptions.Experimental.StreamMergeOptions"
}
nested_type {
name: "VirtualDevices"
field {
Expand All @@ -183,6 +190,27 @@ tf_proto {
type: TYPE_INT32
}
}
nested_type {
name: "StreamMergeOptions"
field {
name: "merge_host_to_device_stream"
number: 1
label: LABEL_OPTIONAL
type: TYPE_BOOL
}
field {
name: "merge_device_to_host_stream"
number: 2
label: LABEL_OPTIONAL
type: TYPE_BOOL
}
field {
name: "merge_device_to_device_stream"
number: 3
label: LABEL_OPTIONAL
type: TYPE_BOOL
}
}
}
}
}