diff --git a/src/Microsoft.Data.SqlClient/netcore/src/Microsoft.Data.SqlClient.csproj b/src/Microsoft.Data.SqlClient/netcore/src/Microsoft.Data.SqlClient.csproj index 3e7db7d945..e8d8abbb22 100644 --- a/src/Microsoft.Data.SqlClient/netcore/src/Microsoft.Data.SqlClient.csproj +++ b/src/Microsoft.Data.SqlClient/netcore/src/Microsoft.Data.SqlClient.csproj @@ -279,7 +279,9 @@ + + @@ -309,6 +311,8 @@ + + @@ -436,6 +440,7 @@ Common\CoreLib\System\Threading\Tasks\TaskToApm.cs + @@ -447,8 +452,8 @@ - + @@ -778,7 +783,7 @@ - + diff --git a/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SNI/ConcurrentQueueSemaphore.NetCoreApp.cs b/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SNI/ConcurrentQueueSemaphore.NetCoreApp.cs new file mode 100644 index 0000000000..871893bf58 --- /dev/null +++ b/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SNI/ConcurrentQueueSemaphore.NetCoreApp.cs @@ -0,0 +1,32 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. +// See the LICENSE file in the project root for more information. + +using System.Threading; +using System.Threading.Tasks; + +namespace Microsoft.Data.SqlClient.SNI +{ + internal sealed partial class ConcurrentQueueSemaphore + { + public ValueTask WaitAsync(CancellationToken cancellationToken) + { + // try sync wait with 0 which will not block to see if we need to do an async wait + if (_semaphore.Wait(0, cancellationToken)) + { + return new ValueTask(); + } + else + { + var tcs = new TaskCompletionSource(); + _queue.Enqueue(tcs); + _semaphore.WaitAsync().ContinueWith( + continuationAction: s_continuePop, + state: _queue, + cancellationToken: cancellationToken + ); + return new ValueTask(tcs.Task); + } + } + } +} diff --git a/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SNI/ConcurrentQueueSemaphore.NetStandard.cs b/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SNI/ConcurrentQueueSemaphore.NetStandard.cs new file mode 100644 index 0000000000..d22b1b9c8c --- /dev/null +++ b/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SNI/ConcurrentQueueSemaphore.NetStandard.cs @@ -0,0 +1,24 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. +// See the LICENSE file in the project root for more information. + +using System.Threading; +using System.Threading.Tasks; + +namespace Microsoft.Data.SqlClient.SNI +{ + internal sealed partial class ConcurrentQueueSemaphore + { + public Task WaitAsync(CancellationToken cancellationToken) + { + var tcs = new TaskCompletionSource(); + _queue.Enqueue(tcs); + _semaphore.WaitAsync().ContinueWith( + continuationAction: s_continuePop, + state: _queue, + cancellationToken: cancellationToken + ); + return tcs.Task; + } + } +} diff --git a/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SNI/ConcurrentQueueSemaphore.cs b/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SNI/ConcurrentQueueSemaphore.cs new file mode 100644 index 0000000000..f328059d23 --- /dev/null +++ b/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SNI/ConcurrentQueueSemaphore.cs @@ -0,0 +1,44 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. +// See the LICENSE file in the project root for more information. + +using System; +using System.Collections.Concurrent; +using System.Threading; +using System.Threading.Tasks; + +namespace Microsoft.Data.SqlClient.SNI +{ + /// + /// This class implements a FIFO Queue with SemaphoreSlim for ordered execution of parallel tasks. + /// Currently used in Managed SNI (SNISslStream) to override SslStream's WriteAsync implementation. + /// + internal sealed partial class ConcurrentQueueSemaphore + { + private static readonly Action s_continuePop = ContinuePop; + + private readonly SemaphoreSlim _semaphore; + private readonly ConcurrentQueue> _queue = + new ConcurrentQueue>(); + + public ConcurrentQueueSemaphore(int initialCount) + { + _semaphore = new SemaphoreSlim(initialCount); + } + + public void Release() + { + _semaphore.Release(); + } + + private static void ContinuePop(Task task, object state) + { + ConcurrentQueue> queue = (ConcurrentQueue>)state; + if (queue.TryDequeue(out TaskCompletionSource popped)) + { + popped.SetResult(true); + } + } + } + +} diff --git a/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SNI/SNIStreams.NetCoreApp.cs b/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SNI/SNIStreams.NetCoreApp.cs new file mode 100644 index 0000000000..6a62e2b9ab --- /dev/null +++ b/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SNI/SNIStreams.NetCoreApp.cs @@ -0,0 +1,126 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. +// See the LICENSE file in the project root for more information. + +using System.Net.Security; +using System.IO; +using System.Threading; +using System.Threading.Tasks; +using System.Net.Sockets; +using System; + +namespace Microsoft.Data.SqlClient.SNI +{ + + internal sealed partial class SNISslStream + { + public override Task ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) + { + ValueTask valueTask = ReadAsync(new Memory(buffer, offset, count), cancellationToken); + if (valueTask.IsCompletedSuccessfully) + { + return Task.FromResult(valueTask.Result); + } + else + { + return valueTask.AsTask(); + } + } + + public override async ValueTask ReadAsync(Memory buffer, CancellationToken cancellationToken = default) + { + await _readAsyncSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false); + try + { + return await base.ReadAsync(buffer, cancellationToken).ConfigureAwait(false); + } + finally + { + _readAsyncSemaphore.Release(); + } + } + + public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) + { + ValueTask valueTask = WriteAsync(new Memory(buffer, offset, count), cancellationToken); + if (valueTask.IsCompletedSuccessfully) + { + return Task.CompletedTask; + } + else + { + return valueTask.AsTask(); + } + } + + public override async ValueTask WriteAsync(ReadOnlyMemory buffer, CancellationToken cancellationToken = default) + { + await _writeAsyncSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false); + try + { + await base.WriteAsync(buffer, cancellationToken).ConfigureAwait(false); + } + finally + { + _writeAsyncSemaphore.Release(); + } + } + } + + + internal sealed partial class SNINetworkStream + { + public override Task ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) + { + ValueTask valueTask = ReadAsync(new Memory(buffer, offset, count), cancellationToken); + if (valueTask.IsCompletedSuccessfully) + { + return Task.FromResult(valueTask.Result); + } + else + { + return valueTask.AsTask(); + } + } + + public override async ValueTask ReadAsync(Memory buffer, CancellationToken cancellationToken = default) + { + await _readAsyncSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false); + try + { + return await base.ReadAsync(buffer, cancellationToken).ConfigureAwait(false); + } + finally + { + _readAsyncSemaphore.Release(); + } + } + + // Prevent the WriteAsync collisions by running the task in a Semaphore Slim + public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) + { + ValueTask valueTask = WriteAsync(new Memory(buffer, offset, count), cancellationToken); + if (valueTask.IsCompletedSuccessfully) + { + return Task.CompletedTask; + } + else + { + return valueTask.AsTask(); + } + } + + public override async ValueTask WriteAsync(ReadOnlyMemory buffer, CancellationToken cancellationToken = default) + { + await _writeAsyncSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false); + try + { + await base.WriteAsync(buffer, cancellationToken).ConfigureAwait(false); + } + finally + { + _writeAsyncSemaphore.Release(); + } + } + } +} diff --git a/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SNI/SNIStreams.NetStandard.cs b/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SNI/SNIStreams.NetStandard.cs new file mode 100644 index 0000000000..0a9ab39d7a --- /dev/null +++ b/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SNI/SNIStreams.NetStandard.cs @@ -0,0 +1,72 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. +// See the LICENSE file in the project root for more information. + +using System.Net.Security; +using System.IO; +using System.Threading; +using System.Threading.Tasks; +using System.Net.Sockets; + +namespace Microsoft.Data.SqlClient.SNI +{ + internal sealed partial class SNISslStream + { + public override async Task ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) + { + await _readAsyncSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false); + try + { + return await base.ReadAsync(buffer, offset, count, cancellationToken).ConfigureAwait(false); + } + finally + { + _readAsyncSemaphore.Release(); + } + } + + public override async Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) + { + await _writeAsyncSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false); + try + { + await base.WriteAsync(buffer, offset, count, cancellationToken).ConfigureAwait(false); + } + finally + { + _writeAsyncSemaphore.Release(); + } + } + } + + internal sealed partial class SNINetworkStream + { + + public override async Task ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) + { + await _readAsyncSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false); + try + { + return await base.ReadAsync(buffer, offset, count, cancellationToken).ConfigureAwait(false); + } + finally + { + _readAsyncSemaphore.Release(); + } + } + + // Prevent the WriteAsync collisions by running the task in a Semaphore Slim + public override async Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) + { + await _writeAsyncSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false); + try + { + await base.WriteAsync(buffer, offset, count, cancellationToken).ConfigureAwait(false); + } + finally + { + _writeAsyncSemaphore.Release(); + } + } + } +} diff --git a/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SNI/SNIStreams.cs b/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SNI/SNIStreams.cs index eb8661d022..fbaef7b7ca 100644 --- a/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SNI/SNIStreams.cs +++ b/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SNI/SNIStreams.cs @@ -13,7 +13,7 @@ namespace Microsoft.Data.SqlClient.SNI /// /// This class extends SslStream to customize stream behavior for Managed SNI implementation. /// - internal class SNISslStream : SslStream + internal sealed partial class SNISslStream : SslStream { private readonly ConcurrentQueueSemaphore _writeAsyncSemaphore; private readonly ConcurrentQueueSemaphore _readAsyncSemaphore; @@ -24,40 +24,12 @@ public SNISslStream(Stream innerStream, bool leaveInnerStreamOpen, RemoteCertifi _writeAsyncSemaphore = new ConcurrentQueueSemaphore(1); _readAsyncSemaphore = new ConcurrentQueueSemaphore(1); } - - // Prevent ReadAsync collisions by running the task in a Semaphore Slim - public override async Task ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) - { - await _readAsyncSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false); - try - { - return await base.ReadAsync(buffer, offset, count, cancellationToken).ConfigureAwait(false); - } - finally - { - _readAsyncSemaphore.Release(); - } - } - - // Prevent the WriteAsync collisions by running the task in a Semaphore Slim - public override async Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) - { - await _writeAsyncSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false); - try - { - await base.WriteAsync(buffer, offset, count, cancellationToken).ConfigureAwait(false); - } - finally - { - _writeAsyncSemaphore.Release(); - } - } } /// /// This class extends NetworkStream to customize stream behavior for Managed SNI implementation. /// - internal class SNINetworkStream : NetworkStream + internal sealed partial class SNINetworkStream : NetworkStream { private readonly ConcurrentQueueSemaphore _writeAsyncSemaphore; private readonly ConcurrentQueueSemaphore _readAsyncSemaphore; @@ -67,33 +39,5 @@ public SNINetworkStream(Socket socket, bool ownsSocket) : base(socket, ownsSocke _writeAsyncSemaphore = new ConcurrentQueueSemaphore(1); _readAsyncSemaphore = new ConcurrentQueueSemaphore(1); } - - // Prevent ReadAsync collisions by running the task in a Semaphore Slim - public override async Task ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) - { - await _readAsyncSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false); - try - { - return await base.ReadAsync(buffer, offset, count, cancellationToken).ConfigureAwait(false); - } - finally - { - _readAsyncSemaphore.Release(); - } - } - - // Prevent the WriteAsync collisions by running the task in a Semaphore Slim - public override async Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) - { - await _writeAsyncSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false); - try - { - await base.WriteAsync(buffer, offset, count, cancellationToken).ConfigureAwait(false); - } - finally - { - _writeAsyncSemaphore.Release(); - } - } } } diff --git a/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SqlUtil.cs b/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SqlUtil.cs index 72e9631a82..5cc11304cc 100644 --- a/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SqlUtil.cs +++ b/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SqlUtil.cs @@ -2133,48 +2133,4 @@ public static MethodInfo GetPromotedToken } } - /// - /// This class implements a FIFO Queue with SemaphoreSlim for ordered execution of parallel tasks. - /// Currently used in Managed SNI (SNISslStream) to override SslStream's WriteAsync implementation. - /// - internal class ConcurrentQueueSemaphore - { - private static readonly Action s_continuePop = ContinuePop; - - private readonly SemaphoreSlim _semaphore; - private readonly ConcurrentQueue> _queue = - new ConcurrentQueue>(); - - public ConcurrentQueueSemaphore(int initialCount) - { - _semaphore = new SemaphoreSlim(initialCount); - } - - public Task WaitAsync(CancellationToken cancellationToken) - { - var tcs = new TaskCompletionSource(); - _queue.Enqueue(tcs); - _semaphore.WaitAsync().ContinueWith( - continuationAction: s_continuePop, - state: _queue, - cancellationToken: cancellationToken - ); - return tcs.Task; - } - - public void Release() - { - _semaphore.Release(); - } - - private static void ContinuePop(Task task, object state) - { - ConcurrentQueue> queue = (ConcurrentQueue>)state; - if (queue.TryDequeue(out TaskCompletionSource popped)) - { - popped.SetResult(true); - } - } - } - -}//namespace +}