Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixes "InvalidOperationException" errors by performing async operations in SemaphoreSlim #796

Merged
merged 20 commits into from Nov 19, 2020
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -442,6 +442,7 @@
<Compile Include="Microsoft\Data\SqlClient\SNI\SNIPhysicalHandle.cs" />
<Compile Include="Microsoft\Data\SqlClient\SNI\SNIProxy.cs" />
<Compile Include="Microsoft\Data\SqlClient\SNI\SNITcpHandle.cs" />
<Compile Include="Microsoft\Data\SqlClient\SNI\SNIStreams.cs" />
<Compile Include="Microsoft\Data\SqlClient\SNI\SslOverTdsStream.cs" />
<Compile Include="Microsoft\Data\SqlClient\SNI\SNICommon.cs" />
<Compile Include="Microsoft\Data\SqlClient\SNI\SspiClientContextStatus.cs" />
Expand Down
Expand Up @@ -93,7 +93,7 @@ public SNINpHandle(string serverName, string pipeName, long timerExpire, object
}

_sslOverTdsStream = new SslOverTdsStream(_pipeStream);
_sslStream = new SslStream(_sslOverTdsStream, true, new RemoteCertificateValidationCallback(ValidateServerCertificate), null);
_sslStream = new SNISslStream(_sslOverTdsStream, true, new RemoteCertificateValidationCallback(ValidateServerCertificate));

