From 6a685433be3a96eccd97552744ec1e408edf6ca6 Mon Sep 17 00:00:00 2001
From: Ben Villalobos <4691428+BenVillalobos@users.noreply.github.com>
Date: Thu, 18 Feb 2021 16:20:46 -0800
Subject: [PATCH] Revert "Reduce byte array allocations when reading/writing
packets (#6023)"
This reverts commit de3b8871781e9d8e562f461f1f5916e3097f33ef.
---
.../NodeProviderOutOfProcBase.cs | 152 +++++++-----------
src/Shared/NodeEndpointOutOfProcBase.cs | 25 +--
2 files changed, 58 insertions(+), 119 deletions(-)
diff --git a/src/Build/BackEnd/Components/Communications/NodeProviderOutOfProcBase.cs b/src/Build/BackEnd/Components/Communications/NodeProviderOutOfProcBase.cs
index 501d9ddbbc8..e65f614e08e 100644
--- a/src/Build/BackEnd/Components/Communications/NodeProviderOutOfProcBase.cs
+++ b/src/Build/BackEnd/Components/Communications/NodeProviderOutOfProcBase.cs
@@ -3,13 +3,14 @@
using System;
using System.Collections.Generic;
-using System.Collections.Concurrent;
using System.Globalization;
using System.IO;
using System.IO.Pipes;
using System.Diagnostics;
using System.Threading;
+#if !FEATURE_APM
using System.Threading.Tasks;
+#endif
using System.Runtime.InteropServices;
#if FEATURE_PIPE_SECURITY
using System.Security.Principal;
@@ -608,27 +609,8 @@ internal class NodeContext
///
/// A buffer typically big enough to handle a packet body.
- /// We use this as a convenient way to manage and cache a byte[] that's resized
- /// automatically to fit our payload.
- ///
- private MemoryStream _readBufferMemoryStream;
-
- ///
- /// A reusable buffer for writing packets.
- ///
- private MemoryStream _writeBufferMemoryStream;
-
- ///
- /// A queue used for enqueuing packets to write to the stream asynchronously.
///
- private BlockingCollection _packetWriteQueue = new BlockingCollection();
-
- ///
- /// A task representing the last packet write, so we can chain packet writes one after another.
- /// We want to queue up writing packets on a separate thread asynchronously, but serially.
- /// Each task drains the
- ///
- private Task _packetWriteDrainTask = Task.CompletedTask;
+ private byte[] _smallReadBuffer;
///
/// Event indicating the node has terminated.
@@ -658,9 +640,7 @@ internal class NodeContext
_serverToClientStream = nodePipe;
_packetFactory = factory;
_headerByte = new byte[5]; // 1 for the packet type, 4 for the body length
-
- _readBufferMemoryStream = new MemoryStream();
- _writeBufferMemoryStream = new MemoryStream();
+ _smallReadBuffer = new byte[1000]; // 1000 was just an average seen on one profile run.
_nodeTerminated = new ManualResetEvent(false);
_terminateDelegate = terminateDelegate;
_sharedReadBuffer = InterningBinaryReader.CreateSharedBuffer();
@@ -705,8 +685,16 @@ public async Task RunPacketReadLoopAsync()
NodePacketType packetType = (NodePacketType)_headerByte[0];
int packetLength = BitConverter.ToInt32(_headerByte, 1);
- _readBufferMemoryStream.SetLength(packetLength);
- byte[] packetData = _readBufferMemoryStream.GetBuffer();
+ byte[] packetData;
+ if (packetLength < _smallReadBuffer.Length)
+ {
+ packetData = _smallReadBuffer;
+ }
+ else
+ {
+ // Preallocated buffer is not large enough to hold the body. Allocate now, but don't hold it forever.
+ packetData = new byte[packetLength];
+ }
try
{
@@ -740,81 +728,54 @@ public async Task RunPacketReadLoopAsync()
#endif
///
- /// Sends the specified packet to this node asynchronously.
- /// The method enqueues a task to write the packet and returns
- /// immediately. This is because SendData() is on a hot path
- /// under the primary lock (BuildManager's _syncLock)
- /// and we want to minimize our time there.
+ /// Sends the specified packet to this node.
///
/// The packet to send.
public void SendData(INodePacket packet)
{
- _packetWriteQueue.Add(packet);
- DrainPacketQueue();
- }
-
- ///
- /// Schedule a task to drain the packet write queue. We could have had a
- /// dedicated thread that would pump the queue constantly, but
- /// we don't want to allocate a dedicated thread per node (1MB stack)
- ///
- /// Usually there'll be a single packet in the queue, but sometimes
- /// a burst of SendData comes in, with 10-20 packets scheduled. In this case
- /// the first scheduled task will drain all of them, and subsequent tasks
- /// will run on an empty queue. I tried to write logic that avoids queueing
- /// a new task if the queue is already being drained, but it didn't show any
- /// improvement and made things more complicated.
- private void DrainPacketQueue()
- {
- // this lock is only necessary to protect a write to _packetWriteDrainTask field
- lock (_packetWriteQueue)
- {
- // average latency between the moment this runs and when the delegate starts
- // running is about 100-200 microseconds (unless there's thread pool saturation)
- _packetWriteDrainTask = _packetWriteDrainTask.ContinueWith(_ =>
- {
- while (_packetWriteQueue.TryTake(out var packet))
- {
- SendDataCore(packet);
- }
- }, TaskScheduler.Default);
- }
- }
-
- ///
- /// Actually writes and sends the packet. This can't be called in parallel
- /// because it reuses the _writeBufferMemoryStream, and this is why we use
- /// the _packetWriteDrainTask to serially chain invocations one after another.
- ///
- /// The packet to send.
- private void SendDataCore(INodePacket packet)
- {
- MemoryStream writeStream = _writeBufferMemoryStream;
-
- // clear the buffer but keep the underlying capacity to avoid reallocations
- writeStream.SetLength(0);
-
+ MemoryStream writeStream = new MemoryStream();
ITranslator writeTranslator = BinaryTranslator.GetWriteTranslator(writeStream);
try
{
writeStream.WriteByte((byte)packet.Type);
// Pad for the packet length
- WriteInt32(writeStream, 0);
+ writeStream.Write(BitConverter.GetBytes((int)0), 0, 4);
packet.Translate(writeTranslator);
- int writeStreamLength = (int)writeStream.Position;
-
// Now plug in the real packet length
writeStream.Position = 1;
- WriteInt32(writeStream, writeStreamLength - 5);
+ writeStream.Write(BitConverter.GetBytes((int)writeStream.Length - 5), 0, 4);
byte[] writeStreamBuffer = writeStream.GetBuffer();
- for (int i = 0; i < writeStreamLength; i += MaxPacketWriteSize)
+ for (int i = 0; i < writeStream.Length; i += MaxPacketWriteSize)
{
- int lengthToWrite = Math.Min(writeStreamLength - i, MaxPacketWriteSize);
- _serverToClientStream.Write(writeStreamBuffer, i, lengthToWrite);
+ int lengthToWrite = Math.Min((int)writeStream.Length - i, MaxPacketWriteSize);
+ if ((int)writeStream.Length - i <= MaxPacketWriteSize)
+ {
+ // We are done, write the last bit asynchronously. This is actually the general case for
+ // most packets in the build, and the asynchronous behavior here is desirable.
+#if FEATURE_APM
+ _serverToClientStream.BeginWrite(writeStreamBuffer, i, lengthToWrite, PacketWriteComplete, null);
+#else
+ _serverToClientStream.WriteAsync(writeStreamBuffer, i, lengthToWrite);
+#endif
+ return;
+ }
+ else
+ {
+ // If this packet is longer that we can write in one go, then we need to break it up. We can't
+ // return out of this function and let the rest of the system continue because another operation
+ // might want to send data immediately afterward, and that could result in overlapping writes
+ // to the pipe on different threads.
+#if FEATURE_APM
+ IAsyncResult result = _serverToClientStream.BeginWrite(writeStream.GetBuffer(), i, lengthToWrite, null, null);
+ _serverToClientStream.EndWrite(result);
+#else
+ _serverToClientStream.Write(writeStreamBuffer, i, lengthToWrite);
+#endif
+ }
}
}
catch (IOException e)
@@ -828,17 +789,6 @@ private void SendDataCore(INodePacket packet)
}
}
- ///
- /// Avoid having a BinaryWriter just to write a 4-byte int
- ///
- private void WriteInt32(MemoryStream stream, int value)
- {
- stream.WriteByte((byte)value);
- stream.WriteByte((byte)(value >> 8));
- stream.WriteByte((byte)(value >> 16));
- stream.WriteByte((byte)(value >> 24));
- }
-
///
/// Closes the node's context, disconnecting it from the node.
///
@@ -937,10 +887,16 @@ private void HeaderReadComplete(IAsyncResult result)
int packetLength = BitConverter.ToInt32(_headerByte, 1);
MSBuildEventSource.Log.PacketReadSize(packetLength);
- // Ensures the buffer is at least this length.
- // It avoids reallocations if the buffer is already large enough.
- _readBufferMemoryStream.SetLength(packetLength);
- byte[] packetData = _readBufferMemoryStream.GetBuffer();
+ byte[] packetData;
+ if (packetLength < _smallReadBuffer.Length)
+ {
+ packetData = _smallReadBuffer;
+ }
+ else
+ {
+ // Preallocated buffer is not large enough to hold the body. Allocate now, but don't hold it forever.
+ packetData = new byte[packetLength];
+ }
_clientToServerStream.BeginRead(packetData, 0, packetLength, BodyReadComplete, new Tuple(packetData, packetLength));
}
diff --git a/src/Shared/NodeEndpointOutOfProcBase.cs b/src/Shared/NodeEndpointOutOfProcBase.cs
index c58bc449a1c..7bb77dda88f 100644
--- a/src/Shared/NodeEndpointOutOfProcBase.cs
+++ b/src/Shared/NodeEndpointOutOfProcBase.cs
@@ -99,16 +99,6 @@ internal abstract class NodeEndpointOutOfProcBase : INodeEndpoint
///
private SharedReadBuffer _sharedReadBuffer;
- ///
- /// A way to cache a byte array when writing out packets
- ///
- private MemoryStream _packetStream;
-
- ///
- /// A binary writer to help write into
- ///
- private BinaryWriter _binaryWriter;
-
#endregion
#region INodeEndpoint Events
@@ -199,9 +189,6 @@ internal void InternalConstruct(string pipeName)
_asyncDataMonitor = new object();
_sharedReadBuffer = InterningBinaryReader.CreateSharedBuffer();
- _packetStream = new MemoryStream();
- _binaryWriter = new BinaryWriter(_packetStream);
-
#if FEATURE_PIPE_SECURITY && FEATURE_NAMED_PIPE_SECURITY_CONSTRUCTOR
if (!NativeMethodsShared.IsMono)
{
@@ -603,26 +590,22 @@ private void PacketPumpProc()
INodePacket packet;
while (localPacketQueue.TryDequeue(out packet))
{
- var packetStream = _packetStream;
- packetStream.SetLength(0);
-
+ MemoryStream packetStream = new MemoryStream();
ITranslator writeTranslator = BinaryTranslator.GetWriteTranslator(packetStream);
packetStream.WriteByte((byte)packet.Type);
// Pad for packet length
- _binaryWriter.Write(0);
+ packetStream.Write(BitConverter.GetBytes((int)0), 0, 4);
// Reset the position in the write buffer.
packet.Translate(writeTranslator);
- int packetStreamLength = (int)packetStream.Position;
-
// Now write in the actual packet length
packetStream.Position = 1;
- _binaryWriter.Write(packetStreamLength - 5);
+ packetStream.Write(BitConverter.GetBytes((int)packetStream.Length - 5), 0, 4);
- localWritePipe.Write(packetStream.GetBuffer(), 0, packetStreamLength);
+ localWritePipe.Write(packetStream.GetBuffer(), 0, (int)packetStream.Length);
}
}
catch (Exception e)