Skip to content

Commit

Permalink
Revert "Reduce byte array allocations when reading/writing packets (#…
Browse files Browse the repository at this point in the history
…6023)"

This reverts commit de3b887.
  • Loading branch information
benvillalobos committed Feb 19, 2021
1 parent f0eebf2 commit 6a68543
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 119 deletions.
Expand Up @@ -3,13 +3,14 @@

using System;
using System.Collections.Generic;
using System.Collections.Concurrent;
using System.Globalization;
using System.IO;
using System.IO.Pipes;
using System.Diagnostics;
using System.Threading;
#if !FEATURE_APM
using System.Threading.Tasks;
#endif
using System.Runtime.InteropServices;
#if FEATURE_PIPE_SECURITY
using System.Security.Principal;
Expand Down Expand Up @@ -608,27 +609,8 @@ internal class NodeContext

/// <summary>
/// A buffer typically big enough to handle a packet body.
/// We use this as a convenient way to manage and cache a byte[] that's resized
/// automatically to fit our payload.
/// </summary>
private MemoryStream _readBufferMemoryStream;

/// <summary>
/// A reusable buffer for writing packets.
/// </summary>
private MemoryStream _writeBufferMemoryStream;

/// <summary>
/// A queue used for enqueuing packets to write to the stream asynchronously.
/// </summary>
private BlockingCollection<INodePacket> _packetWriteQueue = new BlockingCollection<INodePacket>();

/// <summary>
/// A task representing the last packet write, so we can chain packet writes one after another.
/// We want to queue up writing packets on a separate thread asynchronously, but serially.
/// Each task drains the <see cref="_packetWriteQueue"/>
/// </summary>
private Task _packetWriteDrainTask = Task.CompletedTask;
private byte[] _smallReadBuffer;

/// <summary>
/// Event indicating the node has terminated.
Expand Down Expand Up @@ -658,9 +640,7 @@ internal class NodeContext
_serverToClientStream = nodePipe;
_packetFactory = factory;
_headerByte = new byte[5]; // 1 for the packet type, 4 for the body length

_readBufferMemoryStream = new MemoryStream();
_writeBufferMemoryStream = new MemoryStream();
_smallReadBuffer = new byte[1000]; // 1000 was just an average seen on one profile run.
_nodeTerminated = new ManualResetEvent(false);
_terminateDelegate = terminateDelegate;
_sharedReadBuffer = InterningBinaryReader.CreateSharedBuffer();
Expand Down Expand Up @@ -705,8 +685,16 @@ public async Task RunPacketReadLoopAsync()
NodePacketType packetType = (NodePacketType)_headerByte[0];
int packetLength = BitConverter.ToInt32(_headerByte, 1);

_readBufferMemoryStream.SetLength(packetLength);
byte[] packetData = _readBufferMemoryStream.GetBuffer();
byte[] packetData;
if (packetLength < _smallReadBuffer.Length)
{
packetData = _smallReadBuffer;
}
else
{
// Preallocated buffer is not large enough to hold the body. Allocate now, but don't hold it forever.
packetData = new byte[packetLength];
}

try
{
Expand Down Expand Up @@ -740,81 +728,54 @@ public async Task RunPacketReadLoopAsync()
#endif

/// <summary>
/// Sends the specified packet to this node asynchronously.
/// The method enqueues a task to write the packet and returns
/// immediately. This is because SendData() is on a hot path
/// under the primary lock (BuildManager's _syncLock)
/// and we want to minimize our time there.
/// Sends the specified packet to this node.
/// </summary>
/// <param name="packet">The packet to send.</param>
public void SendData(INodePacket packet)
{
_packetWriteQueue.Add(packet);
DrainPacketQueue();
}

/// <summary>
/// Schedule a task to drain the packet write queue. We could have had a
/// dedicated thread that would pump the queue constantly, but
/// we don't want to allocate a dedicated thread per node (1MB stack)
/// </summary>
/// <remarks>Usually there'll be a single packet in the queue, but sometimes
/// a burst of SendData comes in, with 10-20 packets scheduled. In this case
/// the first scheduled task will drain all of them, and subsequent tasks
/// will run on an empty queue. I tried to write logic that avoids queueing
/// a new task if the queue is already being drained, but it didn't show any
/// improvement and made things more complicated.</remarks>
private void DrainPacketQueue()
{
// this lock is only necessary to protect a write to _packetWriteDrainTask field
lock (_packetWriteQueue)
{
// average latency between the moment this runs and when the delegate starts
// running is about 100-200 microseconds (unless there's thread pool saturation)
_packetWriteDrainTask = _packetWriteDrainTask.ContinueWith(_ =>
{
while (_packetWriteQueue.TryTake(out var packet))
{
SendDataCore(packet);
}
}, TaskScheduler.Default);
}
}

/// <summary>
/// Actually writes and sends the packet. This can't be called in parallel
/// because it reuses the _writeBufferMemoryStream, and this is why we use
/// the _packetWriteDrainTask to serially chain invocations one after another.
/// </summary>
/// <param name="packet">The packet to send.</param>
private void SendDataCore(INodePacket packet)
{
MemoryStream writeStream = _writeBufferMemoryStream;

// clear the buffer but keep the underlying capacity to avoid reallocations
writeStream.SetLength(0);

MemoryStream writeStream = new MemoryStream();
ITranslator writeTranslator = BinaryTranslator.GetWriteTranslator(writeStream);
try
{
writeStream.WriteByte((byte)packet.Type);

// Pad for the packet length
WriteInt32(writeStream, 0);
writeStream.Write(BitConverter.GetBytes((int)0), 0, 4);
packet.Translate(writeTranslator);

int writeStreamLength = (int)writeStream.Position;

// Now plug in the real packet length
writeStream.Position = 1;
WriteInt32(writeStream, writeStreamLength - 5);
writeStream.Write(BitConverter.GetBytes((int)writeStream.Length - 5), 0, 4);

byte[] writeStreamBuffer = writeStream.GetBuffer();

for (int i = 0; i < writeStreamLength; i += MaxPacketWriteSize)
for (int i = 0; i < writeStream.Length; i += MaxPacketWriteSize)
{
int lengthToWrite = Math.Min(writeStreamLength - i, MaxPacketWriteSize);
_serverToClientStream.Write(writeStreamBuffer, i, lengthToWrite);
int lengthToWrite = Math.Min((int)writeStream.Length - i, MaxPacketWriteSize);
if ((int)writeStream.Length - i <= MaxPacketWriteSize)
{
// We are done, write the last bit asynchronously. This is actually the general case for
// most packets in the build, and the asynchronous behavior here is desirable.
#if FEATURE_APM
_serverToClientStream.BeginWrite(writeStreamBuffer, i, lengthToWrite, PacketWriteComplete, null);
#else
_serverToClientStream.WriteAsync(writeStreamBuffer, i, lengthToWrite);
#endif
return;
}
else
{
// If this packet is longer that we can write in one go, then we need to break it up. We can't
// return out of this function and let the rest of the system continue because another operation
// might want to send data immediately afterward, and that could result in overlapping writes
// to the pipe on different threads.
#if FEATURE_APM
IAsyncResult result = _serverToClientStream.BeginWrite(writeStream.GetBuffer(), i, lengthToWrite, null, null);
_serverToClientStream.EndWrite(result);
#else
_serverToClientStream.Write(writeStreamBuffer, i, lengthToWrite);
#endif
}
}
}
catch (IOException e)
Expand All @@ -828,17 +789,6 @@ private void SendDataCore(INodePacket packet)
}
}

