Skip to content

Commit

Permalink
Write the last packet synchronously.
Browse files Browse the repository at this point in the history
  • Loading branch information
KirillOsenkov committed Jan 17, 2021
1 parent 52fd474 commit 0760a0d
Showing 1 changed file with 42 additions and 45 deletions.
Expand Up @@ -741,69 +741,66 @@ public async Task RunPacketReadLoopAsync()
/// <param name="packet">The packet to send.</param>
public void SendData(INodePacket packet)
{
lock (_writeBufferMemoryStream)
{
var stream = this._writeBufferMemoryStream;
var writer = this._writeBufferStreamWriter;
var stream = this._writeBufferMemoryStream;
var writer = this._writeBufferStreamWriter;

// clear the buffer but keep the underlying capacity to avoid reallocations
stream.SetLength(0);
// clear the buffer but keep the underlying capacity to avoid reallocations
stream.SetLength(0);

ITranslator writeTranslator = BinaryTranslator.GetWriteTranslator(stream);
try
{
stream.WriteByte((byte)packet.Type);
ITranslator writeTranslator = BinaryTranslator.GetWriteTranslator(stream);
try
{
stream.WriteByte((byte)packet.Type);

// Pad for the packet length
writer.Write(0);
packet.Translate(writeTranslator);
// Pad for the packet length
writer.Write(0);
packet.Translate(writeTranslator);

int writeStreamLength = (int)stream.Position;
int writeStreamLength = (int)stream.Position;

// Now plug in the real packet length
stream.Position = 1;
writer.Write(writeStreamLength - 5);
// Now plug in the real packet length
stream.Position = 1;
writer.Write(writeStreamLength - 5);

byte[] writeStreamBuffer = stream.GetBuffer();
byte[] writeStreamBuffer = stream.GetBuffer();

for (int i = 0; i < writeStreamLength; i += MaxPacketWriteSize)
for (int i = 0; i < writeStreamLength; i += MaxPacketWriteSize)
{
int lengthToWrite = Math.Min(writeStreamLength - i, MaxPacketWriteSize);
if (writeStreamLength - i <= MaxPacketWriteSize)
{
int lengthToWrite = Math.Min(writeStreamLength - i, MaxPacketWriteSize);
if (writeStreamLength - i <= MaxPacketWriteSize)
{
// We are done, write the last bit asynchronously. This is actually the general case for
// most packets in the build, and the asynchronous behavior here is desirable.
// We are done, write the last bit asynchronously. This is actually the general case for
// most packets in the build, and the asynchronous behavior here is desirable.
#if FEATURE_APM
_serverToClientStream.BeginWrite(writeStreamBuffer, i, lengthToWrite, PacketWriteComplete, null);
#else
_serverToClientStream.WriteAsync(writeStreamBuffer, i, lengthToWrite);
_serverToClientStream.Write(writeStreamBuffer, i, lengthToWrite);
#endif
return;
}
else
{
// If this packet is longer that we can write in one go, then we need to break it up. We can't
// return out of this function and let the rest of the system continue because another operation
// might want to send data immediately afterward, and that could result in overlapping writes
// to the pipe on different threads.
return;
}
else
{
// If this packet is longer that we can write in one go, then we need to break it up. We can't
// return out of this function and let the rest of the system continue because another operation
// might want to send data immediately afterward, and that could result in overlapping writes
// to the pipe on different threads.
#if FEATURE_APM
IAsyncResult result = _serverToClientStream.BeginWrite(writeStreamBuffer, i, lengthToWrite, null, null);
_serverToClientStream.EndWrite(result);
#else
_serverToClientStream.Write(writeStreamBuffer, i, lengthToWrite);
_serverToClientStream.Write(writeStreamBuffer, i, lengthToWrite);
#endif
}
}
}
catch (IOException e)
{
// Do nothing here because any exception will be caught by the async read handler
CommunicationsUtilities.Trace(_nodeId, "EXCEPTION in SendData: {0}", e);
}
catch (ObjectDisposedException) // This happens if a child dies unexpectedly
{
// Do nothing here because any exception will be caught by the async read handler
}
}
catch (IOException e)
{
// Do nothing here because any exception will be caught by the async read handler
CommunicationsUtilities.Trace(_nodeId, "EXCEPTION in SendData: {0}", e);
}
catch (ObjectDisposedException) // This happens if a child dies unexpectedly
{
// Do nothing here because any exception will be caught by the async read handler
}
}

Expand Down Expand Up @@ -926,7 +923,7 @@ private bool ProcessBodyBytesRead(int bytesRead, int packetLength, NodePacketTyp
return true;
}

private bool ReadAndRoutePacket(NodePacketType packetType, byte [] packetData, int packetLength)
private bool ReadAndRoutePacket(NodePacketType packetType, byte[] packetData, int packetLength)
{
try
{
Expand Down

0 comments on commit 0760a0d

Please sign in to comment.