New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Change Ix.Async Amb() to cancel the losers, add unit tests #914
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good improvement to Amb
.
// The incoming cancellationToken should still be able to cancel both | ||
// | ||
|
||
var bothRegistry = cancellationToken.Register(() => |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
CancellationTokenSource.CreateLinkedTokenSource
could be used to do away with some of the Register
complexity.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
@@ -58,6 +77,8 @@ async IAsyncEnumerator<TSource> Core(CancellationToken cancellationToken) | |||
AwaitMoveNextAsyncAndDispose(firstMoveNext, firstEnumerator) | |||
}; | |||
|
|||
bothRegistry.Dispose(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should both cancellation token sources be Cancel
'ed here as to unblock the non-failing MoveNextAsync
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We this in the N-ary case, but not here it seems.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes. I'll update the code and add tests for this case.
{ | ||
individualTokenSources[i] = new CancellationTokenSource(); | ||
} | ||
var allIndividualDispose = cancellationToken.Register(() => |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
See remark above; linked cancellation token sources would be easier. They represent an OR between their own cancellation state and the cancellation state of the CancellationToken
they wrap.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks. This OR relation was not apparent to me from the documentation.
@@ -161,9 +198,14 @@ async IAsyncEnumerator<TSource> Core(CancellationToken cancellationToken) | |||
for (var i = 0; i < n; i++) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We actually clean up in opposite order for the binary variant. We may want to reverse this loop.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure. Updating.
{ | ||
await moveNextAsync.ConfigureAwait(false); | ||
} | ||
catch (TaskCanceledException) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's worth considering passing the CancellationToken
down to this method and changing this exception handler using a when
filter to only handle the exception if the cancellation is due to the given token.
catch (TaskCanceledException tce) when (tce.CancellationToken == token)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I can't make this work. Upon cancellation, the tce.CancellationToken
is not equal to token
and tce.CancellationToken.IsCancellationRequested
is false whereas token.IsCancellationRequested
is true.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the problem is in Never
:
var task = new TaskCompletionSource<bool>();
_registration = _token.Register(state =>
((TaskCompletionSource<bool>)state).SetCanceled(), task);
return new ValueTask<bool>(task.Task);
When the TaskCompletionSource is unblocked by SetCanceled, there is no link to the _token there and the task produces an unrelated TaskCanceledException
. I don't know how to get the token into that exception. Would a SetException
work with a well prepared TaskCancellationException
instead?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe with TrySetCancel(_token)
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks, that works.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As for Never
, that should be changed to carry the correct token on the exception for sure. TrySetCanceled
with a token is the right thing to use there, as @quinmars pointed out.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@bartdesmet Okay, are you saying let this particular catch not be predicated on the token, just catch all TaskCanceledException and ignore them.
As for not losing errors, RxJava uses a globar error handler callback that can be hooked and the undeliverable or suppressed exceptions be consumed/logged that way.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, giving this is AwaitMoveNextAsyncAndDispose
, I think it would be fair to catch all cancellations. However...
As I was looking over the code again, I noticed something interesting, namely that we await the outcome of Task.WhenAll
for all the losers from the finally
block, where we may be propagating an exception from enumerating over the winner. If our await
ing of the losers throws an exception, it will supersede the original exception, which would be counterintuitive. As such, it would be better to let exceptions for losers escape and always give priority to the winner's enumeration outcome (successful or exceptional) and not bother propagating losers' exceptions from finally
.
The design point I'm looking for is to be analogous to Task.WhenAny
's behavior which is Amb
for tasks. There, too, exceptions from losing tasks are not propagated. This said, it's a bit of a contrived analogy, given that waiting for the losers' outcomes for Task.WhenAny
would amount to doing a Task.WhenAll
, which would render it useless. There's no second rendez-vous with a task, unlike on async enumerators where all subsequent async operations have another chance of observing outcomes of losers. I still think though that the Task.WhenAny
analogy is valid, and maybe the design point for Amb
should be to be equivalent to await Task.WhenAny(enumerators.Select(e => e.MoveNextAsync().ToTask()))
for the first call to MoveNextAsync
, thus letting the losing tasks have their exceptions unhandled. We would also no longer block in the finally
handler for any losing tasks.
Maybe the following equivalence should hold as well? Given:
static IAsyncEnumerable<T> ToAsyncEnumerable<T>(this Task<T> t)
{
yield return await t;
}
the following should hold:
static Task<bool> AssertAsync<T>(params Func<Task<T>> taskFactories)
{
var tasks = taskFactories.Select(tf => tf()).ToArray();
var taskAny = await ((Task<T>)await Task.WhenAny(tasks));
var enums = taskFactories.Select(tf => tf().ToAsyncEnumerable()).ToArray();
var enumAny = await AsyncEnumerableEx.Amb(enums).SingleAsync();
return taskAny == enumAny;
}
both in terms of returned value (assuming the tasks are completing deterministically across both sides of the assert), exception propagation behavior, and timing behavior in the face of non-terminating tasks (i.e. Task.WhenAny
would complete if any task terminates, while our current Amb
would block indefinitely).
With such a design, it'd be easier to explain IAsyncEnumerator<T>
as a multi-shot continuation equivalent of Task<T>
, which effectively is how async iterators are implemented and designed as well (using a single IValueTaskSource
shared instance being used for the returned ValueTask<bool>
values returns from MoveNetxtAsync
).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Removed the when from catch. I don't fully understand what you mean by the WhenAll/WhenAny part. Perhaps it could be resolved in a separate PR.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sounds good. Will give it another thought and figure out what makes most sense to do here.
This PR changes the
Amb()
implementation to cancel the individual losers when there is a winner, which should unblock sources such asNever
. I also added the missing unit tests to verifyAmb
.