Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixes "InvalidOperationException" errors by performing async operations in SemaphoreSlim #796

Merged
merged 20 commits into from Nov 19, 2020
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -442,6 +442,7 @@
<Compile Include="Microsoft\Data\SqlClient\SNI\SNIPhysicalHandle.cs" />
<Compile Include="Microsoft\Data\SqlClient\SNI\SNIProxy.cs" />
<Compile Include="Microsoft\Data\SqlClient\SNI\SNITcpHandle.cs" />
<Compile Include="Microsoft\Data\SqlClient\SNI\SNIStreams.cs" />
<Compile Include="Microsoft\Data\SqlClient\SNI\SslOverTdsStream.cs" />
<Compile Include="Microsoft\Data\SqlClient\SNI\SNICommon.cs" />
<Compile Include="Microsoft\Data\SqlClient\SNI\SspiClientContextStatus.cs" />
Expand Down
Expand Up @@ -93,7 +93,7 @@ public SNINpHandle(string serverName, string pipeName, long timerExpire, object
}

_sslOverTdsStream = new SslOverTdsStream(_pipeStream);
_sslStream = new SslStream(_sslOverTdsStream, true, new RemoteCertificateValidationCallback(ValidateServerCertificate), null);
_sslStream = new SNISslStream(_sslOverTdsStream, true, new RemoteCertificateValidationCallback(ValidateServerCertificate));

_stream = _pipeStream;
_status = TdsEnums.SNI_SUCCESS;
Expand Down Expand Up @@ -189,39 +189,36 @@ public override uint Receive(out SNIPacket packet, int timeout)
try
{
SNIPacket errorPacket;
lock (this)
packet = null;
try
{
packet = null;
try
{
packet = RentPacket(headerSize: 0, dataSize: _bufferSize);
packet.ReadFromStream(_stream);
packet = RentPacket(headerSize: 0, dataSize: _bufferSize);
packet.ReadFromStream(_stream);

if (packet.Length == 0)
{
errorPacket = packet;
packet = null;
var e = new Win32Exception();
SqlClientEventSource.Log.TrySNITraceEvent("<sc.SNI.SNINpHandle.Receive |SNI|ERR> packet length is 0.");
return ReportErrorAndReleasePacket(errorPacket, (uint)e.NativeErrorCode, 0, e.Message);
}
}
catch (ObjectDisposedException ode)
if (packet.Length == 0)
{
errorPacket = packet;
packet = null;
SqlClientEventSource.Log.TrySNITraceEvent("<sc.SNI.SNINpHandle.Receive |SNI|ERR> ObjectDisposedException message = {0}.", ode.Message);
return ReportErrorAndReleasePacket(errorPacket, ode);
var e = new Win32Exception();
SqlClientEventSource.Log.TrySNITraceEvent("<sc.SNI.SNINpHandle.Receive |SNI|ERR> packet length is 0.");
return ReportErrorAndReleasePacket(errorPacket, (uint)e.NativeErrorCode, 0, e.Message);
}
catch (IOException ioe)
{
errorPacket = packet;
packet = null;
SqlClientEventSource.Log.TrySNITraceEvent("<sc.SNI.SNINpHandle.Receive |SNI|ERR> IOException message = {0}.", ioe.Message);
return ReportErrorAndReleasePacket(errorPacket, ioe);
}
return TdsEnums.SNI_SUCCESS;
}
catch (ObjectDisposedException ode)
{
errorPacket = packet;
packet = null;
SqlClientEventSource.Log.TrySNITraceEvent("<sc.SNI.SNINpHandle.Receive |SNI|ERR> ObjectDisposedException message = {0}.", ode.Message);
return ReportErrorAndReleasePacket(errorPacket, ode);
}
catch (IOException ioe)
{
errorPacket = packet;
packet = null;
SqlClientEventSource.Log.TrySNITraceEvent("<sc.SNI.SNINpHandle.Receive |SNI|ERR> IOException message = {0}.", ioe.Message);
return ReportErrorAndReleasePacket(errorPacket, ioe);
}
return TdsEnums.SNI_SUCCESS;
}
finally
{
Expand Down Expand Up @@ -286,7 +283,7 @@ public override uint Send(SNIPacket packet)
}

