From b7dde7de4269b70701b57e07e90ddbedddb1185e Mon Sep 17 00:00:00 2001 From: Kirill Osenkov Date: Mon, 11 Jan 2021 13:26:36 -0800 Subject: [PATCH] Reduce byte array allocations when reading/writing packets Byte arrays are a major source of LOH allocations when streaming logging events across nodes. Allocating a large MemoryStream once and then growing it as needed almost completely removes allocations for byte arrays. This should significantly improve memory traffic during large builds. --- .../NodeProviderOutOfProcBase.cs | 72 ++++++++++--------- src/Shared/NodeEndpointOutOfProcBase.cs | 26 +++++-- 2 files changed, 61 insertions(+), 37 deletions(-) diff --git a/src/Build/BackEnd/Components/Communications/NodeProviderOutOfProcBase.cs b/src/Build/BackEnd/Components/Communications/NodeProviderOutOfProcBase.cs index e65f614e08e..46c2ae5ce6d 100644 --- a/src/Build/BackEnd/Components/Communications/NodeProviderOutOfProcBase.cs +++ b/src/Build/BackEnd/Components/Communications/NodeProviderOutOfProcBase.cs @@ -609,8 +609,20 @@ internal class NodeContext /// /// A buffer typically big enough to handle a packet body. + /// We use this as a convenient way to manage and cache a byte[] that's resized + /// automatically to fit our payload. /// - private byte[] _smallReadBuffer; + private MemoryStream _readBufferMemoryStream; + + /// + /// A buffer for writing packets. + /// + private MemoryStream _writeBufferMemoryStream; + + /// + /// A BinaryWriter to assist writing bytes to the buffer. + /// + private BinaryWriter _writeBufferStreamWriter; /// /// Event indicating the node has terminated. @@ -640,7 +652,11 @@ internal class NodeContext _serverToClientStream = nodePipe; _packetFactory = factory; _headerByte = new byte[5]; // 1 for the packet type, 4 for the body length - _smallReadBuffer = new byte[1000]; // 1000 was just an average seen on one profile run. + + // packets get this large so avoid reallocations + _readBufferMemoryStream = new MemoryStream(MaxPacketWriteSize); + _writeBufferMemoryStream = new MemoryStream(MaxPacketWriteSize); + _writeBufferStreamWriter = new BinaryWriter(_writeBufferMemoryStream); _nodeTerminated = new ManualResetEvent(false); _terminateDelegate = terminateDelegate; _sharedReadBuffer = InterningBinaryReader.CreateSharedBuffer(); @@ -685,16 +701,8 @@ public async Task RunPacketReadLoopAsync() NodePacketType packetType = (NodePacketType)_headerByte[0]; int packetLength = BitConverter.ToInt32(_headerByte, 1); - byte[] packetData; - if (packetLength < _smallReadBuffer.Length) - { - packetData = _smallReadBuffer; - } - else - { - // Preallocated buffer is not large enough to hold the body. Allocate now, but don't hold it forever. - packetData = new byte[packetLength]; - } + _readBufferMemoryStream.SetLength(packetLength); + byte[] packetData = _readBufferMemoryStream.GetBuffer(); try { @@ -733,26 +741,30 @@ public async Task RunPacketReadLoopAsync() /// The packet to send. public void SendData(INodePacket packet) { - MemoryStream writeStream = new MemoryStream(); - ITranslator writeTranslator = BinaryTranslator.GetWriteTranslator(writeStream); + // clear the buffer but keep the underlying capacity to avoid reallocations + _writeBufferMemoryStream.SetLength(0); + + ITranslator writeTranslator = BinaryTranslator.GetWriteTranslator(_writeBufferMemoryStream); try { - writeStream.WriteByte((byte)packet.Type); + _writeBufferMemoryStream.WriteByte((byte)packet.Type); // Pad for the packet length - writeStream.Write(BitConverter.GetBytes((int)0), 0, 4); + _writeBufferStreamWriter.Write(0); packet.Translate(writeTranslator); + int writeStreamLength = (int)_writeBufferMemoryStream.Position; + // Now plug in the real packet length - writeStream.Position = 1; - writeStream.Write(BitConverter.GetBytes((int)writeStream.Length - 5), 0, 4); + _writeBufferMemoryStream.Position = 1; + _writeBufferStreamWriter.Write(writeStreamLength - 5); - byte[] writeStreamBuffer = writeStream.GetBuffer(); + byte[] writeStreamBuffer = _writeBufferMemoryStream.GetBuffer(); - for (int i = 0; i < writeStream.Length; i += MaxPacketWriteSize) + for (int i = 0; i < writeStreamLength; i += MaxPacketWriteSize) { - int lengthToWrite = Math.Min((int)writeStream.Length - i, MaxPacketWriteSize); - if ((int)writeStream.Length - i <= MaxPacketWriteSize) + int lengthToWrite = Math.Min(writeStreamLength - i, MaxPacketWriteSize); + if (writeStreamLength - i <= MaxPacketWriteSize) { // We are done, write the last bit asynchronously. This is actually the general case for // most packets in the build, and the asynchronous behavior here is desirable. @@ -770,7 +782,7 @@ public void SendData(INodePacket packet) // might want to send data immediately afterward, and that could result in overlapping writes // to the pipe on different threads. #if FEATURE_APM - IAsyncResult result = _serverToClientStream.BeginWrite(writeStream.GetBuffer(), i, lengthToWrite, null, null); + IAsyncResult result = _serverToClientStream.BeginWrite(writeStreamBuffer, i, lengthToWrite, null, null); _serverToClientStream.EndWrite(result); #else _serverToClientStream.Write(writeStreamBuffer, i, lengthToWrite); @@ -887,16 +899,10 @@ private void HeaderReadComplete(IAsyncResult result) int packetLength = BitConverter.ToInt32(_headerByte, 1); MSBuildEventSource.Log.PacketReadSize(packetLength); - byte[] packetData; - if (packetLength < _smallReadBuffer.Length) - { - packetData = _smallReadBuffer; - } - else - { - // Preallocated buffer is not large enough to hold the body. Allocate now, but don't hold it forever. - packetData = new byte[packetLength]; - } + // Ensures the buffer is at least this length. + // It avoids reallocations if the buffer is already large enough. + _readBufferMemoryStream.SetLength(packetLength); + byte[] packetData = _readBufferMemoryStream.GetBuffer(); _clientToServerStream.BeginRead(packetData, 0, packetLength, BodyReadComplete, new Tuple(packetData, packetLength)); } diff --git a/src/Shared/NodeEndpointOutOfProcBase.cs b/src/Shared/NodeEndpointOutOfProcBase.cs index 462615f5505..a4e7a2dfc85 100644 --- a/src/Shared/NodeEndpointOutOfProcBase.cs +++ b/src/Shared/NodeEndpointOutOfProcBase.cs @@ -99,6 +99,16 @@ internal abstract class NodeEndpointOutOfProcBase : INodeEndpoint /// private SharedReadBuffer _sharedReadBuffer; + /// + /// A way to cache a byte array when writing out packets + /// + private MemoryStream _packetStream; + + /// + /// A binary writer to help write into + /// + private BinaryWriter _binaryWriter; + #endregion #region INodeEndpoint Events @@ -189,6 +199,10 @@ internal void InternalConstruct(string pipeName) _asyncDataMonitor = new object(); _sharedReadBuffer = InterningBinaryReader.CreateSharedBuffer(); + // packets get at least this large + _packetStream = new MemoryStream(1048576); + _binaryWriter = new BinaryWriter(_packetStream); + #if FEATURE_PIPE_SECURITY && FEATURE_NAMED_PIPE_SECURITY_CONSTRUCTOR if (!NativeMethodsShared.IsMono) { @@ -584,22 +598,26 @@ private void PacketPumpProc() INodePacket packet; while (localPacketQueue.TryDequeue(out packet)) { - MemoryStream packetStream = new MemoryStream(); + var packetStream = _packetStream; + packetStream.SetLength(0); + ITranslator writeTranslator = BinaryTranslator.GetWriteTranslator(packetStream); packetStream.WriteByte((byte)packet.Type); // Pad for packet length - packetStream.Write(BitConverter.GetBytes((int)0), 0, 4); + _binaryWriter.Write(0); // Reset the position in the write buffer. packet.Translate(writeTranslator); + int packetStreamLength = (int)packetStream.Position; + // Now write in the actual packet length packetStream.Position = 1; - packetStream.Write(BitConverter.GetBytes((int)packetStream.Length - 5), 0, 4); + _binaryWriter.Write(packetStreamLength - 5); - localWritePipe.Write(packetStream.GetBuffer(), 0, (int)packetStream.Length); + localWritePipe.Write(packetStream.GetBuffer(), 0, packetStreamLength); } } catch (Exception e)