Reduce byte array allocations when reading/writing packets (#6023)
* Reduce byte array allocations when reading/writing packets

Byte arrays are a major source of LOH allocations when streaming logging events across nodes. Allocating a large MemoryStream once and then growing it as needed almost completely removes allocations for byte arrays.

This should significantly reduce memory traffic during large builds.

* Write the last part of the packet synchronously on Mono.

If we use WriteAsync and don't await it, a subsequent WriteAsync may be called before the first write completes. If both calls share the same buffer and overlap, we overwrite the data in the buffer and junk arrives at the receiver (see the sketch after these notes).

* Make SendData write packets asynchronously.

This avoids blocking the main loop and is roughly equivalent to the previous fire-and-forget behavior of writing the packet asynchronously with BeginWrite.
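
The following is a minimal sketch, not part of the commit and not the MSBuild implementation (the type and member names are illustrative), of the two ideas above: reuse a single MemoryStream as the packet write buffer, and finish each packet's write to the pipe before the buffer can be reused.

using System.IO;

class PacketWriterSketch
{
    // Allocated once; the stream grows to the largest packet seen and is then
    // reused, so big packets stop producing fresh byte[] allocations per send.
    private readonly MemoryStream _writeBuffer = new MemoryStream();

    public void WritePacket(Stream pipe, byte packetType, byte[] body)
    {
        _writeBuffer.SetLength(0);             // reset length, keep capacity
        _writeBuffer.WriteByte(packetType);
        WriteInt32(_writeBuffer, body.Length); // body length header
        _writeBuffer.Write(body, 0, body.Length);

        // Finish the write before returning (or await every WriteAsync);
        // otherwise a later packet could start reusing _writeBuffer while this
        // write is still in flight and corrupt the bytes on the pipe.
        pipe.Write(_writeBuffer.GetBuffer(), 0, (int)_writeBuffer.Length);
    }

    private static void WriteInt32(Stream stream, int value)
    {
        stream.WriteByte((byte)value);
        stream.WriteByte((byte)(value >> 8));
        stream.WriteByte((byte)(value >> 16));
        stream.WriteByte((byte)(value >> 24));
    }
}

MemoryStream.GetBuffer returns the underlying array without copying, which is what makes reusing the stream cheap; the trade-off is that the capacity never shrinks, so one very large packet keeps that memory reserved for the node's lifetime.
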
KirillOsenkov committed Feb 9, 2021
1 parent 1a0d8e8 commit de3b887
Showing 2 changed files with 119 additions and 58 deletions.
@@ -3,14 +3,13 @@

using System;
using System.Collections.Generic;
using System.Collections.Concurrent;
using System.Globalization;
using System.IO;
using System.IO.Pipes;
using System.Diagnostics;
using System.Threading;
#if !FEATURE_APM
using System.Threading.Tasks;
#endif
using System.Runtime.InteropServices;
#if FEATURE_PIPE_SECURITY
using System.Security.Principal;
@@ -609,8 +608,27 @@ internal class NodeContext

/// <summary>
/// A buffer typically big enough to handle a packet body.
/// We use this as a convenient way to manage and cache a byte[] that's resized
/// automatically to fit our payload.
/// </summary>
private MemoryStream _readBufferMemoryStream;

/// <summary>
/// A reusable buffer for writing packets.
/// </summary>
private MemoryStream _writeBufferMemoryStream;

/// <summary>
/// A queue used for enqueuing packets to write to the stream asynchronously.
/// </summary>
private byte[] _smallReadBuffer;
private BlockingCollection<INodePacket> _packetWriteQueue = new BlockingCollection<INodePacket>();

/// <summary>
/// A task representing the last packet write, so we can chain packet writes one after another.
/// We want to queue up writing packets on a separate thread asynchronously, but serially.
/// Each task drains the <see cref="_packetWriteQueue"/>
/// </summary>
private Task _packetWriteDrainTask = Task.CompletedTask;

/// <summary>
/// Event indicating the node has terminated.
@@ -640,7 +658,9 @@ internal class NodeContext
_serverToClientStream = nodePipe;
_packetFactory = factory;
_headerByte = new byte[5]; // 1 for the packet type, 4 for the body length
_smallReadBuffer = new byte[1000]; // 1000 was just an average seen on one profile run.

_readBufferMemoryStream = new MemoryStream();
_writeBufferMemoryStream = new MemoryStream();
_nodeTerminated = new ManualResetEvent(false);
_terminateDelegate = terminateDelegate;
_sharedReadBuffer = InterningBinaryReader.CreateSharedBuffer();
@@ -685,16 +705,8 @@ public async Task RunPacketReadLoopAsync()
NodePacketType packetType = (NodePacketType)_headerByte[0];
int packetLength = BitConverter.ToInt32(_headerByte, 1);

byte[] packetData;
if (packetLength < _smallReadBuffer.Length)
{
packetData = _smallReadBuffer;
}
else
{
// Preallocated buffer is not large enough to hold the body. Allocate now, but don't hold it forever.
packetData = new byte[packetLength];
}
_readBufferMemoryStream.SetLength(packetLength);
byte[] packetData = _readBufferMemoryStream.GetBuffer();

try
{
@@ -728,54 +740,81 @@ public async Task RunPacketReadLoopAsync()
#endif

/// <summary>
/// Sends the specified packet to this node.
/// Sends the specified packet to this node asynchronously.
/// The method enqueues a task to write the packet and returns
/// immediately. This is because SendData() is on a hot path
/// under the primary lock (BuildManager's _syncLock)
/// and we want to minimize our time there.
/// </summary>
/// <param name="packet">The packet to send.</param>
public void SendData(INodePacket packet)
{
MemoryStream writeStream = new MemoryStream();
_packetWriteQueue.Add(packet);
DrainPacketQueue();
}

/// <summary>
/// Schedule a task to drain the packet write queue. We could have had a
/// dedicated thread that would pump the queue constantly, but
/// we don't want to allocate a dedicated thread per node (1MB stack)
/// </summary>
/// <remarks>Usually there'll be a single packet in the queue, but sometimes
/// a burst of SendData comes in, with 10-20 packets scheduled. In this case
/// the first scheduled task will drain all of them, and subsequent tasks
/// will run on an empty queue. I tried to write logic that avoids queueing
/// a new task if the queue is already being drained, but it didn't show any
/// improvement and made things more complicated.</remarks>
private void DrainPacketQueue()
{
// this lock is only necessary to protect a write to _packetWriteDrainTask field
lock (_packetWriteQueue)
{
// average latency between the moment this runs and when the delegate starts
// running is about 100-200 microseconds (unless there's thread pool saturation)
_packetWriteDrainTask = _packetWriteDrainTask.ContinueWith(_ =>
{
while (_packetWriteQueue.TryTake(out var packet))
{
SendDataCore(packet);
}
}, TaskScheduler.Default);
}
}

/// <summary>
/// Actually writes and sends the packet. This can't be called in parallel
/// because it reuses the _writeBufferMemoryStream, and this is why we use
/// the _packetWriteDrainTask to serially chain invocations one after another.
/// </summary>
/// <param name="packet">The packet to send.</param>
private void SendDataCore(INodePacket packet)
{
MemoryStream writeStream = _writeBufferMemoryStream;

// clear the buffer but keep the underlying capacity to avoid reallocations
writeStream.SetLength(0);

ITranslator writeTranslator = BinaryTranslator.GetWriteTranslator(writeStream);
try
{
writeStream.WriteByte((byte)packet.Type);

// Pad for the packet length
writeStream.Write(BitConverter.GetBytes((int)0), 0, 4);
WriteInt32(writeStream, 0);
packet.Translate(writeTranslator);

int writeStreamLength = (int)writeStream.Position;

// Now plug in the real packet length
writeStream.Position = 1;
writeStream.Write(BitConverter.GetBytes((int)writeStream.Length - 5), 0, 4);
WriteInt32(writeStream, writeStreamLength - 5);

byte[] writeStreamBuffer = writeStream.GetBuffer();

for (int i = 0; i < writeStream.Length; i += MaxPacketWriteSize)
for (int i = 0; i < writeStreamLength; i += MaxPacketWriteSize)
{
int lengthToWrite = Math.Min((int)writeStream.Length - i, MaxPacketWriteSize);
if ((int)writeStream.Length - i <= MaxPacketWriteSize)
{
// We are done, write the last bit asynchronously. This is actually the general case for
// most packets in the build, and the asynchronous behavior here is desirable.
#if FEATURE_APM
_serverToClientStream.BeginWrite(writeStreamBuffer, i, lengthToWrite, PacketWriteComplete, null);
#else
_serverToClientStream.WriteAsync(writeStreamBuffer, i, lengthToWrite);
#endif
return;
}
else
{
// If this packet is longer that we can write in one go, then we need to break it up. We can't
// return out of this function and let the rest of the system continue because another operation
// might want to send data immediately afterward, and that could result in overlapping writes
// to the pipe on different threads.
#if FEATURE_APM
IAsyncResult result = _serverToClientStream.BeginWrite(writeStream.GetBuffer(), i, lengthToWrite, null, null);
_serverToClientStream.EndWrite(result);
#else
_serverToClientStream.Write(writeStreamBuffer, i, lengthToWrite);
#endif
}
int lengthToWrite = Math.Min(writeStreamLength - i, MaxPacketWriteSize);
_serverToClientStream.Write(writeStreamBuffer, i, lengthToWrite);
}
}
catch (IOException e)
@@ -789,6 +828,17 @@ public void SendData(INodePacket packet)
}
}

/// <summary>
/// Avoid having a BinaryWriter just to write a 4-byte int
/// </summary>
private void WriteInt32(MemoryStream stream, int value)
{
stream.WriteByte((byte)value);
stream.WriteByte((byte)(value >> 8));
stream.WriteByte((byte)(value >> 16));
stream.WriteByte((byte)(value >> 24));
}

/// <summary>
/// Closes the node's context, disconnecting it from the node.
/// </summary>
@@ -887,16 +937,10 @@ private void HeaderReadComplete(IAsyncResult result)
int packetLength = BitConverter.ToInt32(_headerByte, 1);
MSBuildEventSource.Log.PacketReadSize(packetLength);

byte[] packetData;
if (packetLength < _smallReadBuffer.Length)
{
packetData = _smallReadBuffer;
}
else
{
// Preallocated buffer is not large enough to hold the body. Allocate now, but don't hold it forever.
packetData = new byte[packetLength];
}
// Ensures the buffer is at least this length.
// It avoids reallocations if the buffer is already large enough.
_readBufferMemoryStream.SetLength(packetLength);
byte[] packetData = _readBufferMemoryStream.GetBuffer();

_clientToServerStream.BeginRead(packetData, 0, packetLength, BodyReadComplete, new Tuple<byte[], int>(packetData, packetLength));
}
25 changes: 21 additions & 4 deletions src/Shared/NodeEndpointOutOfProcBase.cs
@@ -99,6 +99,16 @@ internal abstract class NodeEndpointOutOfProcBase : INodeEndpoint
/// </summary>
private SharedReadBuffer _sharedReadBuffer;

/// <summary>
/// A way to cache a byte array when writing out packets
/// </summary>
private MemoryStream _packetStream;

/// <summary>
/// A binary writer to help write into <see cref="_packetStream"/>
/// </summary>
private BinaryWriter _binaryWriter;

#endregion

#region INodeEndpoint Events
@@ -189,6 +199,9 @@ internal void InternalConstruct(string pipeName)
_asyncDataMonitor = new object();
_sharedReadBuffer = InterningBinaryReader.CreateSharedBuffer();

_packetStream = new MemoryStream();
_binaryWriter = new BinaryWriter(_packetStream);

#if FEATURE_PIPE_SECURITY && FEATURE_NAMED_PIPE_SECURITY_CONSTRUCTOR
if (!NativeMethodsShared.IsMono)
{
@@ -590,22 +603,26 @@ private void PacketPumpProc()
INodePacket packet;
while (localPacketQueue.TryDequeue(out packet))
{
MemoryStream packetStream = new MemoryStream();
var packetStream = _packetStream;
packetStream.SetLength(0);

ITranslator writeTranslator = BinaryTranslator.GetWriteTranslator(packetStream);

packetStream.WriteByte((byte)packet.Type);

// Pad for packet length
packetStream.Write(BitConverter.GetBytes((int)0), 0, 4);
_binaryWriter.Write(0);

// Reset the position in the write buffer.
packet.Translate(writeTranslator);

int packetStreamLength = (int)packetStream.Position;

// Now write in the actual packet length
packetStream.Position = 1;
packetStream.Write(BitConverter.GetBytes((int)packetStream.Length - 5), 0, 4);
_binaryWriter.Write(packetStreamLength - 5);

localWritePipe.Write(packetStream.GetBuffer(), 0, (int)packetStream.Length);
localWritePipe.Write(packetStream.GetBuffer(), 0, packetStreamLength);
}
}
catch (Exception e)

1 comment on commit de3b887

@uweigand
Contributor


It looks like this commit changed the encoding of the packet length header field from native byte order to always little-endian (which causes a problem on big-endian systems). See #6204 for more details.
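
For context, here is a minimal, hypothetical sketch (not MSBuild code) of the mismatch: BitConverter.GetBytes and BitConverter.ToInt32 use the host's native byte order, while BinaryWriter.Write(int) and the new WriteInt32 helper always emit little-endian, so a big-endian reader that still uses BitConverter.ToInt32 misinterprets the length header.

using System;
using System.Linq;

class EndiannessSketch
{
    static void Main()
    {
        int length = 258; // example packet body length (0x00000102)

        // Old writer: BitConverter.GetBytes emits the machine's native byte order,
        // which the reader's BitConverter.ToInt32 then interprets the same way.
        byte[] nativeOrder = BitConverter.GetBytes(length);

        // New writer (WriteInt32 / BinaryWriter.Write(int)): always little-endian.
        byte[] littleEndian =
        {
            (byte)length,
            (byte)(length >> 8),
            (byte)(length >> 16),
            (byte)(length >> 24),
        };

        // The reader still uses native byte order, so the two only agree on
        // little-endian hosts; a big-endian host would read 0x02010000 instead.
        Console.WriteLine($"writers agree: {nativeOrder.SequenceEqual(littleEndian)}");
        Console.WriteLine($"reader sees:   {BitConverter.ToInt32(littleEndian, 0)}");
    }
}
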