// this lock ensures that two packets are not being written to the transport at the same time
// so that sending a standard and an out-of-band packet are both written atomically no data is
// so that sending a standard and an out-of-band packet are both written atomically no data is
// interleaved
lock (_sendSync)
{
Expand Down
@@ -0,0 +1,99 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.

using System.Net.Security;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
using System.Net.Sockets;

namespace Microsoft.Data.SqlClient.SNI
{
/// <summary>
/// This class extends SslStream to customize stream behavior for Managed SNI implementation.
/// </summary>
internal class SNISslStream : SslStream
{
private readonly ConcurrentQueueSemaphore _writeAsyncQueueSemaphore;
private readonly ConcurrentQueueSemaphore _readAsyncQueueSemaphore;

public SNISslStream(Stream innerStream, bool leaveInnerStreamOpen, RemoteCertificateValidationCallback userCertificateValidationCallback)
: base(innerStream, leaveInnerStreamOpen, userCertificateValidationCallback)
{
_writeAsyncQueueSemaphore = new ConcurrentQueueSemaphore(1);
_readAsyncQueueSemaphore = new ConcurrentQueueSemaphore(1);
}

// Prevent the ReadAsync's collision by running task in Semaphore Slim
cheenamalhotra marked this conversation as resolved.
Show resolved Hide resolved
public override async Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
await _readAsyncQueueSemaphore.WaitAsync().ConfigureAwait(false);
try
{
return await base.ReadAsync(buffer, offset, count, cancellationToken).ConfigureAwait(false);
cheenamalhotra marked this conversation as resolved.
Show resolved Hide resolved
}
finally
{
_readAsyncQueueSemaphore.Release();
}
}

// Prevent the WriteAsync's collision by running task in Semaphore Slim
cheenamalhotra marked this conversation as resolved.
Show resolved Hide resolved
public override async Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
await _writeAsyncQueueSemaphore.WaitAsync().ConfigureAwait(false);
try
{
await base.WriteAsync(buffer, offset, count, cancellationToken).ConfigureAwait(false);
}
finally
{
_writeAsyncQueueSemaphore.Release();
}
}
}

/// <summary>
/// This class extends NetworkStream to customize stream behavior for Managed SNI implementation.
/// </summary>
internal class SNINetworkStream : NetworkStream
{
private readonly ConcurrentQueueSemaphore _writeAsyncQueueSemaphore;
private readonly ConcurrentQueueSemaphore _readAsyncQueueSemaphore;

public SNINetworkStream(Socket socket, bool ownsSocket) : base(socket, ownsSocket)
{
_writeAsyncQueueSemaphore = new ConcurrentQueueSemaphore(1);
_readAsyncQueueSemaphore = new ConcurrentQueueSemaphore(1);
}

// Prevent the ReadAsync's collision by running task in Semaphore Slim
cheenamalhotra marked this conversation as resolved.
Show resolved Hide resolved
public override async Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
await _readAsyncQueueSemaphore.WaitAsync().ConfigureAwait(false);
try
{
return await base.ReadAsync(buffer, offset, count, cancellationToken).ConfigureAwait(false);
}
finally
{
_readAsyncQueueSemaphore.Release();
}
}

// Prevent the WriteAsync's collision by running task in Semaphore Slim
cheenamalhotra marked this conversation as resolved.
Show resolved Hide resolved
public override async Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
await _writeAsyncQueueSemaphore.WaitAsync().ConfigureAwait(false);
try
{
await base.WriteAsync(buffer, offset, count, cancellationToken).ConfigureAwait(false);
}
finally
{
_writeAsyncQueueSemaphore.Release();
}
}
}
}