Skip to content

Commit

Permalink
Perf: Rework ExecuteReaderAsync to minimize allocations (#528)
Browse files Browse the repository at this point in the history
  • Loading branch information
Wraith2 committed Jul 21, 2020
1 parent 415262d commit 7b82e07
Show file tree
Hide file tree
Showing 8 changed files with 362 additions and 108 deletions.
Expand Up @@ -706,6 +706,7 @@
<Reference Include="System.Memory" />
</ItemGroup>
<ItemGroup>
<Compile Include="Microsoft\Data\SqlClient\AAsyncCallContext.cs" />
<Compile Include="Resources\SR.Designer.cs">
<DesignTime>True</DesignTime>
<AutoGen>True</AutoGen>
Expand Down
Expand Up @@ -2,6 +2,7 @@
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.

using System;
using System.Data;
using System.Data.Common;
using System.Diagnostics;
Expand All @@ -15,6 +16,7 @@ namespace Microsoft.Data.ProviderBase
{
internal abstract partial class DbConnectionFactory
{
private static readonly Action<Task<DbConnectionInternal>, object> s_tryGetConnectionCompletedContinuation = TryGetConnectionCompletedContinuation;

internal bool TryGetConnection(DbConnection owningConnection, TaskCompletionSource<DbConnectionInternal> retry, DbConnectionOptions userOptions, DbConnectionInternal oldConnection, out DbConnectionInternal connection)
{
Expand Down Expand Up @@ -82,25 +84,7 @@ internal bool TryGetConnection(DbConnection owningConnection, TaskCompletionSour

// now that we have an antecedent task, schedule our work when it is completed.
// If it is a new slot or a completed task, this continuation will start right away.
newTask = s_pendingOpenNonPooled[idx].ContinueWith((_) =>
{
Transaction originalTransaction = ADP.GetCurrentTransaction();
try
{
ADP.SetCurrentTransaction(retry.Task.AsyncState as Transaction);
var newConnection = CreateNonPooledConnection(owningConnection, poolGroup, userOptions);
if ((oldConnection != null) && (oldConnection.State == ConnectionState.Open))
{
oldConnection.PrepareForReplaceConnection();
oldConnection.Dispose();
}
return newConnection;
}
finally
{
ADP.SetCurrentTransaction(originalTransaction);
}
}, cancellationTokenSource.Token, TaskContinuationOptions.LongRunning, TaskScheduler.Default);
newTask = CreateReplaceConnectionContinuation(s_pendingOpenNonPooled[idx], owningConnection, retry, userOptions, oldConnection, poolGroup, cancellationTokenSource);

// Place this new task in the slot so any future work will be queued behind it
s_pendingOpenNonPooled[idx] = newTask;
Expand All @@ -114,29 +98,11 @@ internal bool TryGetConnection(DbConnection owningConnection, TaskCompletionSour
}

// once the task is done, propagate the final results to the original caller
newTask.ContinueWith((task) =>
{
cancellationTokenSource.Dispose();
if (task.IsCanceled)
{
retry.TrySetException(ADP.ExceptionWithStackTrace(ADP.NonPooledOpenTimeout()));
}
else if (task.IsFaulted)
{
retry.TrySetException(task.Exception.InnerException);
}
else
{
if (!retry.TrySetResult(task.Result))
{
// The outer TaskCompletionSource was already completed
// Which means that we don't know if someone has messed with the outer connection in the middle of creation
// So the best thing to do now is to destroy the newly created connection
task.Result.DoomThisConnection();
task.Result.Dispose();
}
}
}, TaskScheduler.Default);
newTask.ContinueWith(
continuationAction: s_tryGetConnectionCompletedContinuation,
state: Tuple.Create(cancellationTokenSource, retry),
scheduler: TaskScheduler.Default
);

return false;
}
Expand Down Expand Up @@ -188,5 +154,62 @@ internal bool TryGetConnection(DbConnection owningConnection, TaskCompletionSour

return true;
}

private Task<DbConnectionInternal> CreateReplaceConnectionContinuation(Task<DbConnectionInternal> task, DbConnection owningConnection, TaskCompletionSource<DbConnectionInternal> retry, DbConnectionOptions userOptions, DbConnectionInternal oldConnection, DbConnectionPoolGroup poolGroup, CancellationTokenSource cancellationTokenSource)
{
return task.ContinueWith(
(_) =>
{
Transaction originalTransaction = ADP.GetCurrentTransaction();
try
{
ADP.SetCurrentTransaction(retry.Task.AsyncState as Transaction);
var newConnection = CreateNonPooledConnection(owningConnection, poolGroup, userOptions);
if ((oldConnection != null) && (oldConnection.State == ConnectionState.Open))
{
oldConnection.PrepareForReplaceConnection();
oldConnection.Dispose();
}
return newConnection;
}
finally
{
ADP.SetCurrentTransaction(originalTransaction);
}
},
cancellationTokenSource.Token,
TaskContinuationOptions.LongRunning,
TaskScheduler.Default
);
}

private static void TryGetConnectionCompletedContinuation(Task<DbConnectionInternal> task, object state)
{
Tuple<CancellationTokenSource, TaskCompletionSource<DbConnectionInternal>> parameters = (Tuple<CancellationTokenSource, TaskCompletionSource<DbConnectionInternal>>)state;
CancellationTokenSource source = parameters.Item1;
source.Dispose();

TaskCompletionSource<DbConnectionInternal> retryCompletionSource = parameters.Item2;

if (task.IsCanceled)
{
retryCompletionSource.TrySetException(ADP.ExceptionWithStackTrace(ADP.NonPooledOpenTimeout()));
}
else if (task.IsFaulted)
{
retryCompletionSource.TrySetException(task.Exception.InnerException);
}
else
{
if (!retryCompletionSource.TrySetResult(task.Result))
{
// The outer TaskCompletionSource was already completed
// Which means that we don't know if someone has messed with the outer connection in the middle of creation
// So the best thing to do now is to destroy the newly created connection
task.Result.DoomThisConnection();
task.Result.Dispose();
}
}
}
}
}
@@ -0,0 +1,80 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.

using System;
using System.Threading.Tasks;

namespace Microsoft.Data.SqlClient
{
// this class is a base class for creating derived objects that will store state for async operations
// avoiding the use of closures and allowing caching/reuse of the instances for frequently used async
// calls
//
// DO derive from this and seal your class
// DO add additional fields or properties needed for the async operation and then override Clear to zero them
// DO override AfterClear and use the owner parameter to return the object to a cache location if you have one, this is the purpose of the method
// CONSIDER creating your own Set method that calls the base Set rather than providing a parameterized ctor, it is friendlier to caching
// DO NOT use this class' state after Dispose has been called. It will not throw ObjectDisposedException but it will be a cleared object

internal abstract class AAsyncCallContext<TOwner, TTask> : IDisposable
where TOwner : class
{
protected TOwner _owner;
protected TaskCompletionSource<TTask> _source;
protected IDisposable _disposable;

protected AAsyncCallContext()
{
}

protected AAsyncCallContext(TOwner owner, TaskCompletionSource<TTask> source, IDisposable disposable = null)
{
Set(owner, source, disposable);
}

protected void Set(TOwner owner, TaskCompletionSource<TTask> source, IDisposable disposable = null)
{
_owner = owner ?? throw new ArgumentNullException(nameof(owner));
_source = source ?? throw new ArgumentNullException(nameof(source));
_disposable = disposable;
}

protected void ClearCore()
{
_source = null;
_owner = default;
IDisposable copyDisposable = _disposable;
_disposable = null;
copyDisposable?.Dispose();
}

/// <summary>
/// override this method to cleanup instance data before ClearCore is called which will blank the base data
/// </summary>
protected virtual void Clear()
{
}

/// <summary>
/// override this method to do work after the instance has been totally blanked, intended for cache return etc
/// </summary>
protected virtual void AfterCleared(TOwner owner)
{
}

public void Dispose()
{
TOwner owner = _owner;
try
{
Clear();
}
finally
{
ClearCore();
}
AfterCleared(owner);
}
}
}
Expand Up @@ -2265,7 +2265,7 @@ private Task ReadWriteColumnValueAsync(int col)
return writeTask;
}

private void RegisterForConnectionCloseNotification<T>(ref Task<T> outerTask)
private Task<T> RegisterForConnectionCloseNotification<T>(Task<T> outerTask)
{
SqlConnection connection = _connection;
if (connection == null)
Expand All @@ -2274,7 +2274,7 @@ private void RegisterForConnectionCloseNotification<T>(ref Task<T> outerTask)
throw ADP.ClosedConnectionError();
}

connection.RegisterForConnectionCloseNotification<T>(ref outerTask, this, SqlReferenceCollection.BulkCopyTag);
return connection.RegisterForConnectionCloseNotification(outerTask, this, SqlReferenceCollection.BulkCopyTag);
}

// Runs a loop to copy all columns of a single row.
Expand Down Expand Up @@ -3057,7 +3057,7 @@ private Task WriteToServerInternalAsync(CancellationToken ctoken)
source = new TaskCompletionSource<object>(); // Creating the completion source/Task that we pass to application
resultTask = source.Task;

RegisterForConnectionCloseNotification(ref resultTask);
resultTask = RegisterForConnectionCloseNotification(resultTask);
}

if (_destinationTableName == null)
Expand Down

0 comments on commit 7b82e07

Please sign in to comment.