_stream = _pipeStream;
_status = TdsEnums.SNI_SUCCESS;
Expand Down Expand Up @@ -189,8 +189,6 @@ public override uint Receive(out SNIPacket packet, int timeout)
try
{
SNIPacket errorPacket;
lock (this)
{
packet = null;
try
{
Expand Down Expand Up @@ -222,7 +220,6 @@ public override uint Receive(out SNIPacket packet, int timeout)
}
return TdsEnums.SNI_SUCCESS;
}
}
finally
{
SqlClientEventSource.Log.TrySNIScopeLeaveEvent(scopeID);
Expand Down Expand Up @@ -271,40 +268,43 @@ public override uint Send(SNIPacket packet)
bool releaseLock = false;
try
{
// is the packet is marked out out-of-band (attention packets only) it must be
// sent immediately even if a send of recieve operation is already in progress
// because out of band packets are used to cancel ongoing operations
// so try to take the lock if possible but continue even if it can't be taken
if (packet.IsOutOfBand)
{
Monitor.TryEnter(this, ref releaseLock);
}
else
lock (this)
cheenamalhotra marked this conversation as resolved.
Show resolved Hide resolved
{
Monitor.Enter(this);
releaseLock = true;
}

// this lock ensures that two packets are not being written to the transport at the same time
// so that sending a standard and an out-of-band packet are both written atomically no data is
// interleaved
lock (_sendSync)
{
try
// is the packet is marked out out-of-band (attention packets only) it must be
// sent immediately even if a send of recieve operation is already in progress
// because out of band packets are used to cancel ongoing operations
// so try to take the lock if possible but continue even if it can't be taken
if (packet.IsOutOfBand)
{
packet.WriteToStream(_stream);
return TdsEnums.SNI_SUCCESS;
Monitor.TryEnter(this, ref releaseLock);
}
catch (ObjectDisposedException ode)
else
{
SqlClientEventSource.Log.TrySNITraceEvent("<sc.SNI.SNINpHandle.Send |SNI|ERR> ObjectDisposedException message = {0}.", ode.Message);
return ReportErrorAndReleasePacket(packet, ode);
Monitor.Enter(this);
releaseLock = true;
}
catch (IOException ioe)
{
SqlClientEventSource.Log.TrySNITraceEvent("<sc.SNI.SNINpHandle.Send |SNI|ERR> IOException message = {0}.", ioe.Message);

return ReportErrorAndReleasePacket(packet, ioe);
// this lock ensures that two packets are not being written to the transport at the same time
// so that sending a standard and an out-of-band packet are both written atomically no data is
// interleaved
lock (_sendSync)
{
try
{
packet.WriteToStream(_stream);
return TdsEnums.SNI_SUCCESS;
}
catch (ObjectDisposedException ode)
{
SqlClientEventSource.Log.TrySNITraceEvent("<sc.SNI.SNINpHandle.Send |SNI|ERR> ObjectDisposedException message = {0}.", ode.Message);
return ReportErrorAndReleasePacket(packet, ode);
}
catch (IOException ioe)
{
SqlClientEventSource.Log.TrySNITraceEvent("<sc.SNI.SNINpHandle.Send |SNI|ERR> IOException message = {0}.", ioe.Message);

return ReportErrorAndReleasePacket(packet, ioe);
}
}
}
}
Expand Down
@@ -0,0 +1,99 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.

using System.Net.Security;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
using System.Net.Sockets;

namespace Microsoft.Data.SqlClient.SNI
{
/// <summary>
/// This class extends SslStream to customize stream behavior for Managed SNI implementation.
/// </summary>
internal class SNISslStream : SslStream
{
private readonly ConcurrentQueueSemaphore _writeAsyncQueueSemaphore;
private readonly ConcurrentQueueSemaphore _readAsyncQueueSemaphore;

public SNISslStream(Stream innerStream, bool leaveInnerStreamOpen, RemoteCertificateValidationCallback userCertificateValidationCallback)
: base(innerStream, leaveInnerStreamOpen, userCertificateValidationCallback)
{
_writeAsyncQueueSemaphore = new ConcurrentQueueSemaphore(1);
_readAsyncQueueSemaphore = new ConcurrentQueueSemaphore(1);
}

// Prevent ReadAsync collisions by running the task in a Semaphore Slim
public override async Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
await _readAsyncQueueSemaphore.WaitAsync().ConfigureAwait(false);
try
{
return await base.ReadAsync(buffer, offset, count, cancellationToken).ConfigureAwait(false);
cheenamalhotra marked this conversation as resolved.
Show resolved Hide resolved
}
finally
{
_readAsyncQueueSemaphore.Release();
}
}

// Prevent the WriteAsync collisions by running the task in a Semaphore Slim
public override async Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
await _writeAsyncQueueSemaphore.WaitAsync().ConfigureAwait(false);
try
{
await base.WriteAsync(buffer, offset, count, cancellationToken).ConfigureAwait(false);
cheenamalhotra marked this conversation as resolved.
Show resolved Hide resolved
}
finally
{
_writeAsyncQueueSemaphore.Release();
}
}
}

/// <summary>
/// This class extends NetworkStream to customize stream behavior for Managed SNI implementation.
/// </summary>
internal class SNINetworkStream : NetworkStream
{
private readonly ConcurrentQueueSemaphore _writeAsyncQueueSemaphore;
private readonly ConcurrentQueueSemaphore _readAsyncQueueSemaphore;

public SNINetworkStream(Socket socket, bool ownsSocket) : base(socket, ownsSocket)
{
_writeAsyncQueueSemaphore = new ConcurrentQueueSemaphore(1);
_readAsyncQueueSemaphore = new ConcurrentQueueSemaphore(1);
}

// Prevent the ReadAsync collisions by running the task in a Semaphore Slim
public override async Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
await _readAsyncQueueSemaphore.WaitAsync().ConfigureAwait(false);
try
{
return await base.ReadAsync(buffer, offset, count, cancellationToken).ConfigureAwait(false);
}
finally
{
_readAsyncQueueSemaphore.Release();
}
}

// Prevent the WriteAsync collisions by running the task in a Semaphore Slim
public override async Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
await _writeAsyncQueueSemaphore.WaitAsync().ConfigureAwait(false);
try
{
await base.WriteAsync(buffer, offset, count, cancellationToken).ConfigureAwait(false);
}
finally
{
_writeAsyncQueueSemaphore.Release();
}
}
}
}