Skip to content

Commit

Permalink
Reduce byte array allocations when reading/writing packets
Browse files Browse the repository at this point in the history
Byte arrays are a major source of LOH allocations when streaming logging events across nodes. Allocating a large MemoryStream once and then growing it as needed almost completely removes allocations for byte arrays.

This should significantly improve memory traffic during large builds.
  • Loading branch information
KirillOsenkov committed Jan 11, 2021
1 parent 70d3538 commit b7dde7d
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 37 deletions.
Expand Up @@ -609,8 +609,20 @@ internal class NodeContext

/// <summary>
/// A buffer typically big enough to handle a packet body.
/// We use this as a convenient way to manage and cache a byte[] that's resized
/// automatically to fit our payload.
/// </summary>
private byte[] _smallReadBuffer;
private MemoryStream _readBufferMemoryStream;

/// <summary>
/// A buffer for writing packets.
/// </summary>
private MemoryStream _writeBufferMemoryStream;

/// <summary>
/// A BinaryWriter to assist writing bytes to the buffer.
/// </summary>
private BinaryWriter _writeBufferStreamWriter;

/// <summary>
/// Event indicating the node has terminated.
Expand Down Expand Up @@ -640,7 +652,11 @@ internal class NodeContext
_serverToClientStream = nodePipe;
_packetFactory = factory;
_headerByte = new byte[5]; // 1 for the packet type, 4 for the body length
_smallReadBuffer = new byte[1000]; // 1000 was just an average seen on one profile run.

// packets get this large so avoid reallocations
_readBufferMemoryStream = new MemoryStream(MaxPacketWriteSize);
_writeBufferMemoryStream = new MemoryStream(MaxPacketWriteSize);
_writeBufferStreamWriter = new BinaryWriter(_writeBufferMemoryStream);
_nodeTerminated = new ManualResetEvent(false);
_terminateDelegate = terminateDelegate;
_sharedReadBuffer = InterningBinaryReader.CreateSharedBuffer();
Expand Down Expand Up @@ -685,16 +701,8 @@ public async Task RunPacketReadLoopAsync()
NodePacketType packetType = (NodePacketType)_headerByte[0];
int packetLength = BitConverter.ToInt32(_headerByte, 1);

byte[] packetData;
if (packetLength < _smallReadBuffer.Length)
{
packetData = _smallReadBuffer;
}
else
{
// Preallocated buffer is not large enough to hold the body. Allocate now, but don't hold it forever.
packetData = new byte[packetLength];
}
_readBufferMemoryStream.SetLength(packetLength);
byte[] packetData = _readBufferMemoryStream.GetBuffer();

