From 0f27c266a4c6e86194f84018e49a9efb5b1d3cbd Mon Sep 17 00:00:00 2001 From: Cheena Malhotra Date: Fri, 13 Nov 2020 03:02:45 -0800 Subject: [PATCH 01/18] Fixes "InvalidOperationException" errors by performing async write operations in semaphore queue. --- .../src/Microsoft.Data.SqlClient.csproj | 1 + .../Data/SqlClient/SNI/SNINpHandle.cs | 2 +- .../Data/SqlClient/SNI/SNISslStream.cs | 40 +++++++++++++++++ .../Data/SqlClient/SNI/SNITcpHandle.cs | 27 ++++++----- .../SNI/SslOverTdsStream.NetCoreApp.cs | 17 ++----- .../src/Microsoft/Data/SqlClient/SqlUtil.cs | 45 +++++++++++++++++++ .../Data/SqlClient/TdsParserStateObject.cs | 2 +- .../SqlClient/TdsParserStateObjectManaged.cs | 4 +- 8 files changed, 110 insertions(+), 28 deletions(-) create mode 100644 src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SNI/SNISslStream.cs diff --git a/src/Microsoft.Data.SqlClient/netcore/src/Microsoft.Data.SqlClient.csproj b/src/Microsoft.Data.SqlClient/netcore/src/Microsoft.Data.SqlClient.csproj index c8ccbad2b0..e6a5129f93 100644 --- a/src/Microsoft.Data.SqlClient/netcore/src/Microsoft.Data.SqlClient.csproj +++ b/src/Microsoft.Data.SqlClient/netcore/src/Microsoft.Data.SqlClient.csproj @@ -739,6 +739,7 @@ + True True diff --git a/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SNI/SNINpHandle.cs b/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SNI/SNINpHandle.cs index 692aa9b7fe..d91d1a08d3 100644 --- a/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SNI/SNINpHandle.cs +++ b/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SNI/SNINpHandle.cs @@ -93,7 +93,7 @@ public SNINpHandle(string serverName, string pipeName, long timerExpire, object } _sslOverTdsStream = new SslOverTdsStream(_pipeStream); - _sslStream = new SslStream(_sslOverTdsStream, true, new RemoteCertificateValidationCallback(ValidateServerCertificate), null); + _sslStream = new SNISslStream(_sslOverTdsStream, true, new RemoteCertificateValidationCallback(ValidateServerCertificate)); _stream = _pipeStream; _status = TdsEnums.SNI_SUCCESS; diff --git a/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SNI/SNISslStream.cs b/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SNI/SNISslStream.cs new file mode 100644 index 0000000000..8f408bc1c1 --- /dev/null +++ b/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SNI/SNISslStream.cs @@ -0,0 +1,40 @@ +using System.Net.Security; +using System.IO; +using System.Threading; +using System.Threading.Tasks; + +namespace Microsoft.Data.SqlClient.SNI +{ + /// + /// This class extends SslStream to customize stream behavior for Managed SNI implementation. + /// + internal class SNISslStream : SslStream + { + private readonly ConcurrentQueueSemaphore _writeQueueSemaphore; + private readonly ConcurrentQueueSemaphore _readQueueSemaphore; + + public SNISslStream(Stream innerStream, bool leaveInnerStreamOpen, RemoteCertificateValidationCallback userCertificateValidationCallback) + : base(innerStream, leaveInnerStreamOpen, userCertificateValidationCallback) + { + _writeQueueSemaphore = new ConcurrentQueueSemaphore(1); + _readQueueSemaphore = new ConcurrentQueueSemaphore(1); + } + + // Prevent the ReadAsync's collision by running task in Semaphore Slim + public override Task ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) + { + _readQueueSemaphore.Wait(); + Task t = base.ReadAsync(buffer, offset, count, cancellationToken); + _readQueueSemaphore.Release(); + return t; + } + + // Prevent the WriteAsync's collision by running task in Semaphore Slim + public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) + { + _writeQueueSemaphore.Wait(); + return base.WriteAsync(buffer, offset, count, cancellationToken) + .ContinueWith(t => _writeQueueSemaphore.Release()); + } + } +} diff --git a/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SNI/SNITcpHandle.cs b/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SNI/SNITcpHandle.cs index b072a4fa01..b3393e3752 100644 --- a/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SNI/SNITcpHandle.cs +++ b/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SNI/SNITcpHandle.cs @@ -160,14 +160,14 @@ public SNITCPHandle(string serverName, int port, long timerExpire, object callba { // Retry with cached IP address if (ex is SocketException || ex is ArgumentException || ex is AggregateException) - { + { if (hasCachedDNSInfo == false) { throw; } else { - int portRetry = String.IsNullOrEmpty(cachedDNSInfo.Port) ? port : Int32.Parse(cachedDNSInfo.Port); + int portRetry = String.IsNullOrEmpty(cachedDNSInfo.Port) ? port : Int32.Parse(cachedDNSInfo.Port); try { @@ -180,9 +180,9 @@ public SNITCPHandle(string serverName, int port, long timerExpire, object callba _socket = Connect(cachedDNSInfo.AddrIPv4, portRetry, ts, isInfiniteTimeOut, cachedFQDN, ref pendingDNSInfo); } } - catch(Exception exRetry) + catch (Exception exRetry) { - if (exRetry is SocketException || exRetry is ArgumentNullException + if (exRetry is SocketException || exRetry is ArgumentNullException || exRetry is ArgumentException || exRetry is ArgumentOutOfRangeException || exRetry is AggregateException) { if (parallel) @@ -199,7 +199,7 @@ public SNITCPHandle(string serverName, int port, long timerExpire, object callba throw; } } - } + } } else { @@ -226,7 +226,7 @@ public SNITCPHandle(string serverName, int port, long timerExpire, object callba _tcpStream = new NetworkStream(_socket, true); _sslOverTdsStream = new SslOverTdsStream(_tcpStream); - _sslStream = new SslStream(_sslOverTdsStream, true, new RemoteCertificateValidationCallback(ValidateServerCertificate), null); + _sslStream = new SNISslStream(_sslOverTdsStream, true, new RemoteCertificateValidationCallback(ValidateServerCertificate)); } catch (SocketException se) { @@ -331,7 +331,7 @@ private static Socket Connect(string serverName, int port, TimeSpan timeout, boo } CancellationTokenSource cts = null; - + void Cancel() { for (int i = 0; i < sockets.Length; ++i) @@ -355,7 +355,7 @@ void Cancel() } Socket availableSocket = null; - try + try { for (int i = 0; i < sockets.Length; ++i) { @@ -706,12 +706,17 @@ public override void SetAsyncCallbacks(SNIAsyncCallback receiveCallback, SNIAsyn /// SNI error code public override uint SendAsync(SNIPacket packet, SNIAsyncCallback callback = null) { - SNIAsyncCallback cb = callback ?? _sendCallback; - lock (this) + long scopeID = SqlClientEventSource.Log.TrySNIScopeEnterEvent(""); + try { + SNIAsyncCallback cb = callback ?? _sendCallback; packet.WriteToStreamAsync(_stream, cb, SNIProviders.TCP_PROV); + return TdsEnums.SNI_SUCCESS_IO_PENDING; + } + finally + { + SqlClientEventSource.Log.TrySNIScopeLeaveEvent(scopeID); } - return TdsEnums.SNI_SUCCESS_IO_PENDING; } /// diff --git a/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SNI/SslOverTdsStream.NetCoreApp.cs b/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SNI/SslOverTdsStream.NetCoreApp.cs index cb634cb6af..97a1181ae3 100644 --- a/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SNI/SslOverTdsStream.NetCoreApp.cs +++ b/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SNI/SslOverTdsStream.NetCoreApp.cs @@ -12,24 +12,16 @@ namespace Microsoft.Data.SqlClient.SNI internal sealed partial class SslOverTdsStream { public override int Read(byte[] buffer, int offset, int count) - { - return Read(buffer.AsSpan(offset, count)); - } + => Read(buffer.AsSpan(offset, count)); public override void Write(byte[] buffer, int offset, int count) - { - Write(buffer.AsSpan(offset, count)); - } + => Write(buffer.AsSpan(offset, count)); public override Task ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) - { - return ReadAsync(new Memory(buffer, offset, count), cancellationToken).AsTask(); - } + => ReadAsync(new Memory(buffer, offset, count), cancellationToken).AsTask(); public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) - { - return WriteAsync(new ReadOnlyMemory(buffer, offset, count), cancellationToken).AsTask(); - } + => WriteAsync(new ReadOnlyMemory(buffer, offset, count), cancellationToken).AsTask(); public override int Read(Span buffer) { @@ -288,7 +280,6 @@ public override async ValueTask WriteAsync(ReadOnlyMemory buffer, Cancella await _stream.FlushAsync().ConfigureAwait(false); - remaining = remaining.Slice(dataLength); } } diff --git a/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SqlUtil.cs b/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SqlUtil.cs index 5ddd978093..a6b9688afa 100644 --- a/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SqlUtil.cs +++ b/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SqlUtil.cs @@ -3,6 +3,7 @@ // See the LICENSE file in the project root for more information. using System; +using System.Collections.Concurrent; using System.Collections.Generic; using System.Data; using System.Diagnostics; @@ -2131,4 +2132,48 @@ public static MethodInfo GetPromotedToken } } } + + /// + /// This class implements a FIFO Queue with SemaphoreSlim for ordered execution of parallel tasks. + /// Currently used in Managed SNI (SNISslStream) to override SslStream's WriteAsync implementation. + /// + internal class ConcurrentQueueSemaphore + { + private readonly SemaphoreSlim _semaphore; + private readonly ConcurrentQueue> _queue = + new ConcurrentQueue>(); + + public ConcurrentQueueSemaphore(int initialCount) + { + _semaphore = new SemaphoreSlim(initialCount); + } + + public ConcurrentQueueSemaphore(int initialCount, int maxCount) + { + _semaphore = new SemaphoreSlim(initialCount, maxCount); + } + + public void Wait() + { + WaitAsync().Wait(); + } + + public Task WaitAsync() + { + var tcs = new TaskCompletionSource(); + _queue.Enqueue(tcs); + _semaphore.WaitAsync().ContinueWith(t => + { + if (_queue.TryDequeue(out TaskCompletionSource popped)) + popped.SetResult(true); + }); + return tcs.Task; + } + + public void Release() + { + _semaphore.Release(); + } + } + }//namespace diff --git a/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/TdsParserStateObject.cs b/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/TdsParserStateObject.cs index 7e9f5a6d67..5d36213755 100644 --- a/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/TdsParserStateObject.cs +++ b/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/TdsParserStateObject.cs @@ -3266,7 +3266,7 @@ internal Task WritePacket(byte flushMode, bool canAccumulate = false) if (willCancel) { - // If we have been cancelled, then ensure that we write the ATTN packet as well + // If we have been canceled, then ensure that we write the ATTN packet as well task = AsyncHelper.CreateContinuationTask(task, CancelWritePacket); } return task; diff --git a/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/TdsParserStateObjectManaged.cs b/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/TdsParserStateObjectManaged.cs index ea802107ff..9e9e281a32 100644 --- a/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/TdsParserStateObjectManaged.cs +++ b/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/TdsParserStateObjectManaged.cs @@ -72,13 +72,13 @@ internal override void AssignPendingDNSInfo(string userProtocol, string DNSCache internal void ReadAsyncCallback(SNIPacket packet, uint error) { ReadAsyncCallback(IntPtr.Zero, PacketHandle.FromManagedPacket(packet), error); - _sessionHandle.ReturnPacket(packet); + _sessionHandle?.ReturnPacket(packet); } internal void WriteAsyncCallback(SNIPacket packet, uint sniError) { WriteAsyncCallback(IntPtr.Zero, PacketHandle.FromManagedPacket(packet), sniError); - _sessionHandle.ReturnPacket(packet); + _sessionHandle?.ReturnPacket(packet); } protected override void RemovePacketFromPendingList(PacketHandle packet) From aca1cc6f5c61df9c680b262715ed3ce65406f3c2 Mon Sep 17 00:00:00 2001 From: Cheena Malhotra Date: Fri, 13 Nov 2020 11:31:01 -0800 Subject: [PATCH 02/18] Add license header --- .../src/Microsoft/Data/SqlClient/SNI/SNISslStream.cs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SNI/SNISslStream.cs b/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SNI/SNISslStream.cs index 8f408bc1c1..19d49fb28c 100644 --- a/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SNI/SNISslStream.cs +++ b/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SNI/SNISslStream.cs @@ -1,4 +1,8 @@ -using System.Net.Security; +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. +// See the LICENSE file in the project root for more information. + +using System.Net.Security; using System.IO; using System.Threading; using System.Threading.Tasks; From 65ec66907d0267a7efec25fca8b928dc6682bbbe Mon Sep 17 00:00:00 2001 From: Cheena Malhotra Date: Fri, 13 Nov 2020 11:37:26 -0800 Subject: [PATCH 03/18] Fix default include --- .../netcore/src/Microsoft.Data.SqlClient.csproj | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Microsoft.Data.SqlClient/netcore/src/Microsoft.Data.SqlClient.csproj b/src/Microsoft.Data.SqlClient/netcore/src/Microsoft.Data.SqlClient.csproj index e6a5129f93..70d506bbf3 100644 --- a/src/Microsoft.Data.SqlClient/netcore/src/Microsoft.Data.SqlClient.csproj +++ b/src/Microsoft.Data.SqlClient/netcore/src/Microsoft.Data.SqlClient.csproj @@ -442,6 +442,7 @@ + @@ -739,7 +740,6 @@ - True True From 9ea12071bde0845ae6e3dcc0288da37da8529a99 Mon Sep 17 00:00:00 2001 From: Cheena Malhotra Date: Fri, 13 Nov 2020 14:14:46 -0800 Subject: [PATCH 04/18] Suppress issue 422 from pipelines, till it gets fixed. --- .../SQL/AsyncTest/AsyncCancelledConnectionsTest.cs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/Microsoft.Data.SqlClient/tests/ManualTests/SQL/AsyncTest/AsyncCancelledConnectionsTest.cs b/src/Microsoft.Data.SqlClient/tests/ManualTests/SQL/AsyncTest/AsyncCancelledConnectionsTest.cs index dfcd78bc15..f6c2c02896 100644 --- a/src/Microsoft.Data.SqlClient/tests/ManualTests/SQL/AsyncTest/AsyncCancelledConnectionsTest.cs +++ b/src/Microsoft.Data.SqlClient/tests/ManualTests/SQL/AsyncTest/AsyncCancelledConnectionsTest.cs @@ -12,7 +12,11 @@ namespace Microsoft.Data.SqlClient.ManualTesting.Tests public class AsyncCancelledConnectionsTest { private readonly ITestOutputHelper _output; - private const int NumberOfTasks = 100; // How many attempts to poison the connection pool we will try + + // TODO: Set back the count to 100 tasks after fixing driver issue #422. + // Currently the count is reduced to 25 as we encounter error 258 for "Encrypted" connections. + private const int NumberOfTasks = 25; // How many attempts to poison the connection pool we will try + private const int NumberOfNonPoisoned = 10; // Number of normal requests for each attempt public AsyncCancelledConnectionsTest(ITestOutputHelper output) From 7cc72edb35fd7b99edceadeca5e60deac632a4ae Mon Sep 17 00:00:00 2001 From: Cheena Malhotra Date: Sat, 14 Nov 2020 17:15:42 -0800 Subject: [PATCH 05/18] Fix Async + Mars perf issue --- .../Data/SqlClient/SNI/SNINpHandle.cs | 51 ++++----- .../Data/SqlClient/SNI/SNITcpHandle.cs | 104 +++++++++--------- .../AsyncCancelledConnectionsTest.cs | 4 +- 3 files changed, 77 insertions(+), 82 deletions(-) diff --git a/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SNI/SNINpHandle.cs b/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SNI/SNINpHandle.cs index d91d1a08d3..9280bbfc54 100644 --- a/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SNI/SNINpHandle.cs +++ b/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SNI/SNINpHandle.cs @@ -189,39 +189,36 @@ public override uint Receive(out SNIPacket packet, int timeout) try { SNIPacket errorPacket; - lock (this) + packet = null; + try { - packet = null; - try - { - packet = RentPacket(headerSize: 0, dataSize: _bufferSize); - packet.ReadFromStream(_stream); + packet = RentPacket(headerSize: 0, dataSize: _bufferSize); + packet.ReadFromStream(_stream); - if (packet.Length == 0) - { - errorPacket = packet; - packet = null; - var e = new Win32Exception(); - SqlClientEventSource.Log.TrySNITraceEvent(" packet length is 0."); - return ReportErrorAndReleasePacket(errorPacket, (uint)e.NativeErrorCode, 0, e.Message); - } - } - catch (ObjectDisposedException ode) + if (packet.Length == 0) { errorPacket = packet; packet = null; - SqlClientEventSource.Log.TrySNITraceEvent(" ObjectDisposedException message = {0}.", ode.Message); - return ReportErrorAndReleasePacket(errorPacket, ode); + var e = new Win32Exception(); + SqlClientEventSource.Log.TrySNITraceEvent(" packet length is 0."); + return ReportErrorAndReleasePacket(errorPacket, (uint)e.NativeErrorCode, 0, e.Message); } - catch (IOException ioe) - { - errorPacket = packet; - packet = null; - SqlClientEventSource.Log.TrySNITraceEvent(" IOException message = {0}.", ioe.Message); - return ReportErrorAndReleasePacket(errorPacket, ioe); - } - return TdsEnums.SNI_SUCCESS; } + catch (ObjectDisposedException ode) + { + errorPacket = packet; + packet = null; + SqlClientEventSource.Log.TrySNITraceEvent(" ObjectDisposedException message = {0}.", ode.Message); + return ReportErrorAndReleasePacket(errorPacket, ode); + } + catch (IOException ioe) + { + errorPacket = packet; + packet = null; + SqlClientEventSource.Log.TrySNITraceEvent(" IOException message = {0}.", ioe.Message); + return ReportErrorAndReleasePacket(errorPacket, ioe); + } + return TdsEnums.SNI_SUCCESS; } finally { @@ -286,7 +283,7 @@ public override uint Send(SNIPacket packet) } // this lock ensures that two packets are not being written to the transport at the same time - // so that sending a standard and an out-of-band packet are both written atomically no data is + // so that sending a standard and an out-of-band packet are both written atomically no data is // interleaved lock (_sendSync) { diff --git a/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SNI/SNITcpHandle.cs b/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SNI/SNITcpHandle.cs index b3393e3752..f919d3cf52 100644 --- a/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SNI/SNITcpHandle.cs +++ b/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SNI/SNITcpHandle.cs @@ -143,7 +143,7 @@ public SNITCPHandle(string serverName, int port, long timerExpire, object callba bool reportError = true; // We will always first try to connect with serverName as before and let the DNS server to resolve the serverName. - // If the DSN resolution fails, we will try with IPs in the DNS cache if existed. We try with IPv4 first and followed by IPv6 if + // If the DSN resolution fails, we will try with IPs in the DNS cache if existed. We try with IPv4 first and followed by IPv6 if // IPv4 fails. The exceptions will be throw to upper level and be handled as before. try { @@ -582,7 +582,7 @@ public override uint Send(SNIPacket packet) } // this lock ensures that two packets are not being written to the transport at the same time - // so that sending a standard and an out-of-band packet are both written atomically no data is + // so that sending a standard and an out-of-band packet are both written atomically no data is // interleaved lock (_sendSync) { @@ -623,67 +623,65 @@ public override uint Send(SNIPacket packet) public override uint Receive(out SNIPacket packet, int timeoutInMilliseconds) { SNIPacket errorPacket; - lock (this) + packet = null; + try { - packet = null; - try + if (timeoutInMilliseconds > 0) { - if (timeoutInMilliseconds > 0) - { - _socket.ReceiveTimeout = timeoutInMilliseconds; - } - else if (timeoutInMilliseconds == -1) - { // SqlCient internally represents infinite timeout by -1, and for TcpClient this is translated to a timeout of 0 - _socket.ReceiveTimeout = 0; - } - else - { - // otherwise it is timeout for 0 or less than -1 - ReportTcpSNIError(0, SNICommon.ConnTimeoutError, string.Empty); - return TdsEnums.SNI_WAIT_TIMEOUT; - } - - packet = RentPacket(headerSize: 0, dataSize: _bufferSize); - packet.ReadFromStream(_stream); - - if (packet.Length == 0) - { - errorPacket = packet; - packet = null; - var e = new Win32Exception(); - return ReportErrorAndReleasePacket(errorPacket, (uint)e.NativeErrorCode, 0, e.Message); - } - - return TdsEnums.SNI_SUCCESS; + _socket.ReceiveTimeout = timeoutInMilliseconds; } - catch (ObjectDisposedException ode) + else if (timeoutInMilliseconds == -1) { - errorPacket = packet; - packet = null; - return ReportErrorAndReleasePacket(errorPacket, ode); + // SqlClient internally represents infinite timeout by -1, and for TcpClient this is translated to a timeout of 0 + _socket.ReceiveTimeout = 0; } - catch (SocketException se) + else { - errorPacket = packet; - packet = null; - return ReportErrorAndReleasePacket(errorPacket, se); + // otherwise it is timeout for 0 or less than -1 + ReportTcpSNIError(0, SNICommon.ConnTimeoutError, string.Empty); + return TdsEnums.SNI_WAIT_TIMEOUT; } - catch (IOException ioe) + + packet = RentPacket(headerSize: 0, dataSize: _bufferSize); + packet.ReadFromStream(_stream); + + if (packet.Length == 0) { errorPacket = packet; packet = null; - uint errorCode = ReportErrorAndReleasePacket(errorPacket, ioe); - if (ioe.InnerException is SocketException socketException && socketException.SocketErrorCode == SocketError.TimedOut) - { - errorCode = TdsEnums.SNI_WAIT_TIMEOUT; - } - - return errorCode; + var e = new Win32Exception(); + return ReportErrorAndReleasePacket(errorPacket, (uint)e.NativeErrorCode, 0, e.Message); } - finally + + return TdsEnums.SNI_SUCCESS; + } + catch (ObjectDisposedException ode) + { + errorPacket = packet; + packet = null; + return ReportErrorAndReleasePacket(errorPacket, ode); + } + catch (SocketException se) + { + errorPacket = packet; + packet = null; + return ReportErrorAndReleasePacket(errorPacket, se); + } + catch (IOException ioe) + { + errorPacket = packet; + packet = null; + uint errorCode = ReportErrorAndReleasePacket(errorPacket, ioe); + if (ioe.InnerException is SocketException socketException && socketException.SocketErrorCode == SocketError.TimedOut) { - _socket.ReceiveTimeout = 0; + errorCode = TdsEnums.SNI_WAIT_TIMEOUT; } + + return errorCode; + } + finally + { + _socket.ReceiveTimeout = 0; } } @@ -750,15 +748,15 @@ public override uint CheckConnection() { try { - // _socket.Poll method with argument SelectMode.SelectRead returns + // _socket.Poll method with argument SelectMode.SelectRead returns // True : if Listen has been called and a connection is pending, or // True : if data is available for reading, or // True : if the connection has been closed, reset, or terminated, i.e no active connection. // False : otherwise. // _socket.Available property returns the number of bytes of data available to read. // - // Since _socket.Connected alone doesn't guarantee if the connection is still active, we use it in - // combination with _socket.Poll method and _socket.Available == 0 check. When both of them + // Since _socket.Connected alone doesn't guarantee if the connection is still active, we use it in + // combination with _socket.Poll method and _socket.Available == 0 check. When both of them // return true we can safely determine that the connection is no longer active. if (!_socket.Connected || (_socket.Poll(100, SelectMode.SelectRead) && _socket.Available == 0)) { diff --git a/src/Microsoft.Data.SqlClient/tests/ManualTests/SQL/AsyncTest/AsyncCancelledConnectionsTest.cs b/src/Microsoft.Data.SqlClient/tests/ManualTests/SQL/AsyncTest/AsyncCancelledConnectionsTest.cs index f6c2c02896..7d713c0e97 100644 --- a/src/Microsoft.Data.SqlClient/tests/ManualTests/SQL/AsyncTest/AsyncCancelledConnectionsTest.cs +++ b/src/Microsoft.Data.SqlClient/tests/ManualTests/SQL/AsyncTest/AsyncCancelledConnectionsTest.cs @@ -13,8 +13,8 @@ public class AsyncCancelledConnectionsTest { private readonly ITestOutputHelper _output; - // TODO: Set back the count to 100 tasks after fixing driver issue #422. - // Currently the count is reduced to 25 as we encounter error 258 for "Encrypted" connections. + // TODO: Set back the count to 100 tasks after fixing driver issue. + // Currently the count is reduced to 25 as we encounter error "Execution Timeout expired" for "Encrypted" connections. private const int NumberOfTasks = 25; // How many attempts to poison the connection pool we will try private const int NumberOfNonPoisoned = 10; // Number of normal requests for each attempt From ddcfd4122dee0bf42e076a5387207b62cd01898d Mon Sep 17 00:00:00 2001 From: Cheena Malhotra Date: Mon, 16 Nov 2020 12:14:09 -0800 Subject: [PATCH 06/18] Improvements --- .../netcore/src/Microsoft/Data/SqlClient/SNI/SNISslStream.cs | 5 ++--- .../netcore/src/Microsoft/Data/SqlClient/SqlCommand.cs | 2 +- .../netcore/src/Microsoft/Data/SqlClient/SqlUtil.cs | 3 ++- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SNI/SNISslStream.cs b/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SNI/SNISslStream.cs index 19d49fb28c..b8cf30d937 100644 --- a/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SNI/SNISslStream.cs +++ b/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SNI/SNISslStream.cs @@ -28,9 +28,8 @@ public SNISslStream(Stream innerStream, bool leaveInnerStreamOpen, RemoteCertifi public override Task ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) { _readQueueSemaphore.Wait(); - Task t = base.ReadAsync(buffer, offset, count, cancellationToken); - _readQueueSemaphore.Release(); - return t; + return base.ReadAsync(buffer, offset, count, cancellationToken) + .ContinueWith(t => _readQueueSemaphore.Release(t.Result)); } // Prevent the WriteAsync's collision by running task in Semaphore Slim diff --git a/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SqlCommand.cs b/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SqlCommand.cs index e19ee3eba0..10d5064a87 100644 --- a/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SqlCommand.cs +++ b/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SqlCommand.cs @@ -1319,7 +1319,7 @@ private void ThrowIfReconnectionHasBeenCanceled() if (_stateObj == null) { var reconnectionCompletionSource = _reconnectionCompletionSource; - if (reconnectionCompletionSource != null && reconnectionCompletionSource.Task.IsCanceled) + if (reconnectionCompletionSource != null && reconnectionCompletionSource.Task != null && reconnectionCompletionSource.Task.IsCanceled) { throw SQL.CR_ReconnectionCancelled(); } diff --git a/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SqlUtil.cs b/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SqlUtil.cs index a6b9688afa..82f029a2d3 100644 --- a/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SqlUtil.cs +++ b/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SqlUtil.cs @@ -2170,9 +2170,10 @@ public Task WaitAsync() return tcs.Task; } - public void Release() + public int Release(int i = default) { _semaphore.Release(); + return i; } } From f6b5da432839c6fc6df6c700430b652258511e03 Mon Sep 17 00:00:00 2001 From: Cheena Malhotra Date: Mon, 16 Nov 2020 23:40:07 -0800 Subject: [PATCH 07/18] Unblock Sync overrides --- .../Data/SqlClient/SNI/SNISslStream.cs | 18 ++++++++---------- 1 file changed, 8 insertions(+), 10 deletions(-) diff --git a/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SNI/SNISslStream.cs b/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SNI/SNISslStream.cs index b8cf30d937..ae4ed9b5a4 100644 --- a/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SNI/SNISslStream.cs +++ b/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SNI/SNISslStream.cs @@ -14,30 +14,28 @@ namespace Microsoft.Data.SqlClient.SNI /// internal class SNISslStream : SslStream { - private readonly ConcurrentQueueSemaphore _writeQueueSemaphore; - private readonly ConcurrentQueueSemaphore _readQueueSemaphore; + private readonly ConcurrentQueueSemaphore _writeAsyncQueueSemaphore; + private readonly ConcurrentQueueSemaphore _readAsyncQueueSemaphore; public SNISslStream(Stream innerStream, bool leaveInnerStreamOpen, RemoteCertificateValidationCallback userCertificateValidationCallback) : base(innerStream, leaveInnerStreamOpen, userCertificateValidationCallback) { - _writeQueueSemaphore = new ConcurrentQueueSemaphore(1); - _readQueueSemaphore = new ConcurrentQueueSemaphore(1); + _writeAsyncQueueSemaphore = new ConcurrentQueueSemaphore(1); + _readAsyncQueueSemaphore = new ConcurrentQueueSemaphore(1); } // Prevent the ReadAsync's collision by running task in Semaphore Slim public override Task ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) { - _readQueueSemaphore.Wait(); - return base.ReadAsync(buffer, offset, count, cancellationToken) - .ContinueWith(t => _readQueueSemaphore.Release(t.Result)); + return _readAsyncQueueSemaphore.WaitAsync().ContinueWith(t => base.ReadAsync(buffer, offset, count, cancellationToken).Result) + .ContinueWith(t => _readAsyncQueueSemaphore.Release(t.Result)); } // Prevent the WriteAsync's collision by running task in Semaphore Slim public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) { - _writeQueueSemaphore.Wait(); - return base.WriteAsync(buffer, offset, count, cancellationToken) - .ContinueWith(t => _writeQueueSemaphore.Release()); + return _writeAsyncQueueSemaphore.WaitAsync().ContinueWith(t => base.WriteAsync(buffer, offset, count, cancellationToken)) + .ContinueWith(t => _writeAsyncQueueSemaphore.Release()); } } } From 3eaa757050b93d237d702a2121a225a2d86859a7 Mon Sep 17 00:00:00 2001 From: Cheena Malhotra Date: Tue, 17 Nov 2020 01:13:49 -0800 Subject: [PATCH 08/18] Move release to finally --- .../Data/SqlClient/SNI/SNISslStream.cs | 21 +++++++++++++++---- .../src/Microsoft/Data/SqlClient/SqlUtil.cs | 3 +-- 2 files changed, 18 insertions(+), 6 deletions(-) diff --git a/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SNI/SNISslStream.cs b/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SNI/SNISslStream.cs index ae4ed9b5a4..a49cc1ca0d 100644 --- a/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SNI/SNISslStream.cs +++ b/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SNI/SNISslStream.cs @@ -27,15 +27,28 @@ public SNISslStream(Stream innerStream, bool leaveInnerStreamOpen, RemoteCertifi // Prevent the ReadAsync's collision by running task in Semaphore Slim public override Task ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) { - return _readAsyncQueueSemaphore.WaitAsync().ContinueWith(t => base.ReadAsync(buffer, offset, count, cancellationToken).Result) - .ContinueWith(t => _readAsyncQueueSemaphore.Release(t.Result)); + try + { + return _readAsyncQueueSemaphore.WaitAsync() + .ContinueWith(_ => base.ReadAsync(buffer, offset, count, cancellationToken).GetAwaiter().GetResult()); + } + finally + { + _readAsyncQueueSemaphore.Release(); + } } // Prevent the WriteAsync's collision by running task in Semaphore Slim public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) { - return _writeAsyncQueueSemaphore.WaitAsync().ContinueWith(t => base.WriteAsync(buffer, offset, count, cancellationToken)) - .ContinueWith(t => _writeAsyncQueueSemaphore.Release()); + try + { + return _writeAsyncQueueSemaphore.WaitAsync().ContinueWith(_ => base.WriteAsync(buffer, offset, count, cancellationToken)); + } + finally + { + _writeAsyncQueueSemaphore.Release(); + } } } } diff --git a/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SqlUtil.cs b/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SqlUtil.cs index 82f029a2d3..a6b9688afa 100644 --- a/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SqlUtil.cs +++ b/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SqlUtil.cs @@ -2170,10 +2170,9 @@ public Task WaitAsync() return tcs.Task; } - public int Release(int i = default) + public void Release() { _semaphore.Release(); - return i; } } From de07449086d02ae888164bd335db09326fa74099 Mon Sep 17 00:00:00 2001 From: Cheena Malhotra Date: Tue, 17 Nov 2020 01:40:54 -0800 Subject: [PATCH 09/18] Implementing SNINetworkStream fixes issue 422 for non-encrypted TCP connections. --- .../src/Microsoft.Data.SqlClient.csproj | 2 +- .../SNI/{SNISslStream.cs => SNIStreams.cs} | 45 ++++++++++++++++++- .../Data/SqlClient/SNI/SNITcpHandle.cs | 2 +- 3 files changed, 46 insertions(+), 3 deletions(-) rename src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SNI/{SNISslStream.cs => SNIStreams.cs} (52%) diff --git a/src/Microsoft.Data.SqlClient/netcore/src/Microsoft.Data.SqlClient.csproj b/src/Microsoft.Data.SqlClient/netcore/src/Microsoft.Data.SqlClient.csproj index 70d506bbf3..2bb10aeca7 100644 --- a/src/Microsoft.Data.SqlClient/netcore/src/Microsoft.Data.SqlClient.csproj +++ b/src/Microsoft.Data.SqlClient/netcore/src/Microsoft.Data.SqlClient.csproj @@ -442,7 +442,7 @@ - + diff --git a/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SNI/SNISslStream.cs b/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SNI/SNIStreams.cs similarity index 52% rename from src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SNI/SNISslStream.cs rename to src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SNI/SNIStreams.cs index a49cc1ca0d..c52e0028f5 100644 --- a/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SNI/SNISslStream.cs +++ b/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SNI/SNIStreams.cs @@ -6,6 +6,7 @@ using System.IO; using System.Threading; using System.Threading.Tasks; +using System.Net.Sockets; namespace Microsoft.Data.SqlClient.SNI { @@ -30,7 +31,49 @@ public override Task ReadAsync(byte[] buffer, int offset, int count, Cancel try { return _readAsyncQueueSemaphore.WaitAsync() - .ContinueWith(_ => base.ReadAsync(buffer, offset, count, cancellationToken).GetAwaiter().GetResult()); + .ContinueWith(_ => base.ReadAsync(buffer, offset, count, cancellationToken).GetAwaiter().GetResult()); + } + finally + { + _readAsyncQueueSemaphore.Release(); + } + } + + // Prevent the WriteAsync's collision by running task in Semaphore Slim + public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) + { + try + { + return _writeAsyncQueueSemaphore.WaitAsync().ContinueWith(_ => base.WriteAsync(buffer, offset, count, cancellationToken)); + } + finally + { + _writeAsyncQueueSemaphore.Release(); + } + } + } + + /// + /// This class extends NetworkStream to customize stream behavior for Managed SNI implementation. + /// + internal class SNINetworkStream : NetworkStream + { + private readonly ConcurrentQueueSemaphore _writeAsyncQueueSemaphore; + private readonly ConcurrentQueueSemaphore _readAsyncQueueSemaphore; + + public SNINetworkStream(Socket socket, bool ownsSocket) : base(socket, ownsSocket) + { + _writeAsyncQueueSemaphore = new ConcurrentQueueSemaphore(1); + _readAsyncQueueSemaphore = new ConcurrentQueueSemaphore(1); + } + + // Prevent the ReadAsync's collision by running task in Semaphore Slim + public override Task ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) + { + try + { + return _readAsyncQueueSemaphore.WaitAsync() + .ContinueWith(_ => base.ReadAsync(buffer, offset, count, cancellationToken).GetAwaiter().GetResult()); } finally { diff --git a/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SNI/SNITcpHandle.cs b/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SNI/SNITcpHandle.cs index f919d3cf52..3bf828d749 100644 --- a/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SNI/SNITcpHandle.cs +++ b/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SNI/SNITcpHandle.cs @@ -223,7 +223,7 @@ public SNITCPHandle(string serverName, int port, long timerExpire, object callba } _socket.NoDelay = true; - _tcpStream = new NetworkStream(_socket, true); + _tcpStream = new SNINetworkStream(_socket, true); _sslOverTdsStream = new SslOverTdsStream(_tcpStream); _sslStream = new SNISslStream(_sslOverTdsStream, true, new RemoteCertificateValidationCallback(ValidateServerCertificate)); From acfcb35785465cad202991086a3c403c8227c78c Mon Sep 17 00:00:00 2001 From: Cheena Malhotra Date: Tue, 17 Nov 2020 01:54:00 -0800 Subject: [PATCH 10/18] Revert test change --- .../SQL/AsyncTest/AsyncCancelledConnectionsTest.cs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/Microsoft.Data.SqlClient/tests/ManualTests/SQL/AsyncTest/AsyncCancelledConnectionsTest.cs b/src/Microsoft.Data.SqlClient/tests/ManualTests/SQL/AsyncTest/AsyncCancelledConnectionsTest.cs index 7d713c0e97..7ff00b8335 100644 --- a/src/Microsoft.Data.SqlClient/tests/ManualTests/SQL/AsyncTest/AsyncCancelledConnectionsTest.cs +++ b/src/Microsoft.Data.SqlClient/tests/ManualTests/SQL/AsyncTest/AsyncCancelledConnectionsTest.cs @@ -13,9 +13,7 @@ public class AsyncCancelledConnectionsTest { private readonly ITestOutputHelper _output; - // TODO: Set back the count to 100 tasks after fixing driver issue. - // Currently the count is reduced to 25 as we encounter error "Execution Timeout expired" for "Encrypted" connections. - private const int NumberOfTasks = 25; // How many attempts to poison the connection pool we will try + private const int NumberOfTasks = 100; // How many attempts to poison the connection pool we will try private const int NumberOfNonPoisoned = 10; // Number of normal requests for each attempt From 664529a3b408ec88cc5cc4038da5de903b79a86c Mon Sep 17 00:00:00 2001 From: Cheena Malhotra Date: Tue, 17 Nov 2020 12:59:55 -0800 Subject: [PATCH 11/18] Change to async/await --- .../Data/SqlClient/SNI/SNIStreams.cs | 22 ++++++++++--------- 1 file changed, 12 insertions(+), 10 deletions(-) diff --git a/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SNI/SNIStreams.cs b/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SNI/SNIStreams.cs index c52e0028f5..28097d85d3 100644 --- a/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SNI/SNIStreams.cs +++ b/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SNI/SNIStreams.cs @@ -26,12 +26,12 @@ public SNISslStream(Stream innerStream, bool leaveInnerStreamOpen, RemoteCertifi } // Prevent the ReadAsync's collision by running task in Semaphore Slim - public override Task ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) + public override async Task ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) { + await _readAsyncQueueSemaphore.WaitAsync().ConfigureAwait(false); try { - return _readAsyncQueueSemaphore.WaitAsync() - .ContinueWith(_ => base.ReadAsync(buffer, offset, count, cancellationToken).GetAwaiter().GetResult()); + return await base.ReadAsync(buffer, offset, count, cancellationToken).ConfigureAwait(false); } finally { @@ -40,11 +40,12 @@ public override Task ReadAsync(byte[] buffer, int offset, int count, Cancel } // Prevent the WriteAsync's collision by running task in Semaphore Slim - public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) + public override async Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) { + await _writeAsyncQueueSemaphore.WaitAsync().ConfigureAwait(false); try { - return _writeAsyncQueueSemaphore.WaitAsync().ContinueWith(_ => base.WriteAsync(buffer, offset, count, cancellationToken)); + await base.WriteAsync(buffer, offset, count, cancellationToken).ConfigureAwait(false); } finally { @@ -68,12 +69,12 @@ public SNINetworkStream(Socket socket, bool ownsSocket) : base(socket, ownsSocke } // Prevent the ReadAsync's collision by running task in Semaphore Slim - public override Task ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) + public override async Task ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) { + await _readAsyncQueueSemaphore.WaitAsync().ConfigureAwait(false); try { - return _readAsyncQueueSemaphore.WaitAsync() - .ContinueWith(_ => base.ReadAsync(buffer, offset, count, cancellationToken).GetAwaiter().GetResult()); + return await base.ReadAsync(buffer, offset, count, cancellationToken).ConfigureAwait(false); } finally { @@ -82,11 +83,12 @@ public override Task ReadAsync(byte[] buffer, int offset, int count, Cancel } // Prevent the WriteAsync's collision by running task in Semaphore Slim - public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) + public override async Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) { + await _writeAsyncQueueSemaphore.WaitAsync().ConfigureAwait(false); try { - return _writeAsyncQueueSemaphore.WaitAsync().ContinueWith(_ => base.WriteAsync(buffer, offset, count, cancellationToken)); + await base.WriteAsync(buffer, offset, count, cancellationToken).ConfigureAwait(false); } finally { From dcd2531cb5740cb6f8f01bb721319f57ff87fb1d Mon Sep 17 00:00:00 2001 From: Cheena Malhotra Date: Tue, 17 Nov 2020 13:10:28 -0800 Subject: [PATCH 12/18] Apply suggestions from code review Co-authored-by: David Engel --- .../src/Microsoft/Data/SqlClient/SNI/SNIStreams.cs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SNI/SNIStreams.cs b/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SNI/SNIStreams.cs index 28097d85d3..563ab44544 100644 --- a/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SNI/SNIStreams.cs +++ b/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SNI/SNIStreams.cs @@ -25,7 +25,7 @@ public SNISslStream(Stream innerStream, bool leaveInnerStreamOpen, RemoteCertifi _readAsyncQueueSemaphore = new ConcurrentQueueSemaphore(1); } - // Prevent the ReadAsync's collision by running task in Semaphore Slim + // Prevent ReadAsync collisions by running the task in a Semaphore Slim public override async Task ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) { await _readAsyncQueueSemaphore.WaitAsync().ConfigureAwait(false); @@ -39,7 +39,7 @@ public override async Task ReadAsync(byte[] buffer, int offset, int count, } } - // Prevent the WriteAsync's collision by running task in Semaphore Slim + // Prevent the WriteAsync collisions by running the task in a Semaphore Slim public override async Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) { await _writeAsyncQueueSemaphore.WaitAsync().ConfigureAwait(false); @@ -68,7 +68,7 @@ public SNINetworkStream(Socket socket, bool ownsSocket) : base(socket, ownsSocke _readAsyncQueueSemaphore = new ConcurrentQueueSemaphore(1); } - // Prevent the ReadAsync's collision by running task in Semaphore Slim + // Prevent the ReadAsync collisions by running the task in a Semaphore Slim public override async Task ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) { await _readAsyncQueueSemaphore.WaitAsync().ConfigureAwait(false); @@ -82,7 +82,7 @@ public override async Task ReadAsync(byte[] buffer, int offset, int count, } } - // Prevent the WriteAsync's collision by running task in Semaphore Slim + // Prevent the WriteAsync collisions by running the task in a Semaphore Slim public override async Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) { await _writeAsyncQueueSemaphore.WaitAsync().ConfigureAwait(false); From cd0153a9a63ca9b7cf379f83b0ddcc689ec9eb7d Mon Sep 17 00:00:00 2001 From: Cheena Malhotra Date: Tue, 17 Nov 2020 15:09:41 -0800 Subject: [PATCH 13/18] Reset locks --- .../Data/SqlClient/SNI/SNINpHandle.cs | 109 ++++++------ .../Data/SqlClient/SNI/SNITcpHandle.cs | 157 +++++++++--------- 2 files changed, 136 insertions(+), 130 deletions(-) diff --git a/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SNI/SNINpHandle.cs b/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SNI/SNINpHandle.cs index 9280bbfc54..c3fbe6b6c4 100644 --- a/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SNI/SNINpHandle.cs +++ b/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SNI/SNINpHandle.cs @@ -189,37 +189,37 @@ public override uint Receive(out SNIPacket packet, int timeout) try { SNIPacket errorPacket; - packet = null; - try - { - packet = RentPacket(headerSize: 0, dataSize: _bufferSize); - packet.ReadFromStream(_stream); + packet = null; + try + { + packet = RentPacket(headerSize: 0, dataSize: _bufferSize); + packet.ReadFromStream(_stream); - if (packet.Length == 0) + if (packet.Length == 0) + { + errorPacket = packet; + packet = null; + var e = new Win32Exception(); + SqlClientEventSource.Log.TrySNITraceEvent(" packet length is 0."); + return ReportErrorAndReleasePacket(errorPacket, (uint)e.NativeErrorCode, 0, e.Message); + } + } + catch (ObjectDisposedException ode) { errorPacket = packet; packet = null; - var e = new Win32Exception(); - SqlClientEventSource.Log.TrySNITraceEvent(" packet length is 0."); - return ReportErrorAndReleasePacket(errorPacket, (uint)e.NativeErrorCode, 0, e.Message); + SqlClientEventSource.Log.TrySNITraceEvent(" ObjectDisposedException message = {0}.", ode.Message); + return ReportErrorAndReleasePacket(errorPacket, ode); } + catch (IOException ioe) + { + errorPacket = packet; + packet = null; + SqlClientEventSource.Log.TrySNITraceEvent(" IOException message = {0}.", ioe.Message); + return ReportErrorAndReleasePacket(errorPacket, ioe); + } + return TdsEnums.SNI_SUCCESS; } - catch (ObjectDisposedException ode) - { - errorPacket = packet; - packet = null; - SqlClientEventSource.Log.TrySNITraceEvent(" ObjectDisposedException message = {0}.", ode.Message); - return ReportErrorAndReleasePacket(errorPacket, ode); - } - catch (IOException ioe) - { - errorPacket = packet; - packet = null; - SqlClientEventSource.Log.TrySNITraceEvent(" IOException message = {0}.", ioe.Message); - return ReportErrorAndReleasePacket(errorPacket, ioe); - } - return TdsEnums.SNI_SUCCESS; - } finally { SqlClientEventSource.Log.TrySNIScopeLeaveEvent(scopeID); @@ -268,40 +268,43 @@ public override uint Send(SNIPacket packet) bool releaseLock = false; try { - // is the packet is marked out out-of-band (attention packets only) it must be - // sent immediately even if a send of recieve operation is already in progress - // because out of band packets are used to cancel ongoing operations - // so try to take the lock if possible but continue even if it can't be taken - if (packet.IsOutOfBand) + lock (this) { - Monitor.TryEnter(this, ref releaseLock); - } - else - { - Monitor.Enter(this); - releaseLock = true; - } - - // this lock ensures that two packets are not being written to the transport at the same time - // so that sending a standard and an out-of-band packet are both written atomically no data is - // interleaved - lock (_sendSync) - { - try + // is the packet is marked out out-of-band (attention packets only) it must be + // sent immediately even if a send of recieve operation is already in progress + // because out of band packets are used to cancel ongoing operations + // so try to take the lock if possible but continue even if it can't be taken + if (packet.IsOutOfBand) { - packet.WriteToStream(_stream); - return TdsEnums.SNI_SUCCESS; + Monitor.TryEnter(this, ref releaseLock); } - catch (ObjectDisposedException ode) + else { - SqlClientEventSource.Log.TrySNITraceEvent(" ObjectDisposedException message = {0}.", ode.Message); - return ReportErrorAndReleasePacket(packet, ode); + Monitor.Enter(this); + releaseLock = true; } - catch (IOException ioe) - { - SqlClientEventSource.Log.TrySNITraceEvent(" IOException message = {0}.", ioe.Message); - return ReportErrorAndReleasePacket(packet, ioe); + // this lock ensures that two packets are not being written to the transport at the same time + // so that sending a standard and an out-of-band packet are both written atomically no data is + // interleaved + lock (_sendSync) + { + try + { + packet.WriteToStream(_stream); + return TdsEnums.SNI_SUCCESS; + } + catch (ObjectDisposedException ode) + { + SqlClientEventSource.Log.TrySNITraceEvent(" ObjectDisposedException message = {0}.", ode.Message); + return ReportErrorAndReleasePacket(packet, ode); + } + catch (IOException ioe) + { + SqlClientEventSource.Log.TrySNITraceEvent(" IOException message = {0}.", ioe.Message); + + return ReportErrorAndReleasePacket(packet, ioe); + } } } } diff --git a/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SNI/SNITcpHandle.cs b/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SNI/SNITcpHandle.cs index 3bf828d749..ef85841d24 100644 --- a/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SNI/SNITcpHandle.cs +++ b/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SNI/SNITcpHandle.cs @@ -566,45 +566,45 @@ public override uint Send(SNIPacket packet) { bool releaseLock = false; try - { - // is the packet is marked out out-of-band (attention packets only) it must be - // sent immediately even if a send of recieve operation is already in progress - // because out of band packets are used to cancel ongoing operations - // so try to take the lock if possible but continue even if it can't be taken - if (packet.IsOutOfBand) - { - Monitor.TryEnter(this, ref releaseLock); - } - else { - Monitor.Enter(this); - releaseLock = true; - } - - // this lock ensures that two packets are not being written to the transport at the same time - // so that sending a standard and an out-of-band packet are both written atomically no data is - // interleaved - lock (_sendSync) - { - try - { - packet.WriteToStream(_stream); - return TdsEnums.SNI_SUCCESS; - } - catch (ObjectDisposedException ode) + // is the packet is marked out out-of-band (attention packets only) it must be + // sent immediately even if a send of recieve operation is already in progress + // because out of band packets are used to cancel ongoing operations + // so try to take the lock if possible but continue even if it can't be taken + if (packet.IsOutOfBand) { - return ReportTcpSNIError(ode); + Monitor.TryEnter(this, ref releaseLock); } - catch (SocketException se) + else { - return ReportTcpSNIError(se); + Monitor.Enter(this); + releaseLock = true; } - catch (IOException ioe) + + // this lock ensures that two packets are not being written to the transport at the same time + // so that sending a standard and an out-of-band packet are both written atomically no data is + // interleaved + lock (_sendSync) { - return ReportTcpSNIError(ioe); + try + { + packet.WriteToStream(_stream); + return TdsEnums.SNI_SUCCESS; + } + catch (ObjectDisposedException ode) + { + return ReportTcpSNIError(ode); + } + catch (SocketException se) + { + return ReportTcpSNIError(se); + } + catch (IOException ioe) + { + return ReportTcpSNIError(ioe); + } } } - } finally { if (releaseLock) @@ -623,65 +623,68 @@ public override uint Send(SNIPacket packet) public override uint Receive(out SNIPacket packet, int timeoutInMilliseconds) { SNIPacket errorPacket; - packet = null; - try + lock (this) { - if (timeoutInMilliseconds > 0) + packet = null; + try { - _socket.ReceiveTimeout = timeoutInMilliseconds; + if (timeoutInMilliseconds > 0) + { + _socket.ReceiveTimeout = timeoutInMilliseconds; + } + else if (timeoutInMilliseconds == -1) + { + // SqlClient internally represents infinite timeout by -1, and for TcpClient this is translated to a timeout of 0 + _socket.ReceiveTimeout = 0; + } + else + { + // otherwise it is timeout for 0 or less than -1 + ReportTcpSNIError(0, SNICommon.ConnTimeoutError, string.Empty); + return TdsEnums.SNI_WAIT_TIMEOUT; + } + + packet = RentPacket(headerSize: 0, dataSize: _bufferSize); + packet.ReadFromStream(_stream); + + if (packet.Length == 0) + { + errorPacket = packet; + packet = null; + var e = new Win32Exception(); + return ReportErrorAndReleasePacket(errorPacket, (uint)e.NativeErrorCode, 0, e.Message); + } + + return TdsEnums.SNI_SUCCESS; } - else if (timeoutInMilliseconds == -1) + catch (ObjectDisposedException ode) { - // SqlClient internally represents infinite timeout by -1, and for TcpClient this is translated to a timeout of 0 - _socket.ReceiveTimeout = 0; + errorPacket = packet; + packet = null; + return ReportErrorAndReleasePacket(errorPacket, ode); } - else + catch (SocketException se) { - // otherwise it is timeout for 0 or less than -1 - ReportTcpSNIError(0, SNICommon.ConnTimeoutError, string.Empty); - return TdsEnums.SNI_WAIT_TIMEOUT; + errorPacket = packet; + packet = null; + return ReportErrorAndReleasePacket(errorPacket, se); } - - packet = RentPacket(headerSize: 0, dataSize: _bufferSize); - packet.ReadFromStream(_stream); - - if (packet.Length == 0) + catch (IOException ioe) { errorPacket = packet; packet = null; - var e = new Win32Exception(); - return ReportErrorAndReleasePacket(errorPacket, (uint)e.NativeErrorCode, 0, e.Message); - } + uint errorCode = ReportErrorAndReleasePacket(errorPacket, ioe); + if (ioe.InnerException is SocketException socketException && socketException.SocketErrorCode == SocketError.TimedOut) + { + errorCode = TdsEnums.SNI_WAIT_TIMEOUT; + } - return TdsEnums.SNI_SUCCESS; - } - catch (ObjectDisposedException ode) - { - errorPacket = packet; - packet = null; - return ReportErrorAndReleasePacket(errorPacket, ode); - } - catch (SocketException se) - { - errorPacket = packet; - packet = null; - return ReportErrorAndReleasePacket(errorPacket, se); - } - catch (IOException ioe) - { - errorPacket = packet; - packet = null; - uint errorCode = ReportErrorAndReleasePacket(errorPacket, ioe); - if (ioe.InnerException is SocketException socketException && socketException.SocketErrorCode == SocketError.TimedOut) + return errorCode; + } + finally { - errorCode = TdsEnums.SNI_WAIT_TIMEOUT; + _socket.ReceiveTimeout = 0; } - - return errorCode; - } - finally - { - _socket.ReceiveTimeout = 0; } } From b743e4555fad715bdcc22dc8b83124b784c82cda Mon Sep 17 00:00:00 2001 From: Cheena Malhotra Date: Wed, 18 Nov 2020 11:21:39 -0800 Subject: [PATCH 14/18] Simplified design as SemaphoreSlim is concurrent - confirmed. --- .../Data/SqlClient/SNI/SNIStreams.cs | 16 +++---- .../src/Microsoft/Data/SqlClient/SqlUtil.cs | 43 ------------------- 2 files changed, 8 insertions(+), 51 deletions(-) diff --git a/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SNI/SNIStreams.cs b/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SNI/SNIStreams.cs index 563ab44544..d29c7221ff 100644 --- a/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SNI/SNIStreams.cs +++ b/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SNI/SNIStreams.cs @@ -15,14 +15,14 @@ namespace Microsoft.Data.SqlClient.SNI /// internal class SNISslStream : SslStream { - private readonly ConcurrentQueueSemaphore _writeAsyncQueueSemaphore; - private readonly ConcurrentQueueSemaphore _readAsyncQueueSemaphore; + private readonly SemaphoreSlim _writeAsyncQueueSemaphore; + private readonly SemaphoreSlim _readAsyncQueueSemaphore; public SNISslStream(Stream innerStream, bool leaveInnerStreamOpen, RemoteCertificateValidationCallback userCertificateValidationCallback) : base(innerStream, leaveInnerStreamOpen, userCertificateValidationCallback) { - _writeAsyncQueueSemaphore = new ConcurrentQueueSemaphore(1); - _readAsyncQueueSemaphore = new ConcurrentQueueSemaphore(1); + _writeAsyncQueueSemaphore = new SemaphoreSlim(1); + _readAsyncQueueSemaphore = new SemaphoreSlim(1); } // Prevent ReadAsync collisions by running the task in a Semaphore Slim @@ -59,13 +59,13 @@ public override async Task WriteAsync(byte[] buffer, int offset, int count, Canc /// internal class SNINetworkStream : NetworkStream { - private readonly ConcurrentQueueSemaphore _writeAsyncQueueSemaphore; - private readonly ConcurrentQueueSemaphore _readAsyncQueueSemaphore; + private readonly SemaphoreSlim _writeAsyncQueueSemaphore; + private readonly SemaphoreSlim _readAsyncQueueSemaphore; public SNINetworkStream(Socket socket, bool ownsSocket) : base(socket, ownsSocket) { - _writeAsyncQueueSemaphore = new ConcurrentQueueSemaphore(1); - _readAsyncQueueSemaphore = new ConcurrentQueueSemaphore(1); + _writeAsyncQueueSemaphore = new SemaphoreSlim(1); + _readAsyncQueueSemaphore = new SemaphoreSlim(1); } // Prevent the ReadAsync collisions by running the task in a Semaphore Slim diff --git a/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SqlUtil.cs b/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SqlUtil.cs index a6b9688afa..dda5f8e958 100644 --- a/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SqlUtil.cs +++ b/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SqlUtil.cs @@ -2133,47 +2133,4 @@ public static MethodInfo GetPromotedToken } } - /// - /// This class implements a FIFO Queue with SemaphoreSlim for ordered execution of parallel tasks. - /// Currently used in Managed SNI (SNISslStream) to override SslStream's WriteAsync implementation. - /// - internal class ConcurrentQueueSemaphore - { - private readonly SemaphoreSlim _semaphore; - private readonly ConcurrentQueue> _queue = - new ConcurrentQueue>(); - - public ConcurrentQueueSemaphore(int initialCount) - { - _semaphore = new SemaphoreSlim(initialCount); - } - - public ConcurrentQueueSemaphore(int initialCount, int maxCount) - { - _semaphore = new SemaphoreSlim(initialCount, maxCount); - } - - public void Wait() - { - WaitAsync().Wait(); - } - - public Task WaitAsync() - { - var tcs = new TaskCompletionSource(); - _queue.Enqueue(tcs); - _semaphore.WaitAsync().ContinueWith(t => - { - if (_queue.TryDequeue(out TaskCompletionSource popped)) - popped.SetResult(true); - }); - return tcs.Task; - } - - public void Release() - { - _semaphore.Release(); - } - } - }//namespace From 7d341af8e4dbdfd59870a7ca520f1b40787d9a76 Mon Sep 17 00:00:00 2001 From: Cheena Malhotra Date: Wed, 18 Nov 2020 11:40:34 -0800 Subject: [PATCH 15/18] Improve --- .../Data/SqlClient/SNI/SNIStreams.cs | 40 +++++++++---------- 1 file changed, 20 insertions(+), 20 deletions(-) diff --git a/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SNI/SNIStreams.cs b/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SNI/SNIStreams.cs index d29c7221ff..aed7eac964 100644 --- a/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SNI/SNIStreams.cs +++ b/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SNI/SNIStreams.cs @@ -15,41 +15,41 @@ namespace Microsoft.Data.SqlClient.SNI /// internal class SNISslStream : SslStream { - private readonly SemaphoreSlim _writeAsyncQueueSemaphore; - private readonly SemaphoreSlim _readAsyncQueueSemaphore; + private readonly SemaphoreSlim _writeAsyncSemaphore; + private readonly SemaphoreSlim _readAsyncSemaphore; public SNISslStream(Stream innerStream, bool leaveInnerStreamOpen, RemoteCertificateValidationCallback userCertificateValidationCallback) : base(innerStream, leaveInnerStreamOpen, userCertificateValidationCallback) { - _writeAsyncQueueSemaphore = new SemaphoreSlim(1); - _readAsyncQueueSemaphore = new SemaphoreSlim(1); + _writeAsyncSemaphore = new SemaphoreSlim(1); + _readAsyncSemaphore = new SemaphoreSlim(1); } // Prevent ReadAsync collisions by running the task in a Semaphore Slim public override async Task ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) { - await _readAsyncQueueSemaphore.WaitAsync().ConfigureAwait(false); + await _readAsyncSemaphore.WaitAsync().ConfigureAwait(false); try { - return await base.ReadAsync(buffer, offset, count, cancellationToken).ConfigureAwait(false); + return await base.ReadAsync(buffer, offset, count, cancellationToken); } finally { - _readAsyncQueueSemaphore.Release(); + _readAsyncSemaphore.Release(); } } // Prevent the WriteAsync collisions by running the task in a Semaphore Slim public override async Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) { - await _writeAsyncQueueSemaphore.WaitAsync().ConfigureAwait(false); + await _writeAsyncSemaphore.WaitAsync().ConfigureAwait(false); try { - await base.WriteAsync(buffer, offset, count, cancellationToken).ConfigureAwait(false); + await base.WriteAsync(buffer, offset, count, cancellationToken); } finally { - _writeAsyncQueueSemaphore.Release(); + _writeAsyncSemaphore.Release(); } } } @@ -59,40 +59,40 @@ public override async Task WriteAsync(byte[] buffer, int offset, int count, Canc /// internal class SNINetworkStream : NetworkStream { - private readonly SemaphoreSlim _writeAsyncQueueSemaphore; - private readonly SemaphoreSlim _readAsyncQueueSemaphore; + private readonly SemaphoreSlim _writeAsyncSemaphore; + private readonly SemaphoreSlim _readAsyncSemaphore; public SNINetworkStream(Socket socket, bool ownsSocket) : base(socket, ownsSocket) { - _writeAsyncQueueSemaphore = new SemaphoreSlim(1); - _readAsyncQueueSemaphore = new SemaphoreSlim(1); + _writeAsyncSemaphore = new SemaphoreSlim(1); + _readAsyncSemaphore = new SemaphoreSlim(1); } // Prevent the ReadAsync collisions by running the task in a Semaphore Slim public override async Task ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) { - await _readAsyncQueueSemaphore.WaitAsync().ConfigureAwait(false); + await _readAsyncSemaphore.WaitAsync().ConfigureAwait(false); try { - return await base.ReadAsync(buffer, offset, count, cancellationToken).ConfigureAwait(false); + return await base.ReadAsync(buffer, offset, count, cancellationToken); } finally { - _readAsyncQueueSemaphore.Release(); + _readAsyncSemaphore.Release(); } } // Prevent the WriteAsync collisions by running the task in a Semaphore Slim public override async Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) { - await _writeAsyncQueueSemaphore.WaitAsync().ConfigureAwait(false); + await _writeAsyncSemaphore.WaitAsync().ConfigureAwait(false); try { - await base.WriteAsync(buffer, offset, count, cancellationToken).ConfigureAwait(false); + await base.WriteAsync(buffer, offset, count, cancellationToken); } finally { - _writeAsyncQueueSemaphore.Release(); + _writeAsyncSemaphore.Release(); } } } From f76e1c001a308cb3865a1b988925daac748f0f6c Mon Sep 17 00:00:00 2001 From: Cheena Malhotra Date: Wed, 18 Nov 2020 13:33:33 -0800 Subject: [PATCH 16/18] Revert unwanted changes --- .../Data/SqlClient/SNI/SNINpHandle.cs | 62 +++++++++---------- 1 file changed, 31 insertions(+), 31 deletions(-) diff --git a/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SNI/SNINpHandle.cs b/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SNI/SNINpHandle.cs index c3fbe6b6c4..0132d7df58 100644 --- a/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SNI/SNINpHandle.cs +++ b/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SNI/SNINpHandle.cs @@ -189,6 +189,8 @@ public override uint Receive(out SNIPacket packet, int timeout) try { SNIPacket errorPacket; + lock (this) + { packet = null; try { @@ -220,6 +222,7 @@ public override uint Receive(out SNIPacket packet, int timeout) } return TdsEnums.SNI_SUCCESS; } + } finally { SqlClientEventSource.Log.TrySNIScopeLeaveEvent(scopeID); @@ -268,43 +271,40 @@ public override uint Send(SNIPacket packet) bool releaseLock = false; try { - lock (this) + // is the packet is marked out out-of-band (attention packets only) it must be + // sent immediately even if a send of recieve operation is already in progress + // because out of band packets are used to cancel ongoing operations + // so try to take the lock if possible but continue even if it can't be taken + if (packet.IsOutOfBand) + { + Monitor.TryEnter(this, ref releaseLock); + } + else { - // is the packet is marked out out-of-band (attention packets only) it must be - // sent immediately even if a send of recieve operation is already in progress - // because out of band packets are used to cancel ongoing operations - // so try to take the lock if possible but continue even if it can't be taken - if (packet.IsOutOfBand) + Monitor.Enter(this); + releaseLock = true; + } + + // this lock ensures that two packets are not being written to the transport at the same time + // so that sending a standard and an out-of-band packet are both written atomically no data is + // interleaved + lock (_sendSync) + { + try { - Monitor.TryEnter(this, ref releaseLock); + packet.WriteToStream(_stream); + return TdsEnums.SNI_SUCCESS; } - else + catch (ObjectDisposedException ode) { - Monitor.Enter(this); - releaseLock = true; + SqlClientEventSource.Log.TrySNITraceEvent(" ObjectDisposedException message = {0}.", ode.Message); + return ReportErrorAndReleasePacket(packet, ode); } - - // this lock ensures that two packets are not being written to the transport at the same time - // so that sending a standard and an out-of-band packet are both written atomically no data is - // interleaved - lock (_sendSync) + catch (IOException ioe) { - try - { - packet.WriteToStream(_stream); - return TdsEnums.SNI_SUCCESS; - } - catch (ObjectDisposedException ode) - { - SqlClientEventSource.Log.TrySNITraceEvent(" ObjectDisposedException message = {0}.", ode.Message); - return ReportErrorAndReleasePacket(packet, ode); - } - catch (IOException ioe) - { - SqlClientEventSource.Log.TrySNITraceEvent(" IOException message = {0}.", ioe.Message); - - return ReportErrorAndReleasePacket(packet, ioe); - } + SqlClientEventSource.Log.TrySNITraceEvent(" IOException message = {0}.", ioe.Message); + + return ReportErrorAndReleasePacket(packet, ioe); } } } From 10421a9e1421b833331f6b117515dcf3d17109de Mon Sep 17 00:00:00 2001 From: Cheena Malhotra Date: Wed, 18 Nov 2020 15:12:18 -0800 Subject: [PATCH 17/18] Adding queue back to maintain FIFO for extra security. --- .../Data/SqlClient/SNI/SNIStreams.cs | 26 +++++------ .../src/Microsoft/Data/SqlClient/SqlUtil.cs | 43 +++++++++++++++++++ 2 files changed, 56 insertions(+), 13 deletions(-) diff --git a/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SNI/SNIStreams.cs b/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SNI/SNIStreams.cs index aed7eac964..b079eaa522 100644 --- a/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SNI/SNIStreams.cs +++ b/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SNI/SNIStreams.cs @@ -15,14 +15,14 @@ namespace Microsoft.Data.SqlClient.SNI /// internal class SNISslStream : SslStream { - private readonly SemaphoreSlim _writeAsyncSemaphore; - private readonly SemaphoreSlim _readAsyncSemaphore; + private readonly ConcurrentQueueSemaphore _writeAsyncSemaphore; + private readonly ConcurrentQueueSemaphore _readAsyncSemaphore; public SNISslStream(Stream innerStream, bool leaveInnerStreamOpen, RemoteCertificateValidationCallback userCertificateValidationCallback) : base(innerStream, leaveInnerStreamOpen, userCertificateValidationCallback) { - _writeAsyncSemaphore = new SemaphoreSlim(1); - _readAsyncSemaphore = new SemaphoreSlim(1); + _writeAsyncSemaphore = new ConcurrentQueueSemaphore(1); + _readAsyncSemaphore = new ConcurrentQueueSemaphore(1); } // Prevent ReadAsync collisions by running the task in a Semaphore Slim @@ -31,7 +31,7 @@ public override async Task ReadAsync(byte[] buffer, int offset, int count, await _readAsyncSemaphore.WaitAsync().ConfigureAwait(false); try { - return await base.ReadAsync(buffer, offset, count, cancellationToken); + return await base.ReadAsync(buffer, offset, count, cancellationToken).ConfigureAwait(false); } finally { @@ -45,7 +45,7 @@ public override async Task WriteAsync(byte[] buffer, int offset, int count, Canc await _writeAsyncSemaphore.WaitAsync().ConfigureAwait(false); try { - await base.WriteAsync(buffer, offset, count, cancellationToken); + await base.WriteAsync(buffer, offset, count, cancellationToken).ConfigureAwait(false); } finally { @@ -59,22 +59,22 @@ public override async Task WriteAsync(byte[] buffer, int offset, int count, Canc /// internal class SNINetworkStream : NetworkStream { - private readonly SemaphoreSlim _writeAsyncSemaphore; - private readonly SemaphoreSlim _readAsyncSemaphore; + private readonly ConcurrentQueueSemaphore _writeAsyncSemaphore; + private readonly ConcurrentQueueSemaphore _readAsyncSemaphore; public SNINetworkStream(Socket socket, bool ownsSocket) : base(socket, ownsSocket) { - _writeAsyncSemaphore = new SemaphoreSlim(1); - _readAsyncSemaphore = new SemaphoreSlim(1); + _writeAsyncSemaphore = new ConcurrentQueueSemaphore(1); + _readAsyncSemaphore = new ConcurrentQueueSemaphore(1); } - // Prevent the ReadAsync collisions by running the task in a Semaphore Slim + // Prevent ReadAsync collisions by running the task in a Semaphore Slim public override async Task ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) { await _readAsyncSemaphore.WaitAsync().ConfigureAwait(false); try { - return await base.ReadAsync(buffer, offset, count, cancellationToken); + return await base.ReadAsync(buffer, offset, count, cancellationToken).ConfigureAwait(false); } finally { @@ -88,7 +88,7 @@ public override async Task WriteAsync(byte[] buffer, int offset, int count, Canc await _writeAsyncSemaphore.WaitAsync().ConfigureAwait(false); try { - await base.WriteAsync(buffer, offset, count, cancellationToken); + await base.WriteAsync(buffer, offset, count, cancellationToken).ConfigureAwait(false); } finally { diff --git a/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SqlUtil.cs b/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SqlUtil.cs index dda5f8e958..a6b9688afa 100644 --- a/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SqlUtil.cs +++ b/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SqlUtil.cs @@ -2133,4 +2133,47 @@ public static MethodInfo GetPromotedToken } } + /// + /// This class implements a FIFO Queue with SemaphoreSlim for ordered execution of parallel tasks. + /// Currently used in Managed SNI (SNISslStream) to override SslStream's WriteAsync implementation. + /// + internal class ConcurrentQueueSemaphore + { + private readonly SemaphoreSlim _semaphore; + private readonly ConcurrentQueue> _queue = + new ConcurrentQueue>(); + + public ConcurrentQueueSemaphore(int initialCount) + { + _semaphore = new SemaphoreSlim(initialCount); + } + + public ConcurrentQueueSemaphore(int initialCount, int maxCount) + { + _semaphore = new SemaphoreSlim(initialCount, maxCount); + } + + public void Wait() + { + WaitAsync().Wait(); + } + + public Task WaitAsync() + { + var tcs = new TaskCompletionSource(); + _queue.Enqueue(tcs); + _semaphore.WaitAsync().ContinueWith(t => + { + if (_queue.TryDequeue(out TaskCompletionSource popped)) + popped.SetResult(true); + }); + return tcs.Task; + } + + public void Release() + { + _semaphore.Release(); + } + } + }//namespace From d6b71ec96acced5151fe3bcfe3449eec90de1f7b Mon Sep 17 00:00:00 2001 From: Cheena Malhotra Date: Wed, 18 Nov 2020 16:55:38 -0800 Subject: [PATCH 18/18] Cleanup --- .../src/Microsoft/Data/SqlClient/SNI/SNIStreams.cs | 8 ++++---- .../src/Microsoft/Data/SqlClient/SqlUtil.cs | 14 ++------------ 2 files changed, 6 insertions(+), 16 deletions(-) diff --git a/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SNI/SNIStreams.cs b/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SNI/SNIStreams.cs index b079eaa522..eb8661d022 100644 --- a/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SNI/SNIStreams.cs +++ b/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SNI/SNIStreams.cs @@ -28,7 +28,7 @@ public SNISslStream(Stream innerStream, bool leaveInnerStreamOpen, RemoteCertifi // Prevent ReadAsync collisions by running the task in a Semaphore Slim public override async Task ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) { - await _readAsyncSemaphore.WaitAsync().ConfigureAwait(false); + await _readAsyncSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false); try { return await base.ReadAsync(buffer, offset, count, cancellationToken).ConfigureAwait(false); @@ -42,7 +42,7 @@ public override async Task ReadAsync(byte[] buffer, int offset, int count, // Prevent the WriteAsync collisions by running the task in a Semaphore Slim public override async Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) { - await _writeAsyncSemaphore.WaitAsync().ConfigureAwait(false); + await _writeAsyncSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false); try { await base.WriteAsync(buffer, offset, count, cancellationToken).ConfigureAwait(false); @@ -71,7 +71,7 @@ public SNINetworkStream(Socket socket, bool ownsSocket) : base(socket, ownsSocke // Prevent ReadAsync collisions by running the task in a Semaphore Slim public override async Task ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) { - await _readAsyncSemaphore.WaitAsync().ConfigureAwait(false); + await _readAsyncSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false); try { return await base.ReadAsync(buffer, offset, count, cancellationToken).ConfigureAwait(false); @@ -85,7 +85,7 @@ public override async Task ReadAsync(byte[] buffer, int offset, int count, // Prevent the WriteAsync collisions by running the task in a Semaphore Slim public override async Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) { - await _writeAsyncSemaphore.WaitAsync().ConfigureAwait(false); + await _writeAsyncSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false); try { await base.WriteAsync(buffer, offset, count, cancellationToken).ConfigureAwait(false); diff --git a/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SqlUtil.cs b/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SqlUtil.cs index a6b9688afa..bf21c8db3a 100644 --- a/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SqlUtil.cs +++ b/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SqlUtil.cs @@ -2148,17 +2148,7 @@ public ConcurrentQueueSemaphore(int initialCount) _semaphore = new SemaphoreSlim(initialCount); } - public ConcurrentQueueSemaphore(int initialCount, int maxCount) - { - _semaphore = new SemaphoreSlim(initialCount, maxCount); - } - - public void Wait() - { - WaitAsync().Wait(); - } - - public Task WaitAsync() + public Task WaitAsync(CancellationToken cancellationToken) { var tcs = new TaskCompletionSource(); _queue.Enqueue(tcs); @@ -2166,7 +2156,7 @@ public Task WaitAsync() { if (_queue.TryDequeue(out TaskCompletionSource popped)) popped.SetResult(true); - }); + }, cancellationToken); return tcs.Task; }