
Make TensorFlow output allocations asynchronous when using NCCL backend. #3464

Merged: 3 commits merged into horovod:master on Apr 7, 2022

Conversation

@romerojosh (Collaborator) commented Mar 10, 2022

Checklist before submitting

  • Did you read the contributor guide?
  • Did you update the docs?
  • Did you write any tests to validate this change?
  • Did you update the CHANGELOG, if this change affects users?

Description

Some developers have reported performance issues with the alltoall implementation in a TF workflow. Profiling showed that one source of performance loss is the allocation of the output buffer in TF. In the current implementation, the output allocation triggers a device sync (https://github.com/horovod/horovod/blob/master/horovod/tensorflow/mpi_ops.cc#L341), which stalls the GPU launch pipeline and introduces unneeded latency. This sync was also reported in #3457. This type of launch stalling and its associated latency is particularly problematic for alltoall, since it typically sits on the critical path of the model and is usually not overlapped with other work.

This PR uses CUDA events and CUDA stream wait operations to replace the CPU sync on output allocations for TF when using the NCCL backend. It is similar to work done in #2963 where CUDA events were used to replace CPU syncs for input data dependencies.

Beyond just alltoall, I extended this asynchronous handling of the output allocation to the other ops that would benefit (e.g., allgather and reducescatter).
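The general pattern the PR describes (replacing a blocking CPU-side sync with a stream-ordered wait) can be sketched with the standard CUDA event idiom. This is an illustrative sketch only, not the PR's actual code; the `alloc_stream` and `nccl_stream` names are hypothetical, and running it requires a CUDA device:

```cpp
// Sketch: replace a blocking device sync with a stream-ordered wait.
// Stream and event names here are illustrative, not Horovod's actual code.
cudaEvent_t alloc_event;
cudaEventCreateWithFlags(&alloc_event, cudaEventDisableTiming);

// ... enqueue the output allocation / setup work on alloc_stream ...
cudaEventRecord(alloc_event, alloc_stream);

// Instead of calling cudaStreamSynchronize(alloc_stream) on the CPU
// (which stalls the launch pipeline), make the NCCL communication
// stream wait on the event; the CPU keeps launching work.
cudaStreamWaitEvent(nccl_stream, alloc_event, 0);
// ... enqueue the NCCL collective on nccl_stream ...
```

The key property is that `cudaStreamWaitEvent` enqueues the dependency on the device, so ordering is enforced without blocking the host thread.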

@github-actions bot commented Mar 10, 2022

Unit Test Results

802 files ±0   802 suites ±0   ⏱️ 9h 27m 35s (−34m 5s)
756 tests ±0:   713 ✔️ ±0   43 💤 ±0   0 ❌ ±0
18 665 runs ±0:   13 382 ✔️ ±0   5 283 💤 ±0   0 ❌ ±0

Results for commit e334a17. ± Comparison against base commit 4111d6b.


@github-actions bot commented Mar 10, 2022

Unit Test Results (with flaky tests)

886 files ±0   886 suites ±0   ⏱️ 9h 49m 34s (−33m 38s)
756 tests ±0:   713 ✔️ ±0   43 💤 ±0   0 ❌ ±0
20 861 runs ±0:   14 714 ✔️ ±0   6 147 💤 ±0   0 ❌ ±0

Results for commit e334a17. ± Comparison against base commit 4111d6b.


Signed-off-by: Josh Romero <joshr@nvidia.com>
@@ -582,6 +582,62 @@ Status NCCLBroadcast::Execute(std::vector<TensorTableEntry>& entries,
entries, true, nccl_op_context_.error_check_callback_);
}

Status NCCLAllgather::AllocateOutput(std::vector<TensorTableEntry>& entries,
                                     const Response& response,
                                     int64_t**& entry_component_sizes,
@nvcastet (Collaborator) commented Apr 7, 2022

Why are you passing the 2D array and 1D array pointers by reference? You are not modifying the pointer addresses, and passing them by copy has no perf penalty since sizeof(argType) == sizeof(ptrType).

@romerojosh (Collaborator, Author) replied:

This is the existing interface for this function:

virtual Status AllocateOutput(std::vector<TensorTableEntry>& entries,
                              const Response& response,
                              int64_t**& entry_component_sizes,
                              int*& recvcounts);

For this PR, I am just adding specializations to use the events for the scheduling.

}

for (int i = 1; i < world_size; ++i) {
sdispls[i] = sdispls[i-1] + sendcounts[i-1];
@nvcastet (Collaborator) commented:

For the displacements, is the index here the rank id?
And are you only looping over (world_size - 1) entries because you skip yourself?

@romerojosh (Collaborator, Author) replied:

This is computing displacements, so sdispls[0] is always 0.

Also, this is just a specialization for NCCL of a function that already exists here:

template <typename T>
Status PrepareOutputAndParams(TensorTableEntry& e,
                              std::vector<T>& sdispls,
                              std::vector<T>& rdispls,
                              std::vector<T>& sendcounts,
                              std::vector<T>& recvcounts) {
  auto& process_set = global_state_->process_set_table.Get(e.process_set_id);
  auto world_size = process_set.controller->GetSize();

  const auto& splits = e.splits;
  std::vector<int32_t> recvsplits;
  // Perform alltoall of splits to get expected receive splits
  process_set.controller->AlltoallGetRecvSplits(splits, recvsplits);

  // Every tensor participating in Alltoall operation may have different
  // first dimension size, but the rest of dimensions are same for all
  // tensors. Here we get shape of tensor sliced by first dimension.
  TensorShape slice_shape;
  for (int i = 1; i < e.tensor->shape().dims(); ++i) {
    slice_shape.AddDim(e.tensor->shape().dim_size(i));
  }
  int64_t slice_num_elements = slice_shape.num_elements();

  // Prepare send/recvcounts and displacements for Alltoallv
  sdispls.resize(world_size);
  rdispls.resize(world_size);
  sendcounts.resize(world_size);
  recvcounts.resize(world_size);

  size_t output_first_dim = 0;
  for (int i = 0; i < world_size; ++i) {
    sendcounts[i] = splits[i] * slice_num_elements;
    recvcounts[i] = recvsplits[i] * slice_num_elements;
    output_first_dim += recvsplits[i];
  }
  for (int i = 1; i < world_size; ++i) {
    sdispls[i] = sdispls[i-1] + sendcounts[i-1];
    rdispls[i] = rdispls[i-1] + recvcounts[i-1];
  }

  // Allocate output
  TensorShape output_shape;
  output_shape.AddDim(output_first_dim);
  output_shape.AppendShape(slice_shape);
  Status status = e.context->AllocateOutput(output_shape, &e.output);
  if (!status.ok()) {
    return status;
  }

  // Allocate and fill received_splits output
  TensorShape received_splits_shape;
  received_splits_shape.AddDim(recvsplits.size());
  Status rstatus = e.context->AllocateOutput(1, received_splits_shape,
                                             &e.received_splits);
  if (!rstatus.ok()) {
    return rstatus;
  }
  auto* target_pointer = reinterpret_cast<int32_t*>(
      const_cast<void*>(e.received_splits->data()));
  std::copy(recvsplits.cbegin(), recvsplits.cend(), target_pointer);

  return Status::OK();
}

The only difference is adding the CUDA event handling.

@nvcastet (Collaborator) replied:

Ah, that makes sense!

@romerojosh romerojosh merged commit f9d7f77 into horovod:master Apr 7, 2022