Skip to content

Commit

Permalink
PR #61632: Allow merging compute-copy streams
Browse files Browse the repository at this point in the history
Imported from GitHub PR #61632

This PR works as a part of the whole Multi-Stream feature in TF, which is proposed in #61185.

Allow merging the host_to_device/device_to_host/device_to_device data copy streams into the compute stream in one stream group. This is useful to reduce the overhead caused by GPU stream synchronization, especially when data transfers are frequent. Another benefit is, for host_to_device copy, merging streams allows early scheduling of subsequent ops, doesn't have to wait until the data copy is really finished.

As a part of the multi-stream feature, it can help multi-stream reach a much higher throughput. Taking our proto models as an example, the original model inference throughput is **1524** samples/second, and **2229** samples/ second with multi-stream, and **2471** samples/second further with stream-merging.

However, stream-merging can also be used separately. We got inference throughput gain from **1028** samples/second to **1187** samples/second by enabling stream-merging.

Please refer to the 'Performance' part in our [document](https://docs.google.com/document/d/1yL3lWk_iFKqLTyekkuaiKXZ78I0lPmD5kM1fghHRs4Y/edit?usp=sharing) for detailed and more experiment results.
Copybara import of the project:

--
9e51f38 by Robin Zhang <robinz@nvidia.com>:

Allow merging compute-copy streams

--
a45967f by Robin Zhang <robinz@nvidia.com>:

Improve coding style

--
ccae79b by Robin Zhang <robinz@nvidia.com>:

Rename stream_merge_options_

--
332e1fe by Robin Zhang <robinz@nvidia.com>:

Put stream checking out of callback

--
4a0c789 by Robin Zhang <robinz@nvidia.com>:

Move StreamMergeOptions to Experimental

--
efe56d7 by Robin Zhang <robinz@nvidia.com>:

add some comments

Merging this change closes #61632

Reverts changelist 525613555

FUTURE_COPYBARA_INTEGRATE_REVIEW=#61632 from buptzyb:multistream-streammerge 5aabb58
PiperOrigin-RevId: 628618396
  • Loading branch information
buptzyb authored and tensorflower-gardener committed May 2, 2024
1 parent 0155a11 commit ba97100
Show file tree
Hide file tree
Showing 21 changed files with 294 additions and 428 deletions.
Expand Up @@ -934,16 +934,3 @@ func.func @gather_nd(%arg0: tensor<*x!tf_type.resource<tensor<80xf32>>>,
} : (tensor<*x!tf_type.resource<tensor<80xf32>>>, tensor<i32>) -> tensor<1x80xf32>
func.return
}

// -----

// Check conflicting device names
// CHECK: "tf_device.cluster"()
// CHECK: "tf.opA"()
// CHECK: "tf.opB"()
// CHECK-NOT: device =
func.func @do_nothing_if_short_names_conflict() {
"tf.opA"() { _xla_compile_device_type = "TPU", device = "/replica:1/task:2/device:TPU:1"} : () -> ()
"tf.opB"() { _xla_compile_device_type = "TPU", device = "/replica:3/task:4/device:TPU:1"} : () -> ()
func.return
}
Expand Up @@ -200,58 +200,6 @@ LogicalResult HasValidDeviceTypeAttribute(Block* block) {
return success();
}

LogicalResult HasValidDeviceAttribute(Block* block) {
absl::flat_hash_map<std::string, OpDevice> devices;
for (Operation& op : *block) {
auto device_attr = op.getAttrOfType<StringAttr>(kDeviceAttr);
if (!device_attr || device_attr.str().empty()) continue;
tensorflow::DeviceNameUtils::ParsedName parsed;
if (!tensorflow::DeviceNameUtils::ParseFullOrLocalName(device_attr.str(),
&parsed)) {
op.emitWarning() << "Invalid device name " << device_attr.str();
return mlir::failure();
}

std::string device_local_name =
tensorflow::DeviceNameUtils::LocalName(parsed.type, parsed.id);

if (device_local_name.empty()) continue;

// It is possible that a device may be same Local Name but
// different fullname. Devices with same Local name are identical
// so they should only be added once in 'devices'.
// and we need the fullname which is longer since longer name has more
// information such as task, replica, job etc. An example fullname is
// "/job:foo_bar/replica:1/task:2/device:GPU:3"
if (devices.count(device_local_name)) {
std::string device1 = devices[device_local_name].device;
std::string device2 = device_attr.str();
// Is either of the two devices just a substring of the other? If
// not, we treat them as different devices, and we have a collision.
if (device1.find(device2) == std::string::npos &&
device2.find(device1) == std::string::npos) {
Operation* previous_op = devices[device_local_name].op;

LOG_FIRST_N(WARNING, 1)
<< "Found two devices with same local name " << device_local_name
<< " but conflicting fullname: " << device1 << " and " << device2
<< ".";
LOG_FIRST_N(WARNING, 1)
<< "Previous assignment came from op: "
<< tensorflow::OpAsString(*previous_op)
<< ". Current op is: " << tensorflow::OpAsString(op);
}
// Always keep the longer name.
if (devices[device_local_name].device.size() < device_attr.str().size()) {
devices[device_local_name] = {&op, device_attr.str()};
}
} else {
devices.insert({device_local_name, {&op, device_attr.str()}});
}
}
return success();
}

