Skip to content

Commit

Permalink
address feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
Wraith2 committed Jun 17, 2021
1 parent 001e995 commit 4d612cc
Show file tree
Hide file tree
Showing 5 changed files with 32 additions and 115 deletions.
Expand Up @@ -344,7 +344,6 @@
<Compile Include="Microsoft\Data\SqlClient\SqlDiagnosticListener.NetStandard.cs" />
<Compile Include="Microsoft\Data\SqlClient\SqlDelegatedTransaction.NetStandard.cs" />
<Compile Include="Microsoft\Data\SqlClient\TdsParser.NetStandard.cs" />
<Compile Include="Microsoft\Data\SqlClient\SNI\ConcurrentQueueSemaphore.NetStandard.cs" />
<Compile Include="Microsoft\Data\SqlClient\SNI\SNIStreams.Task.cs" />
<Compile Include="Microsoft\Data\SqlClient\SNI\SslOverTdsStream.NetStandard.cs" />
</ItemGroup>
Expand Down Expand Up @@ -395,7 +394,6 @@
<Compile Include="Microsoft\Data\SqlClient\SNI\SslOverTdsStream.NetCoreApp.cs" />
<Compile Include="Microsoft\Data\SqlClient\SqlConnectionFactory.AssemblyLoadContext.cs" />
<Compile Include="Microsoft\Data\SqlClient\SqlDependencyUtils.AssemblyLoadContext.cs" />
<Compile Include="Microsoft\Data\SqlClient\SNI\ConcurrentQueueSemaphore.NetCoreApp.cs" />
<Compile Condition="$(TargetFramework.StartsWith('netcoreapp2.'))" Include="Microsoft\Data\SqlClient\SNI\SNIStreams.Task.cs" />
<Compile Condition="!$(TargetFramework.StartsWith('netcoreapp2.'))" Include="Microsoft\Data\SqlClient\SNI\SNIStreams.ValueTask.cs" />
</ItemGroup>
Expand Down

This file was deleted.

This file was deleted.

Expand Up @@ -15,29 +15,45 @@ namespace Microsoft.Data.SqlClient.SNI
/// </summary>
internal sealed partial class ConcurrentQueueSemaphore
{
private static readonly Action<Task, object> s_continuePop = ContinuePop;

private readonly SemaphoreSlim _semaphore;
private readonly ConcurrentQueue<TaskCompletionSource<bool>> _queue =
new ConcurrentQueue<TaskCompletionSource<bool>>();
private readonly ConcurrentQueue<TaskCompletionSource<bool>> _queue;

public ConcurrentQueueSemaphore(int initialCount)
{
_semaphore = new SemaphoreSlim(initialCount);
_queue = new ConcurrentQueue<TaskCompletionSource<bool>>();
}

public void Release()
public Task WaitAsync(CancellationToken cancellationToken)
{
_semaphore.Release();
// try sync wait with 0 which will not block to see if we need to do an async wait
if (_semaphore.Wait(0, cancellationToken))
{
return Task.CompletedTask;
}
else
{
var tcs = new TaskCompletionSource<bool>();
_queue.Enqueue(tcs);
_semaphore.WaitAsync().ContinueWith(
continuationAction: static (Task task, object state) =>
{
ConcurrentQueue<TaskCompletionSource<bool>> queue = (ConcurrentQueue<TaskCompletionSource<bool>>)state;
if (queue.TryDequeue(out TaskCompletionSource<bool> popped))
{
popped.SetResult(true);
}
},
state: _queue,
cancellationToken: cancellationToken
);
return tcs.Task;
}
}

private static void ContinuePop(Task task, object state)
public void Release()
{
ConcurrentQueue<TaskCompletionSource<bool>> queue = (ConcurrentQueue<TaskCompletionSource<bool>>)state;
if (queue.TryDequeue(out TaskCompletionSource<bool> popped))
{
popped.SetResult(true);
}
_semaphore.Release();
}
}

Expand Down
Expand Up @@ -12,15 +12,7 @@ internal sealed partial class SNISslStream
{
public override Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
ValueTask<int> valueTask = ReadAsync(new Memory<byte>(buffer, offset, count), cancellationToken);
if (valueTask.IsCompletedSuccessfully)
{
return Task.FromResult(valueTask.Result);
}
else
{
return valueTask.AsTask();
}
return ReadAsync(new Memory<byte>(buffer, offset, count), cancellationToken).AsTask();
}

public override async ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken cancellationToken = default)
Expand All @@ -38,15 +30,7 @@ public override async ValueTask<int> ReadAsync(Memory<byte> buffer, Cancellation

public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
ValueTask valueTask = WriteAsync(new Memory<byte>(buffer, offset, count), cancellationToken);
if (valueTask.IsCompletedSuccessfully)
{
return Task.CompletedTask;
}
else
{
return valueTask.AsTask();
}
return WriteAsync(new Memory<byte>(buffer, offset, count), cancellationToken).AsTask();
}

public override async ValueTask WriteAsync(ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken = default)
Expand All @@ -63,20 +47,11 @@ public override async ValueTask WriteAsync(ReadOnlyMemory<byte> buffer, Cancella
}
}


internal sealed partial class SNINetworkStream
{
public override Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
ValueTask<int> valueTask = ReadAsync(new Memory<byte>(buffer, offset, count), cancellationToken);
if (valueTask.IsCompletedSuccessfully)
{
return Task.FromResult(valueTask.Result);
}
else
{
return valueTask.AsTask();
}
return ReadAsync(new Memory<byte>(buffer, offset, count), cancellationToken).AsTask();
}

public override async ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken cancellationToken = default)
Expand All @@ -95,15 +70,7 @@ public override async ValueTask<int> ReadAsync(Memory<byte> buffer, Cancellation
// Prevent the WriteAsync collisions by running the task in a Semaphore Slim
public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
ValueTask valueTask = WriteAsync(new Memory<byte>(buffer, offset, count), cancellationToken);
if (valueTask.IsCompletedSuccessfully)
{
return Task.CompletedTask;
}
else
{
return valueTask.AsTask();
}
return WriteAsync(new Memory<byte>(buffer, offset, count), cancellationToken).AsTask();
}

public override async ValueTask WriteAsync(ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken = default)
Expand Down

0 comments on commit 4d612cc

Please sign in to comment.