Skip to content

Commit

Permalink
Make SendData write packets asynchronously.
Browse files Browse the repository at this point in the history
This avoids blocking the main loop. Roughly equivalent to writing the packet asynchronously using fire-and-forget (BeginWrite).
  • Loading branch information
KirillOsenkov committed Jan 25, 2021
1 parent 7f6b358 commit c89545c
Showing 1 changed file with 25 additions and 26 deletions.
Expand Up @@ -3,14 +3,13 @@

using System;
using System.Collections.Generic;
using System.Collections.Concurrent;
using System.Globalization;
using System.IO;
using System.IO.Pipes;
using System.Diagnostics;
using System.Threading;
#if !FEATURE_APM
using System.Threading.Tasks;
#endif
using System.Runtime.InteropServices;
#if FEATURE_PIPE_SECURITY
using System.Security.Principal;
Expand Down Expand Up @@ -741,6 +740,29 @@ private void WriteInt32(MemoryStream stream, int value)
/// </summary>
/// <param name="packet">The packet to send.</param>
public void SendData(INodePacket packet)
{
_packetQueue.Add(packet);
DrainPacketQueue();
}

private BlockingCollection<INodePacket> _packetQueue = new BlockingCollection<INodePacket>();
private Task _packetDrainTask = Task.CompletedTask;

private void DrainPacketQueue()
{
lock (_packetQueue)
{
_packetDrainTask = _packetDrainTask.ContinueWith(_ =>
{
while (_packetQueue.TryTake(out var packet))
{
SendDataCore(packet);
}
}, TaskScheduler.Default);
}
}

private void SendDataCore(INodePacket packet)
{
// clear the buffer but keep the underlying capacity to avoid reallocations
_writeBufferMemoryStream.SetLength(0);
Expand All @@ -765,30 +787,7 @@ public void SendData(INodePacket packet)
for (int i = 0; i < writeStreamLength; i += MaxPacketWriteSize)
{
int lengthToWrite = Math.Min(writeStreamLength - i, MaxPacketWriteSize);
if (writeStreamLength - i <= MaxPacketWriteSize)
{
// We are done, write the last bit asynchronously. This is actually the general case for
// most packets in the build, and the asynchronous behavior here is desirable.
#if FEATURE_APM
_serverToClientStream.BeginWrite(writeStreamBuffer, i, lengthToWrite, PacketWriteComplete, null);
#else
_serverToClientStream.Write(writeStreamBuffer, i, lengthToWrite);
#endif
return;
}
else
{
// If this packet is longer that we can write in one go, then we need to break it up. We can't
// return out of this function and let the rest of the system continue because another operation
// might want to send data immediately afterward, and that could result in overlapping writes
// to the pipe on different threads.
#if FEATURE_APM
IAsyncResult result = _serverToClientStream.BeginWrite(writeStreamBuffer, i, lengthToWrite, null, null);
_serverToClientStream.EndWrite(result);
#else
_serverToClientStream.Write(writeStreamBuffer, i, lengthToWrite);
#endif
}
_serverToClientStream.Write(writeStreamBuffer, i, lengthToWrite);
}
}
catch (IOException e)
Expand Down

0 comments on commit c89545c

Please sign in to comment.