// Collects and clusters ops based on `_replication_info` attribute. Returns
// an error in case of invalid compilation or replication attribute(s).
LogicalResult CollectAndGroupClusterOps(Block* block, ClusterMap* clusters) {
Expand All @@ -260,8 +208,6 @@ LogicalResult CollectAndGroupClusterOps(Block* block, ClusterMap* clusters) {

LogicalResult result = HasValidDeviceTypeAttribute(block);
if (failed(result)) return result;
result = HasValidDeviceAttribute(block);
if (failed(result)) return result;

for (Operation& op : *block) {
LogicalResult result =
Expand Down
4 changes: 3 additions & 1 deletion tensorflow/core/common_runtime/gpu/gpu_device.cc
Expand Up @@ -459,7 +459,9 @@ BaseGPUDevice::BaseGPUDevice(const SessionOptions& options, const string& name,
cpu_allocator_(cpu_allocator),
scoped_allocator_mgr_(new ScopedAllocatorMgr(name)),
tf_device_id_(tf_device_id),
sync_every_op_(sync_every_op) {
sync_every_op_(sync_every_op),
stream_merge_options_(
options.config.gpu_options().experimental().stream_merge_options()) {
// XLA device IDs for GPUs are arbitrary but must be unique, so we hash device
// names (which include a replica index even for multi-client).
set_xla_global_id(Fingerprint32(name) % std::numeric_limits<int32_t>::max());
Expand Down
13 changes: 13 additions & 0 deletions tensorflow/core/common_runtime/gpu/gpu_device.h
Expand Up @@ -181,6 +181,18 @@ class BaseGPUDevice : public LocalDevice {
// for the GPU or vGPU.
static std::optional<tsl::TfDeviceId> FindTfDeviceId(se::Stream* compute);

bool merge_host_to_device_stream() const override {
return stream_merge_options_.merge_host_to_device_stream();
}

bool merge_device_to_host_stream() const override {
return stream_merge_options_.merge_device_to_host_stream();
}

bool merge_device_to_device_stream() const override {
return stream_merge_options_.merge_device_to_device_stream();
}

protected:
Allocator* gpu_allocator_; // not owned
Allocator* cpu_allocator_; // not owned
Expand All @@ -207,6 +219,7 @@ class BaseGPUDevice : public LocalDevice {
int32 pending_cap_ = 0;
bool timestamped_allocator_ = false;
NodeFileWriter* node_file_writer_ = nullptr; // not owned
const GPUOptions::Experimental::StreamMergeOptions stream_merge_options_;

// Initialize scratch buffers used by Eigen.
Status InitScratchBuffers();
Expand Down
128 changes: 83 additions & 45 deletions tensorflow/core/common_runtime/gpu/gpu_util.cc
Expand Up @@ -227,19 +227,24 @@ void GPUUtil::DeviceToDeviceCopy(
done(s);
return;
}
auto send_device_to_device_stream =
static_cast<const GPUDeviceContext*>(send_dev_context)
->device_to_device_stream(dev_to_dev_stream_index);
if (send_device_to_device_stream == nullptr) {
done(errors::Internal("No send gpu copy-out-stream is available."));
return;
}
// Wait for the main stream on the sender to make sure the result is
// available.
s = send_device_to_device_stream->WaitFor(send_stream);
if (!s.ok()) {
done(s);
return;
se::Stream* send_device_to_device_stream = nullptr;
if (src->merge_device_to_device_stream()) {
send_device_to_device_stream = send_stream;
} else {
send_device_to_device_stream =
static_cast<const GPUDeviceContext*>(send_dev_context)
->device_to_device_stream(dev_to_dev_stream_index);
if (send_device_to_device_stream == nullptr) {
done(absl::AbortedError("No send gpu copy-out-stream is available."));
return;
}
// Wait for the main stream on the sender to make sure the result is
// available.
s = send_device_to_device_stream->WaitFor(send_stream);
if (!s.ok()) {
done(s);
return;
}
}

const int64_t total_bytes = input->TotalBytes();
Expand All @@ -264,10 +269,12 @@ void GPUUtil::DeviceToDeviceCopy(
// truly free.
// TODO(zhengxq): remove this dependency when we switch to a better way
// to make sure the memory is free.
s = send_device_to_device_stream->WaitFor(recv_stream);
if (!s.ok()) {
done(s);
return;
if (send_device_to_device_stream != recv_stream) {
s = send_device_to_device_stream->WaitFor(recv_stream);
if (!s.ok()) {
done(s);
return;
}
}

VLOG(2) << "src_ptr " << src_ptr << " dst_ptr " << dst_ptr;
Expand Down Expand Up @@ -322,18 +329,23 @@ void GPUUtil::CopyGPUTensorToCPU(Device* gpu_device,
return;
}

auto send_device_to_host_stream =
static_cast<const GPUDeviceContext*>(device_context)
->device_to_host_stream();
if (send_device_to_host_stream == nullptr) {
done(absl::InternalError("No send gpu copy-out-stream is available."));
return;
}
// Wait for the sender's main stream to make sure the data are available.
s = send_device_to_host_stream->WaitFor(send_stream);
if (!s.ok()) {
done(s);
return;
se::Stream* send_device_to_host_stream = nullptr;
if (gpu_device->merge_device_to_host_stream()) {
send_device_to_host_stream = send_stream;
} else {
send_device_to_host_stream =
static_cast<const GPUDeviceContext*>(device_context)
->device_to_host_stream();
if (send_device_to_host_stream == nullptr) {
done(absl::InternalError("No send gpu copy-out-stream is available."));
return;
}
// Wait for the sender's main stream to make sure the data are available.
s = send_device_to_host_stream->WaitFor(send_stream);
if (!s.ok()) {
done(s);
return;
}
}

#ifdef TF_GPU_USE_PJRT
Expand Down Expand Up @@ -398,20 +410,27 @@ void GPUUtil::CopyCPUTensorToGPU(const Tensor* cpu_tensor,
return;
}

auto recv_host_to_device_stream =
static_cast<const GPUDeviceContext*>(device_context)
->host_to_device_stream();
if (recv_host_to_device_stream == nullptr) {
done(errors::Internal("No send gpu copy-out-stream is available."));
return;
}
// Wait for the recv-stream to make sure the buffer is truly available.
if (sync_dst_compute) {
s = recv_host_to_device_stream->WaitFor(recv_stream);
if (!s.ok()) {
done(s);
const bool merge_host_to_device_stream =
gpu_device->merge_host_to_device_stream();
se::Stream* recv_host_to_device_stream = nullptr;
if (merge_host_to_device_stream) {
recv_host_to_device_stream = recv_stream;
} else {
recv_host_to_device_stream =
static_cast<const GPUDeviceContext*>(device_context)
->host_to_device_stream();
if (recv_host_to_device_stream == nullptr) {
done(absl::AbortedError("No send gpu copy-out-stream is available."));
return;
}
// Wait for the recv-stream to make sure the buffer is truly available.
if (sync_dst_compute) {
s = recv_host_to_device_stream->WaitFor(recv_stream);
if (!s.ok()) {
done(s);
return;
}
}
}

const int64_t total_bytes = cpu_tensor->TotalBytes();
Expand Down Expand Up @@ -457,19 +476,38 @@ void GPUUtil::CopyCPUTensorToGPU(const Tensor* cpu_tensor,
}
}

if (merge_host_to_device_stream) {
// It brings acceleration by moving these lines ahead of the event_mgr
// callback, because they mark the completion of this data copy, so that
// subsequent ops can be scheduled, without needing to wait for the
// callback. This is safe, because:
// 1) For `recv_host_to_device_stream->ok()`, it checks `Stream::status_`,
// which will not be modified by the event_mgr.
// 2) For `done(absl::OkStatus())`, it leads to the scheduling of
// subsequent ops. If one op needs to access the transferred data, it
// must be queued in the same stream as the copy, so there is a
// CUDA-promised dependency: the operations will not be executed until the
// copy is really finished.
if (!recv_host_to_device_stream->ok()) {
LOG(FATAL) << "CPU->GPU Memcpy failed"; // Crash OK
}
done(absl::OkStatus());
}
dev_info->event_mgr->ThenExecute(
recv_host_to_device_stream,
[recv_host_to_device_stream, done, input_ref, do_staging, staging_buffer,
host_memory_allocator]() {
host_memory_allocator, merge_host_to_device_stream]() {
if (do_staging) {
host_memory_allocator->DeallocateRaw(staging_buffer);
} else {
input_ref.Unref();
}
if (!recv_host_to_device_stream->ok()) {
LOG(FATAL) << "CPU->GPU Memcpy failed";
if (!merge_host_to_device_stream) {
if (!recv_host_to_device_stream->ok()) {
LOG(FATAL) << "CPU->GPU Memcpy failed"; // Crash OK
}
done(absl::OkStatus());
}
done(absl::OkStatus());
});
}

Expand Down
12 changes: 12 additions & 0 deletions tensorflow/core/framework/device.h
Expand Up @@ -193,6 +193,18 @@ class Device : public DeviceBase {
// Informs if this Device can be used as a caller in RemoteCall operation.
virtual bool IsRemoteCallAllowed() const;

// Whether to merge the host_to_device copy stream with the compute stream.
// Only useful for GPU devices.
virtual bool merge_host_to_device_stream() const { return false; }

// Whether to merge the device_to_host copy stream with the compute stream.
// Only useful for GPU devices.
virtual bool merge_device_to_host_stream() const { return false; }

// Whether to merge the device_to_device copy streams with the compute stream.
// Only useful for GPU devices.
virtual bool merge_device_to_device_stream() const { return false; }

protected:
void DeleteResourceMgr() {
delete rmgr_;
Expand Down
2 changes: 1 addition & 1 deletion tensorflow/core/ops/ops.pbtxt
@@ -1,4 +1,4 @@
go/nodeserialize
go/debugproto
op {
name: "Abort"
attr {
Expand Down
29 changes: 29 additions & 0 deletions tensorflow/core/protobuf/config.proto
Expand Up @@ -273,6 +273,35 @@ message GPUOptions {
// node_id for use when creating a PjRt GPU client with remote devices,
// which enumerates jobs*tasks from a ServerDef.
int32 node_id = 18;

// Whether to merge data transfer streams into the compute stream in the
// same stream group. Stream merging helps reduce the overhead caused by
// stream synchronization, especially when data transfers are frequent. For
// example, setting "merge_host_to_device_stream = true" will make the
// compute stream responsible for both computation and host to device memory
// copy.
message StreamMergeOptions {
// If true, the compute stream will be used for host_to_device copy as
// well. It's no longer necessary to record an event before the copy to
// let the copy stream wait for the compute stream to finish. There is
// also no need to wait for the copy to complete before executing the
// callback function.
bool merge_host_to_device_stream = 1;

// If true, the compute stream will be used for device_to_host copy as
// well. It's no longer necessary to record an event before the copy to
// let the copy stream wait for the compute stream to finish.
bool merge_device_to_host_stream = 2;

// If true, the compute stream will be used for device_to_device copy as
// well. It's no longer necessary to record an event before the copy to
// let the copy stream wait for the compute stream of the sending device
// to finish. There is also no need to wait for the compute stream of the
// receiving device to finish if the copy is within the same device.
bool merge_device_to_device_stream = 3;
}

StreamMergeOptions stream_merge_options = 19;
}

// Everything inside experimental is subject to change and is not subject
Expand Down
1 change: 1 addition & 0 deletions tensorflow/python/lib/core/BUILD
Expand Up @@ -298,6 +298,7 @@ cc_library(
# deps = [
# ":ndarray_tensor",
# ":ndarray_tensor_bridge",
# ":safe_pyobject_ptr",
# "@com_google_absl//absl/log:check",
# "@com_google_absl//absl/status",
# "//third_party/clif/python:clif",
Expand Down

0 comments on commit ba97100

Please sign in to comment.