/// <summary>
/// Avoid having a BinaryWriter just to write a 4-byte int
/// </summary>
private void WriteInt32(MemoryStream stream, int value)
{
stream.WriteByte((byte)value);
stream.WriteByte((byte)(value >> 8));
stream.WriteByte((byte)(value >> 16));
stream.WriteByte((byte)(value >> 24));
}

/// <summary>
/// Closes the node's context, disconnecting it from the node.
/// </summary>
Expand Down Expand Up @@ -937,10 +887,16 @@ private void HeaderReadComplete(IAsyncResult result)
int packetLength = BitConverter.ToInt32(_headerByte, 1);
MSBuildEventSource.Log.PacketReadSize(packetLength);

// Ensures the buffer is at least this length.
// It avoids reallocations if the buffer is already large enough.
_readBufferMemoryStream.SetLength(packetLength);
byte[] packetData = _readBufferMemoryStream.GetBuffer();
byte[] packetData;
if (packetLength < _smallReadBuffer.Length)
{
packetData = _smallReadBuffer;
}
else
{
// Preallocated buffer is not large enough to hold the body. Allocate now, but don't hold it forever.
packetData = new byte[packetLength];
}

_clientToServerStream.BeginRead(packetData, 0, packetLength, BodyReadComplete, new Tuple<byte[], int>(packetData, packetLength));
}
Expand Down
25 changes: 4 additions & 21 deletions src/Shared/NodeEndpointOutOfProcBase.cs
Expand Up @@ -99,16 +99,6 @@ internal abstract class NodeEndpointOutOfProcBase : INodeEndpoint
/// </summary>
private SharedReadBuffer _sharedReadBuffer;

/// <summary>
/// A way to cache a byte array when writing out packets
/// </summary>
private MemoryStream _packetStream;

/// <summary>
/// A binary writer to help write into <see cref="_packetStream"/>
/// </summary>
private BinaryWriter _binaryWriter;

#endregion

#region INodeEndpoint Events
Expand Down Expand Up @@ -199,9 +189,6 @@ internal void InternalConstruct(string pipeName)
_asyncDataMonitor = new object();
_sharedReadBuffer = InterningBinaryReader.CreateSharedBuffer();

_packetStream = new MemoryStream();
_binaryWriter = new BinaryWriter(_packetStream);

#if FEATURE_PIPE_SECURITY && FEATURE_NAMED_PIPE_SECURITY_CONSTRUCTOR
if (!NativeMethodsShared.IsMono)
{
Expand Down Expand Up @@ -603,26 +590,22 @@ private void PacketPumpProc()
INodePacket packet;
while (localPacketQueue.TryDequeue(out packet))
{
var packetStream = _packetStream;
packetStream.SetLength(0);

MemoryStream packetStream = new MemoryStream();
ITranslator writeTranslator = BinaryTranslator.GetWriteTranslator(packetStream);

packetStream.WriteByte((byte)packet.Type);

// Pad for packet length
_binaryWriter.Write(0);
packetStream.Write(BitConverter.GetBytes((int)0), 0, 4);

// Reset the position in the write buffer.
packet.Translate(writeTranslator);

int packetStreamLength = (int)packetStream.Position;

// Now write in the actual packet length
packetStream.Position = 1;
_binaryWriter.Write(packetStreamLength - 5);
packetStream.Write(BitConverter.GetBytes((int)packetStream.Length - 5), 0, 4);

localWritePipe.Write(packetStream.GetBuffer(), 0, packetStreamLength);
localWritePipe.Write(packetStream.GetBuffer(), 0, (int)packetStream.Length);
}
}
catch (Exception e)
Expand Down

0 comments on commit 6a68543

Please sign in to comment.