New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Change Ix.Async Amb() to cancel the losers, add unit tests #914
Changes from 2 commits
6843410
def72d3
f1daa4f
e821150
8aab0f3
2701e76
c7ec529
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,232 @@ | ||
// Licensed to the .NET Foundation under one or more agreements. | ||
// The .NET Foundation licenses this file to you under the Apache 2.0 License. | ||
// See the LICENSE file in the project root for more information. | ||
|
||
using System; | ||
using System.Collections.Generic; | ||
using System.Linq; | ||
using System.Threading.Tasks; | ||
using Xunit; | ||
|
||
namespace Tests | ||
{ | ||
public class Amb : AsyncEnumerableExTests | ||
{ | ||
[Fact] | ||
public void Amb_Null() | ||
{ | ||
Assert.Throws<ArgumentNullException>(() => AsyncEnumerableEx.Amb(default, Return42)); | ||
Assert.Throws<ArgumentNullException>(() => AsyncEnumerableEx.Amb(Return42, default)); | ||
} | ||
|
||
[Fact] | ||
public async Task Amb_First_Wins() | ||
{ | ||
var source = AsyncEnumerable.Range(1, 5).Amb(AsyncEnumerableEx.Never<int>()); | ||
|
||
var xs = source.GetAsyncEnumerator(); | ||
|
||
try | ||
{ | ||
for (var i = 1; i <= 5; i++) | ||
{ | ||
Assert.True(await xs.MoveNextAsync()); | ||
Assert.Equal(i, xs.Current); | ||
} | ||
|
||
Assert.False(await xs.MoveNextAsync()); | ||
} | ||
finally | ||
{ | ||
await xs.DisposeAsync(); | ||
} | ||
} | ||
|
||
[Fact] | ||
public async Task Amb_First_Wins_Alt() | ||
{ | ||
var source = AsyncEnumerable.Range(1, 5).Amb(AsyncEnumerable.Range(1, 5).SelectAwait(async v => | ||
{ | ||
await Task.Delay(500); | ||
return v; | ||
})); | ||
|
||
var xs = source.GetAsyncEnumerator(); | ||
|
||
try | ||
{ | ||
for (var i = 1; i <= 5; i++) | ||
{ | ||
Assert.True(await xs.MoveNextAsync()); | ||
Assert.Equal(i, xs.Current); | ||
} | ||
|
||
Assert.False(await xs.MoveNextAsync()); | ||
} | ||
finally | ||
{ | ||
await xs.DisposeAsync(); | ||
} | ||
} | ||
|
||
[Fact] | ||
public async Task Amb_Second_Wins() | ||
{ | ||
var source = AsyncEnumerableEx.Never<int>().Amb(AsyncEnumerable.Range(1, 5)); | ||
|
||
var xs = source.GetAsyncEnumerator(); | ||
|
||
try | ||
{ | ||
for (var i = 1; i <= 5; i++) | ||
{ | ||
Assert.True(await xs.MoveNextAsync()); | ||
Assert.Equal(i, xs.Current); | ||
} | ||
|
||
Assert.False(await xs.MoveNextAsync()); | ||
} | ||
finally | ||
{ | ||
await xs.DisposeAsync(); | ||
} | ||
} | ||
|
||
[Fact] | ||
public async Task Amb_Second_Wins_Alt() | ||
{ | ||
var source = AsyncEnumerable.Range(1, 5).SelectAwait(async v => | ||
{ | ||
await Task.Delay(500); | ||
return v; | ||
}).Amb(AsyncEnumerable.Range(6, 5)); | ||
|
||
var xs = source.GetAsyncEnumerator(); | ||
|
||
try | ||
{ | ||
for (var i = 1; i <= 5; i++) | ||
{ | ||
Assert.True(await xs.MoveNextAsync()); | ||
Assert.Equal(i + 5, xs.Current); | ||
} | ||
|
||
Assert.False(await xs.MoveNextAsync()); | ||
} | ||
finally | ||
{ | ||
await xs.DisposeAsync(); | ||
} | ||
} | ||
|
||
[Fact] | ||
public async Task Amb_Many_First_Wins() | ||
{ | ||
var source = AsyncEnumerableEx.Amb( | ||
AsyncEnumerable.Range(1, 5), | ||
AsyncEnumerableEx.Never<int>(), | ||
AsyncEnumerableEx.Never<int>() | ||
); | ||
|
||
var xs = source.GetAsyncEnumerator(); | ||
|
||
try | ||
{ | ||
for (var i = 1; i <= 5; i++) | ||
{ | ||
Assert.True(await xs.MoveNextAsync()); | ||
Assert.Equal(i, xs.Current); | ||
} | ||
|
||
Assert.False(await xs.MoveNextAsync()); | ||
} | ||
finally | ||
{ | ||
await xs.DisposeAsync(); | ||
} | ||
} | ||
|
||
[Fact] | ||
public async Task Amb_Many_Last_Wins() | ||
{ | ||
var source = AsyncEnumerableEx.Amb( | ||
AsyncEnumerableEx.Never<int>(), | ||
AsyncEnumerableEx.Never<int>(), | ||
AsyncEnumerable.Range(1, 5) | ||
); | ||
|
||
var xs = source.GetAsyncEnumerator(); | ||
|
||
try | ||
{ | ||
for (var i = 1; i <= 5; i++) | ||
{ | ||
Assert.True(await xs.MoveNextAsync()); | ||
Assert.Equal(i, xs.Current); | ||
} | ||
|
||
Assert.False(await xs.MoveNextAsync()); | ||
} | ||
finally | ||
{ | ||
await xs.DisposeAsync(); | ||
} | ||
} | ||
|
||
[Fact] | ||
public async Task Amb_Many_Enum_First_Wins() | ||
{ | ||
var source = AsyncEnumerableEx.Amb(new[] { | ||
AsyncEnumerable.Range(1, 5), | ||
AsyncEnumerableEx.Never<int>(), | ||
AsyncEnumerableEx.Never<int>() | ||
}.AsEnumerable() | ||
); | ||
|
||
var xs = source.GetAsyncEnumerator(); | ||
|
||
try | ||
{ | ||
for (var i = 1; i <= 5; i++) | ||
{ | ||
Assert.True(await xs.MoveNextAsync()); | ||
Assert.Equal(i, xs.Current); | ||
} | ||
|
||
Assert.False(await xs.MoveNextAsync()); | ||
} | ||
finally | ||
{ | ||
await xs.DisposeAsync(); | ||
} | ||
} | ||
|
||
[Fact] | ||
public async Task Amb_Many_Enum_Last_Wins() | ||
{ | ||
var source = AsyncEnumerableEx.Amb(new[] { | ||
AsyncEnumerableEx.Never<int>(), | ||
AsyncEnumerableEx.Never<int>(), | ||
AsyncEnumerable.Range(1, 5) | ||
}.AsEnumerable() | ||
); | ||
|
||
var xs = source.GetAsyncEnumerator(); | ||
|
||
try | ||
{ | ||
for (var i = 1; i <= 5; i++) | ||
{ | ||
Assert.True(await xs.MoveNextAsync()); | ||
Assert.Equal(i, xs.Current); | ||
} | ||
|
||
Assert.False(await xs.MoveNextAsync()); | ||
} | ||
finally | ||
{ | ||
await xs.DisposeAsync(); | ||
} | ||
} | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -27,6 +27,25 @@ async IAsyncEnumerator<TSource> Core(CancellationToken cancellationToken) | |
Task<bool>? firstMoveNext = null; | ||
Task<bool>? secondMoveNext = null; | ||
|
||
// | ||
// We need separate tokens for each source so that the non-winner can get disposed and unblocked | ||
// i.e., see Never() | ||
// | ||
|
||
var firstCancelToken = new CancellationTokenSource(); | ||
var secondCancelToken = new CancellationTokenSource(); | ||
|
||
// | ||
// The incoming cancellationToken should still be able to cancel both | ||
// | ||
|
||
var bothRegistry = cancellationToken.Register(() => | ||
{ | ||
firstCancelToken.Cancel(); | ||
secondCancelToken.Cancel(); | ||
}); | ||
|
||
|
||
try | ||
{ | ||
// | ||
|
@@ -36,15 +55,15 @@ async IAsyncEnumerator<TSource> Core(CancellationToken cancellationToken) | |
// adding a WhenAny combinator that does exactly that. We can even avoid calling AsTask. | ||
// | ||
|
||
firstEnumerator = first.GetAsyncEnumerator(cancellationToken); | ||
firstEnumerator = first.GetAsyncEnumerator(firstCancelToken.Token); | ||
firstMoveNext = firstEnumerator.MoveNextAsync().AsTask(); | ||
|
||
// | ||
// REVIEW: Order of operations has changed here compared to the original, but is now in sync with the N-ary | ||
// overload which performs GetAsyncEnumerator/MoveNextAsync in pairs, rather than phased. | ||
// | ||
|
||
secondEnumerator = second.GetAsyncEnumerator(cancellationToken); | ||
secondEnumerator = second.GetAsyncEnumerator(secondCancelToken.Token); | ||
secondMoveNext = secondEnumerator.MoveNextAsync().AsTask(); | ||
} | ||
catch | ||
|
@@ -58,6 +77,8 @@ async IAsyncEnumerator<TSource> Core(CancellationToken cancellationToken) | |
AwaitMoveNextAsyncAndDispose(firstMoveNext, firstEnumerator) | ||
}; | ||
|
||
bothRegistry.Dispose(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should both cancellation token sources be There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We this in the N-ary case, but not here it seems. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes. I'll update the code and add tests for this case. |
||
|
||
await Task.WhenAll(cleanup).ConfigureAwait(false); | ||
|
||
throw; | ||
|
@@ -83,11 +104,13 @@ async IAsyncEnumerator<TSource> Core(CancellationToken cancellationToken) | |
if (moveNextWinner == firstMoveNext) | ||
{ | ||
winner = firstEnumerator; | ||
secondCancelToken.Cancel(); | ||
disposeLoser = AwaitMoveNextAsyncAndDispose(secondMoveNext, secondEnumerator); | ||
} | ||
else | ||
{ | ||
winner = secondEnumerator; | ||
firstCancelToken.Cancel(); | ||
disposeLoser = AwaitMoveNextAsyncAndDispose(firstMoveNext, firstEnumerator); | ||
} | ||
|
||
|
@@ -110,6 +133,8 @@ await using (winner.ConfigureAwait(false)) | |
} | ||
finally | ||
{ | ||
bothRegistry.Dispose(); | ||
|
||
// | ||
// REVIEW: This behavior differs from the original implementation in that we never discard any in flight | ||
// asynchronous operations. If an exception occurs while enumerating the winner, it can be | ||
|
@@ -143,12 +168,24 @@ async IAsyncEnumerator<TSource> Core(CancellationToken cancellationToken) | |
|
||
var enumerators = new IAsyncEnumerator<TSource>[n]; | ||
var moveNexts = new Task<bool>[n]; | ||
var individualTokenSources = new CancellationTokenSource[n]; | ||
for (var i = 0; i < n; i++) | ||
{ | ||
individualTokenSources[i] = new CancellationTokenSource(); | ||
} | ||
var allIndividualDispose = cancellationToken.Register(() => | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. See remark above; linked cancellation token sources would be easier. They represent an OR between their own cancellation state and the cancellation state of the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thanks. This OR relation was not apparent to me from the documentation. |
||
{ | ||
foreach (var tokenSource in individualTokenSources) | ||
{ | ||
tokenSource.Cancel(); | ||
} | ||
}); | ||
|
||
try | ||
{ | ||
for (var i = 0; i < n; i++) | ||
{ | ||
var enumerator = sources[i].GetAsyncEnumerator(cancellationToken); | ||
var enumerator = sources[i].GetAsyncEnumerator(individualTokenSources[i].Token); | ||
|
||
enumerators[i] = enumerator; | ||
moveNexts[i] = enumerator.MoveNextAsync().AsTask(); | ||
|
@@ -161,9 +198,14 @@ async IAsyncEnumerator<TSource> Core(CancellationToken cancellationToken) | |
for (var i = 0; i < n; i++) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We actually clean up in opposite order for the binary variant. We may want to reverse this loop. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sure. Updating. |
||
{ | ||
cleanup[i] = AwaitMoveNextAsyncAndDispose(moveNexts[i], enumerators[i]); | ||
|
||
individualTokenSources[i].Cancel(); | ||
} | ||
|
||
allIndividualDispose.Dispose(); | ||
|
||
await Task.WhenAll(cleanup).ConfigureAwait(false); | ||
|
||
throw; | ||
} | ||
|
||
|
@@ -189,6 +231,7 @@ async IAsyncEnumerator<TSource> Core(CancellationToken cancellationToken) | |
{ | ||
if (i != winnerIndex) | ||
{ | ||
individualTokenSources[i].Cancel(); | ||
var loserCleanupTask = AwaitMoveNextAsyncAndDispose(moveNexts[i], enumerators[i]); | ||
loserCleanupTasks.Add(loserCleanupTask); | ||
} | ||
|
@@ -215,6 +258,8 @@ await using (winner.ConfigureAwait(false)) | |
} | ||
finally | ||
{ | ||
allIndividualDispose.Dispose(); | ||
|
||
await cleanupLosers.ConfigureAwait(false); | ||
} | ||
} | ||
|
@@ -236,7 +281,14 @@ await using (enumerator.ConfigureAwait(false)) | |
{ | ||
if (moveNextAsync != null) | ||
{ | ||
await moveNextAsync.ConfigureAwait(false); | ||
try | ||
{ | ||
await moveNextAsync.ConfigureAwait(false); | ||
} | ||
catch (TaskCanceledException) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It's worth considering passing the catch (TaskCanceledException tce) when (tce.CancellationToken == token) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I can't make this work. Upon cancellation, the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think the problem is in var task = new TaskCompletionSource<bool>();
_registration = _token.Register(state =>
((TaskCompletionSource<bool>)state).SetCanceled(), task);
return new ValueTask<bool>(task.Task); When the TaskCompletionSource is unblocked by SetCanceled, there is no link to the _token there and the task produces an unrelated There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Maybe with There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thanks, that works. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. As for There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @bartdesmet Okay, are you saying let this particular catch not be predicated on the token, just catch all TaskCanceledException and ignore them. As for not losing errors, RxJava uses a globar error handler callback that can be hooked and the undeliverable or suppressed exceptions be consumed/logged that way. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah, giving this is As I was looking over the code again, I noticed something interesting, namely that we await the outcome of The design point I'm looking for is to be analogous to Maybe the following equivalence should hold as well? Given: static IAsyncEnumerable<T> ToAsyncEnumerable<T>(this Task<T> t)
{
yield return await t;
} the following should hold: static Task<bool> AssertAsync<T>(params Func<Task<T>> taskFactories)
{
var tasks = taskFactories.Select(tf => tf()).ToArray();
var taskAny = await ((Task<T>)await Task.WhenAny(tasks));
var enums = taskFactories.Select(tf => tf().ToAsyncEnumerable()).ToArray();
var enumAny = await AsyncEnumerableEx.Amb(enums).SingleAsync();
return taskAny == enumAny;
} both in terms of returned value (assuming the tasks are completing deterministically across both sides of the assert), exception propagation behavior, and timing behavior in the face of non-terminating tasks (i.e. With such a design, it'd be easier to explain There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Removed the when from catch. I don't fully understand what you mean by the WhenAll/WhenAny part. Perhaps it could be resolved in a separate PR. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sounds good. Will give it another thought and figure out what makes most sense to do here. |
||
{ | ||
// ignored because of cancelling the non-winners | ||
} | ||
} | ||
} | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
CancellationTokenSource.CreateLinkedTokenSource
could be used to do away with some of theRegister
complexity.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.