try
{
Expand Down Expand Up @@ -733,26 +741,30 @@ public async Task RunPacketReadLoopAsync()
/// <param name="packet">The packet to send.</param>
public void SendData(INodePacket packet)
{
MemoryStream writeStream = new MemoryStream();
ITranslator writeTranslator = BinaryTranslator.GetWriteTranslator(writeStream);
// clear the buffer but keep the underlying capacity to avoid reallocations
_writeBufferMemoryStream.SetLength(0);

ITranslator writeTranslator = BinaryTranslator.GetWriteTranslator(_writeBufferMemoryStream);
try
{
writeStream.WriteByte((byte)packet.Type);
_writeBufferMemoryStream.WriteByte((byte)packet.Type);

// Pad for the packet length
writeStream.Write(BitConverter.GetBytes((int)0), 0, 4);
_writeBufferStreamWriter.Write(0);
packet.Translate(writeTranslator);

int writeStreamLength = (int)_writeBufferMemoryStream.Position;

// Now plug in the real packet length
writeStream.Position = 1;
writeStream.Write(BitConverter.GetBytes((int)writeStream.Length - 5), 0, 4);
_writeBufferMemoryStream.Position = 1;
_writeBufferStreamWriter.Write(writeStreamLength - 5);

byte[] writeStreamBuffer = writeStream.GetBuffer();
byte[] writeStreamBuffer = _writeBufferMemoryStream.GetBuffer();

for (int i = 0; i < writeStream.Length; i += MaxPacketWriteSize)
for (int i = 0; i < writeStreamLength; i += MaxPacketWriteSize)
{
int lengthToWrite = Math.Min((int)writeStream.Length - i, MaxPacketWriteSize);
if ((int)writeStream.Length - i <= MaxPacketWriteSize)
int lengthToWrite = Math.Min(writeStreamLength - i, MaxPacketWriteSize);
if (writeStreamLength - i <= MaxPacketWriteSize)
{
// We are done, write the last bit asynchronously. This is actually the general case for
// most packets in the build, and the asynchronous behavior here is desirable.
Expand All @@ -770,7 +782,7 @@ public void SendData(INodePacket packet)
// might want to send data immediately afterward, and that could result in overlapping writes
// to the pipe on different threads.
#if FEATURE_APM
IAsyncResult result = _serverToClientStream.BeginWrite(writeStream.GetBuffer(), i, lengthToWrite, null, null);
IAsyncResult result = _serverToClientStream.BeginWrite(writeStreamBuffer, i, lengthToWrite, null, null);
_serverToClientStream.EndWrite(result);
#else
_serverToClientStream.Write(writeStreamBuffer, i, lengthToWrite);
Expand Down Expand Up @@ -887,16 +899,10 @@ private void HeaderReadComplete(IAsyncResult result)
int packetLength = BitConverter.ToInt32(_headerByte, 1);
MSBuildEventSource.Log.PacketReadSize(packetLength);

byte[] packetData;
if (packetLength < _smallReadBuffer.Length)
{
packetData = _smallReadBuffer;
}
else
{
// Preallocated buffer is not large enough to hold the body. Allocate now, but don't hold it forever.
packetData = new byte[packetLength];
}
// Ensures the buffer is at least this length.
// It avoids reallocations if the buffer is already large enough.
_readBufferMemoryStream.SetLength(packetLength);
byte[] packetData = _readBufferMemoryStream.GetBuffer();

_clientToServerStream.BeginRead(packetData, 0, packetLength, BodyReadComplete, new Tuple<byte[], int>(packetData, packetLength));
}
Expand Down
26 changes: 22 additions & 4 deletions src/Shared/NodeEndpointOutOfProcBase.cs
Expand Up @@ -99,6 +99,16 @@ internal abstract class NodeEndpointOutOfProcBase : INodeEndpoint
/// </summary>
private SharedReadBuffer _sharedReadBuffer;

/// <summary>
/// A way to cache a byte array when writing out packets
/// </summary>
private MemoryStream _packetStream;

/// <summary>
/// A binary writer to help write into <see cref="_packetStream"/>
/// </summary>
private BinaryWriter _binaryWriter;

#endregion

#region INodeEndpoint Events
Expand Down Expand Up @@ -189,6 +199,10 @@ internal void InternalConstruct(string pipeName)
_asyncDataMonitor = new object();
_sharedReadBuffer = InterningBinaryReader.CreateSharedBuffer();

// packets get at least this large
_packetStream = new MemoryStream(1048576);
_binaryWriter = new BinaryWriter(_packetStream);

#if FEATURE_PIPE_SECURITY && FEATURE_NAMED_PIPE_SECURITY_CONSTRUCTOR
if (!NativeMethodsShared.IsMono)
{
Expand Down Expand Up @@ -584,22 +598,26 @@ private void PacketPumpProc()
INodePacket packet;
while (localPacketQueue.TryDequeue(out packet))
{
MemoryStream packetStream = new MemoryStream();
var packetStream = _packetStream;
packetStream.SetLength(0);

ITranslator writeTranslator = BinaryTranslator.GetWriteTranslator(packetStream);

packetStream.WriteByte((byte)packet.Type);

// Pad for packet length
packetStream.Write(BitConverter.GetBytes((int)0), 0, 4);
_binaryWriter.Write(0);

// Reset the position in the write buffer.
packet.Translate(writeTranslator);

int packetStreamLength = (int)packetStream.Position;

// Now write in the actual packet length
packetStream.Position = 1;
packetStream.Write(BitConverter.GetBytes((int)packetStream.Length - 5), 0, 4);
_binaryWriter.Write(packetStreamLength - 5);

localWritePipe.Write(packetStream.GetBuffer(), 0, (int)packetStream.Length);
localWritePipe.Write(packetStream.GetBuffer(), 0, packetStreamLength);
}
}
catch (Exception e)
Expand Down

0 comments on commit b7dde7d

Please sign in to comment.