Reduce byte array allocations when reading/writing packets #6023

Merged
8 commits merged on Feb 9, 2021
@@ -3,14 +3,13 @@

using System;
using System.Collections.Generic;
using System.Collections.Concurrent;
using System.Globalization;
using System.IO;
using System.IO.Pipes;
using System.Diagnostics;
using System.Threading;
#if !FEATURE_APM
using System.Threading.Tasks;
#endif
using System.Runtime.InteropServices;
#if FEATURE_PIPE_SECURITY
using System.Security.Principal;
@@ -609,8 +608,27 @@ internal class NodeContext

/// <summary>
/// A buffer typically big enough to handle a packet body.
/// We use this as a convenient way to manage and cache a byte[] that's resized
/// automatically to fit our payload.
/// </summary>
private MemoryStream _readBufferMemoryStream;

/// <summary>
/// A reusable buffer for writing packets.
/// </summary>
private MemoryStream _writeBufferMemoryStream;

/// <summary>
/// A queue used for enqueuing packets to write to the stream asynchronously.
/// </summary>
private byte[] _smallReadBuffer;
private BlockingCollection<INodePacket> _packetWriteQueue = new BlockingCollection<INodePacket>();

/// <summary>
/// A task representing the last packet write, so we can chain packet writes one after another.
/// We want to queue up writing packets on a separate thread asynchronously, but serially.
/// Each task drains the <see cref="_packetWriteQueue"/>.
/// </summary>
private Task _packetWriteDrainTask = Task.CompletedTask;

/// <summary>
/// Event indicating the node has terminated.
@@ -640,7 +658,9 @@ internal class NodeContext
_serverToClientStream = nodePipe;
_packetFactory = factory;
_headerByte = new byte[5]; // 1 for the packet type, 4 for the body length
_smallReadBuffer = new byte[1000]; // 1000 was just an average seen on one profile run.

_readBufferMemoryStream = new MemoryStream();
_writeBufferMemoryStream = new MemoryStream();
_nodeTerminated = new ManualResetEvent(false);
_terminateDelegate = terminateDelegate;
_sharedReadBuffer = InterningBinaryReader.CreateSharedBuffer();
@@ -685,16 +705,8 @@ public async Task RunPacketReadLoopAsync()
NodePacketType packetType = (NodePacketType)_headerByte[0];
int packetLength = BitConverter.ToInt32(_headerByte, 1);

byte[] packetData;
if (packetLength < _smallReadBuffer.Length)
{
packetData = _smallReadBuffer;
}
else
{
// Preallocated buffer is not large enough to hold the body. Allocate now, but don't hold it forever.
packetData = new byte[packetLength];
}
_readBufferMemoryStream.SetLength(packetLength);
byte[] packetData = _readBufferMemoryStream.GetBuffer();

try
{
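As background, the read path relies on two MemoryStream behaviors: SetLength grows the underlying array only when the requested length exceeds the current capacity and never shrinks it, and GetBuffer() exposes that array directly without copying. A minimal sketch of the same reuse pattern, not taken from this PR (names are illustrative):

// Sketch: one MemoryStream serves as a growable, cached scratch buffer.
using System.IO;

class ReusableBuffer
{
    private readonly MemoryStream _stream = new MemoryStream();

    // Returns an array with at least 'length' usable bytes starting at index 0.
    // Growing happens only when 'length' exceeds the current capacity; shrinking
    // the logical length later leaves the capacity (and the array) in place.
    public byte[] Rent(int length)
    {
        _stream.SetLength(length);
        return _stream.GetBuffer(); // may be longer than 'length'; the caller tracks the real size
    }
}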
@@ -728,54 +740,80 @@ public async Task RunPacketReadLoopAsync()
#endif

/// <summary>
/// Sends the specified packet to this node.
/// Sends the specified packet to this node asynchronously.
/// The method enqueues a task to write the packet and returns
/// immediately. This is because SendData() is on a hot path
/// under the primary lock and we want to minimize our time there.
/// </summary>
/// <param name="packet">The packet to send.</param>
public void SendData(INodePacket packet)
{
MemoryStream writeStream = new MemoryStream();
_packetWriteQueue.Add(packet);
DrainPacketQueue();
}

/// <summary>
/// Schedule a task to drain the packet write queue. We could have had a
/// dedicated thread that would pump the queue constantly, but
/// we don't want to allocate a dedicated thread per node (1 MB of stack).
/// </summary>
/// <remarks>Usually there'll be a single packet in the queue, but sometimes
/// a burst of SendData comes in, with 10-20 packets scheduled. In this case
/// the first scheduled task will drain all of them, and subsequent tasks
/// will run on an empty queue. I tried to write logic that avoids queueing
/// a new task if the queue is already being drained, but it didn't show any
/// improvement and made things more complicated.</remarks>
private void DrainPacketQueue()
{
// This lock is only necessary to protect writes to the _packetWriteDrainTask field.
lock (_packetWriteQueue)
{
// average latency between the moment this runs and when the delegate starts
// running is about 100-200 microseconds (unless there's thread pool saturation)
_packetWriteDrainTask = _packetWriteDrainTask.ContinueWith(_ =>
{
while (_packetWriteQueue.TryTake(out var packet))
{
SendDataCore(packet);
}
}, TaskScheduler.Default);
}
}
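The drain logic above is a general way to get serial, FIFO processing on thread-pool threads without dedicating a thread per node. A stripped-down sketch of the pattern, with illustrative names and not taken from this PR:

// Sketch: serialize processing of enqueued items by chaining continuations.
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

class SerialDrainQueue<T>
{
    private readonly BlockingCollection<T> _queue = new BlockingCollection<T>();
    private readonly Action<T> _process;
    private Task _drainTask = Task.CompletedTask;

    public SerialDrainQueue(Action<T> process) => _process = process;

    public void Enqueue(T item)
    {
        _queue.Add(item);

        // The lock only protects the read-modify-write of _drainTask; chaining onto the
        // previous drain guarantees _process never runs concurrently with itself. A burst
        // of Enqueue calls is drained by the first continuation; later ones find the queue empty.
        lock (_queue)
        {
            _drainTask = _drainTask.ContinueWith(
                _ => { while (_queue.TryTake(out T next)) _process(next); },
                TaskScheduler.Default);
        }
    }
}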

/// <summary>
/// Actually writes and sends the packet. This can't be called in parallel
/// because it reuses the _writeBufferMemoryStream, and this is why we use
/// the _packetWriteDrainTask to serially chain invocations one after another.
/// </summary>
/// <param name="packet">The packet to send.</param>
private void SendDataCore(INodePacket packet)
{
MemoryStream writeStream = _writeBufferMemoryStream;

// Clear the buffer but keep the underlying capacity to avoid reallocations.
writeStream.SetLength(0);

ITranslator writeTranslator = BinaryTranslator.GetWriteTranslator(writeStream);
try
{
writeStream.WriteByte((byte)packet.Type);

// Pad for the packet length
writeStream.Write(BitConverter.GetBytes((int)0), 0, 4);
WriteInt32(writeStream, 0);
packet.Translate(writeTranslator);

int writeStreamLength = (int)writeStream.Position;

// Now plug in the real packet length
writeStream.Position = 1;
writeStream.Write(BitConverter.GetBytes((int)writeStream.Length - 5), 0, 4);
WriteInt32(writeStream, writeStreamLength - 5);

byte[] writeStreamBuffer = writeStream.GetBuffer();

for (int i = 0; i < writeStream.Length; i += MaxPacketWriteSize)
for (int i = 0; i < writeStreamLength; i += MaxPacketWriteSize)
{
int lengthToWrite = Math.Min((int)writeStream.Length - i, MaxPacketWriteSize);
if ((int)writeStream.Length - i <= MaxPacketWriteSize)
{
// We are done, write the last bit asynchronously. This is actually the general case for
// most packets in the build, and the asynchronous behavior here is desirable.
#if FEATURE_APM
_serverToClientStream.BeginWrite(writeStreamBuffer, i, lengthToWrite, PacketWriteComplete, null);
#else
_serverToClientStream.WriteAsync(writeStreamBuffer, i, lengthToWrite);
#endif
return;
}
else
{
// If this packet is longer that we can write in one go, then we need to break it up. We can't
// return out of this function and let the rest of the system continue because another operation
// might want to send data immediately afterward, and that could result in overlapping writes
// to the pipe on different threads.
#if FEATURE_APM
IAsyncResult result = _serverToClientStream.BeginWrite(writeStream.GetBuffer(), i, lengthToWrite, null, null);
_serverToClientStream.EndWrite(result);
#else
_serverToClientStream.Write(writeStreamBuffer, i, lengthToWrite);
#endif
}
int lengthToWrite = Math.Min(writeStreamLength - i, MaxPacketWriteSize);
_serverToClientStream.Write(writeStreamBuffer, i, lengthToWrite);
}
}
catch (IOException e)
@@ -789,6 +827,17 @@ public void SendData(INodePacket packet)
}
}

/// <summary>
/// Avoid allocating a BinaryWriter just to write a 4-byte int.
/// </summary>
private void WriteInt32(MemoryStream stream, int value)
{
stream.WriteByte((byte)value);
stream.WriteByte((byte)(value >> 8));
stream.WriteByte((byte)(value >> 16));
stream.WriteByte((byte)(value >> 24));
}
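WriteInt32 emits the value least-significant byte first, which matches what BitConverter.ToInt32 reads back on the receiving side on the little-endian platforms this code runs on. A quick illustrative round-trip, assuming access to the helper above:

// Sketch: the 4 bytes written by WriteInt32 round-trip through BitConverter.ToInt32.
var stream = new MemoryStream();
WriteInt32(stream, 0x12345678);
byte[] bytes = stream.ToArray();                            // { 0x78, 0x56, 0x34, 0x12 }
Debug.Assert(BitConverter.ToInt32(bytes, 0) == 0x12345678);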

/// <summary>
/// Closes the node's context, disconnecting it from the node.
/// </summary>
@@ -887,16 +936,10 @@ private void HeaderReadComplete(IAsyncResult result)
int packetLength = BitConverter.ToInt32(_headerByte, 1);
MSBuildEventSource.Log.PacketReadSize(packetLength);

byte[] packetData;
if (packetLength < _smallReadBuffer.Length)
{
packetData = _smallReadBuffer;
}
else
{
// Preallocated buffer is not large enough to hold the body. Allocate now, but don't hold it forever.
packetData = new byte[packetLength];
}
// Ensures the buffer is at least this length.
// It avoids reallocations if the buffer is already large enough.
_readBufferMemoryStream.SetLength(packetLength);
byte[] packetData = _readBufferMemoryStream.GetBuffer();

_clientToServerStream.BeginRead(packetData, 0, packetLength, BodyReadComplete, new Tuple<byte[], int>(packetData, packetLength));
}
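Both files frame packets the same way: a 1-byte packet type, a 4-byte body length written first as a placeholder, the translated body, and then a seek back to offset 1 to patch in the real length. An illustrative sketch of that framing over a reused stream (WriteInt32 as defined above; the other names are hypothetical):

// Frame layout: [1 byte: packet type][4 bytes: body length, little-endian][body]
void WriteFrame(MemoryStream frame, byte packetType, Action<Stream> writeBody)
{
    frame.SetLength(0);                   // reuse the capacity left over from earlier frames
    frame.WriteByte(packetType);
    WriteInt32(frame, 0);                 // length placeholder

    writeBody(frame);                     // serialized packet body

    int frameLength = (int)frame.Position;
    frame.Position = 1;
    WriteInt32(frame, frameLength - 5);   // patch the real body length (total minus the 5-byte header)
    frame.Position = frameLength;         // restore before the buffer is handed to the pipe
}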
25 changes: 21 additions & 4 deletions src/Shared/NodeEndpointOutOfProcBase.cs
@@ -99,6 +99,16 @@ internal abstract class NodeEndpointOutOfProcBase : INodeEndpoint
/// </summary>
private SharedReadBuffer _sharedReadBuffer;

/// <summary>
/// A way to cache a byte array when writing out packets
/// </summary>
private MemoryStream _packetStream;

/// <summary>
/// A binary writer to help write into <see cref="_packetStream"/>
/// </summary>
private BinaryWriter _binaryWriter;

#endregion

#region INodeEndpoint Events
@@ -189,6 +199,9 @@ internal void InternalConstruct(string pipeName)
_asyncDataMonitor = new object();
_sharedReadBuffer = InterningBinaryReader.CreateSharedBuffer();

_packetStream = new MemoryStream();
_binaryWriter = new BinaryWriter(_packetStream);

#if FEATURE_PIPE_SECURITY && FEATURE_NAMED_PIPE_SECURITY_CONSTRUCTOR
if (!NativeMethodsShared.IsMono)
{
@@ -584,22 +597,26 @@ private void PacketPumpProc()
INodePacket packet;
while (localPacketQueue.TryDequeue(out packet))
{
MemoryStream packetStream = new MemoryStream();
var packetStream = _packetStream;
packetStream.SetLength(0);

ITranslator writeTranslator = BinaryTranslator.GetWriteTranslator(packetStream);

packetStream.WriteByte((byte)packet.Type);

// Pad for packet length
packetStream.Write(BitConverter.GetBytes((int)0), 0, 4);
_binaryWriter.Write(0);

// Reset the position in the write buffer.
packet.Translate(writeTranslator);

int packetStreamLength = (int)packetStream.Position;

// Now write in the actual packet length
packetStream.Position = 1;
packetStream.Write(BitConverter.GetBytes((int)packetStream.Length - 5), 0, 4);
_binaryWriter.Write(packetStreamLength - 5);

localWritePipe.Write(packetStream.GetBuffer(), 0, (int)packetStream.Length);
localWritePipe.Write(packetStream.GetBuffer(), 0, packetStreamLength);
}
}
catch (Exception e)
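On the endpoint side, the two details worth calling out are that BinaryWriter.Write(int) writes its four little-endian bytes directly into the underlying stream, whereas BitConverter.GetBytes allocated a temporary byte[4] per call, and that SetLength(0) clears the logical contents and resets the position while keeping the allocated capacity. A small self-contained sketch of the difference, illustrative only:

// Sketch: two ways of writing the 4-byte length field.
using System;
using System.IO;

class LengthFieldDemo
{
    static void Main()
    {
        var stream = new MemoryStream();

        // Old approach: BitConverter.GetBytes allocates a fresh byte[4] on every call.
        stream.Write(BitConverter.GetBytes(0), 0, 4);

        // New approach: a cached BinaryWriter writes the same four little-endian bytes
        // straight into the stream, with no intermediate array.
        var writer = new BinaryWriter(stream);
        writer.Write(0);

        Console.WriteLine(stream.Length);   // 8: both approaches produce identical bytes
    }
}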