From 6843410fc29d7f33147aad89a212d032274b1a50 Mon Sep 17 00:00:00 2001 From: akarnokd Date: Wed, 5 Jun 2019 14:07:35 +0200 Subject: [PATCH 01/13] Add Amb tests, fix Amb not canceling the losers --- .../System/Linq/Operators/Amb.cs | 232 ++++++++++++++++++ .../System/Linq/Operators/Amb.cs | 60 ++++- 2 files changed, 288 insertions(+), 4 deletions(-) create mode 100644 Ix.NET/Source/System.Interactive.Async.Tests/System/Linq/Operators/Amb.cs diff --git a/Ix.NET/Source/System.Interactive.Async.Tests/System/Linq/Operators/Amb.cs b/Ix.NET/Source/System.Interactive.Async.Tests/System/Linq/Operators/Amb.cs new file mode 100644 index 0000000000..476263c879 --- /dev/null +++ b/Ix.NET/Source/System.Interactive.Async.Tests/System/Linq/Operators/Amb.cs @@ -0,0 +1,232 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the Apache 2.0 License. +// See the LICENSE file in the project root for more information. + +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; +using Xunit; + +namespace Tests +{ + public class Amb : AsyncEnumerableExTests + { + [Fact] + public void Amb_Null() + { + Assert.Throws(() => AsyncEnumerableEx.Amb(default, Return42)); + Assert.Throws(() => AsyncEnumerableEx.Amb(Return42, default)); + } + + [Fact] + public async Task Amb_First_Wins() + { + var source = AsyncEnumerable.Range(1, 5).Amb(AsyncEnumerableEx.Never()); + + var xs = source.GetAsyncEnumerator(); + + try + { + for (var i = 1; i <= 5; i++) + { + Assert.True(await xs.MoveNextAsync()); + Assert.Equal(i, xs.Current); + } + + Assert.False(await xs.MoveNextAsync()); + } + finally + { + await xs.DisposeAsync(); + } + } + + [Fact] + public async Task Amb_First_Wins_Alt() + { + var source = AsyncEnumerable.Range(1, 5).Amb(AsyncEnumerable.Range(1, 5).SelectAwait(async v => + { + await Task.Delay(500); + return v; + })); + + var xs = source.GetAsyncEnumerator(); + + try + { + for (var i = 1; i <= 5; i++) + { + Assert.True(await xs.MoveNextAsync()); + Assert.Equal(i, xs.Current); + } + + Assert.False(await xs.MoveNextAsync()); + } + finally + { + await xs.DisposeAsync(); + } + } + + [Fact] + public async Task Amb_Second_Wins() + { + var source = AsyncEnumerableEx.Never().Amb(AsyncEnumerable.Range(1, 5)); + + var xs = source.GetAsyncEnumerator(); + + try + { + for (var i = 1; i <= 5; i++) + { + Assert.True(await xs.MoveNextAsync()); + Assert.Equal(i, xs.Current); + } + + Assert.False(await xs.MoveNextAsync()); + } + finally + { + await xs.DisposeAsync(); + } + } + + [Fact] + public async Task Amb_Second_Wins_Alt() + { + var source = AsyncEnumerable.Range(1, 5).SelectAwait(async v => + { + await Task.Delay(500); + return v; + }).Amb(AsyncEnumerable.Range(6, 5)); + + var xs = source.GetAsyncEnumerator(); + + try + { + for (var i = 1; i <= 5; i++) + { + Assert.True(await xs.MoveNextAsync()); + Assert.Equal(i + 5, xs.Current); + } + + Assert.False(await xs.MoveNextAsync()); + } + finally + { + await xs.DisposeAsync(); + } + } + + [Fact] + public async Task Amb_Many_First_Wins() + { + var source = AsyncEnumerableEx.Amb( + AsyncEnumerable.Range(1, 5), + AsyncEnumerableEx.Never(), + AsyncEnumerableEx.Never() + ); + + var xs = source.GetAsyncEnumerator(); + + try + { + for (var i = 1; i <= 5; i++) + { + Assert.True(await xs.MoveNextAsync()); + Assert.Equal(i, xs.Current); + } + + Assert.False(await xs.MoveNextAsync()); + } + finally + { + await xs.DisposeAsync(); + } + } + + [Fact] + public async Task Amb_Many_Last_Wins() + { + var source = AsyncEnumerableEx.Amb( + AsyncEnumerableEx.Never(), + AsyncEnumerableEx.Never(), + AsyncEnumerable.Range(1, 5) + ); + + var xs = source.GetAsyncEnumerator(); + + try + { + for (var i = 1; i <= 5; i++) + { + Assert.True(await xs.MoveNextAsync()); + Assert.Equal(i, xs.Current); + } + + Assert.False(await xs.MoveNextAsync()); + } + finally + { + await xs.DisposeAsync(); + } + } + + [Fact] + public async Task Amb_Many_Enum_First_Wins() + { + var source = AsyncEnumerableEx.Amb(new[] { + AsyncEnumerable.Range(1, 5), + AsyncEnumerableEx.Never(), + AsyncEnumerableEx.Never() + }.AsEnumerable() + ); + + var xs = source.GetAsyncEnumerator(); + + try + { + for (var i = 1; i <= 5; i++) + { + Assert.True(await xs.MoveNextAsync()); + Assert.Equal(i, xs.Current); + } + + Assert.False(await xs.MoveNextAsync()); + } + finally + { + await xs.DisposeAsync(); + } + } + + [Fact] + public async Task Amb_Many_Enum_Last_Wins() + { + var source = AsyncEnumerableEx.Amb(new[] { + AsyncEnumerableEx.Never(), + AsyncEnumerableEx.Never(), + AsyncEnumerable.Range(1, 5) + }.AsEnumerable() + ); + + var xs = source.GetAsyncEnumerator(); + + try + { + for (var i = 1; i <= 5; i++) + { + Assert.True(await xs.MoveNextAsync()); + Assert.Equal(i, xs.Current); + } + + Assert.False(await xs.MoveNextAsync()); + } + finally + { + await xs.DisposeAsync(); + } + } + } +} diff --git a/Ix.NET/Source/System.Interactive.Async/System/Linq/Operators/Amb.cs b/Ix.NET/Source/System.Interactive.Async/System/Linq/Operators/Amb.cs index 849a41f8f5..289a1966ea 100644 --- a/Ix.NET/Source/System.Interactive.Async/System/Linq/Operators/Amb.cs +++ b/Ix.NET/Source/System.Interactive.Async/System/Linq/Operators/Amb.cs @@ -27,6 +27,25 @@ async IAsyncEnumerator Core(CancellationToken cancellationToken) Task? firstMoveNext = null; Task? secondMoveNext = null; + // + // We need separate tokens for each source so that the non-winner can get disposed and unblocked + // i.e., see Never() + // + + var firstCancelToken = new CancellationTokenSource(); + var secondCancelToken = new CancellationTokenSource(); + + // + // The incoming cancellationToken should still be able to cancel both + // + + var bothRegistry = cancellationToken.Register(() => + { + firstCancelToken.Cancel(); + secondCancelToken.Cancel(); + }); + + try { // @@ -36,7 +55,7 @@ async IAsyncEnumerator Core(CancellationToken cancellationToken) // adding a WhenAny combinator that does exactly that. We can even avoid calling AsTask. // - firstEnumerator = first.GetAsyncEnumerator(cancellationToken); + firstEnumerator = first.GetAsyncEnumerator(firstCancelToken.Token); firstMoveNext = firstEnumerator.MoveNextAsync().AsTask(); // @@ -44,7 +63,7 @@ async IAsyncEnumerator Core(CancellationToken cancellationToken) // overload which performs GetAsyncEnumerator/MoveNextAsync in pairs, rather than phased. // - secondEnumerator = second.GetAsyncEnumerator(cancellationToken); + secondEnumerator = second.GetAsyncEnumerator(secondCancelToken.Token); secondMoveNext = secondEnumerator.MoveNextAsync().AsTask(); } catch @@ -58,6 +77,8 @@ async IAsyncEnumerator Core(CancellationToken cancellationToken) AwaitMoveNextAsyncAndDispose(firstMoveNext, firstEnumerator) }; + bothRegistry.Dispose(); + await Task.WhenAll(cleanup).ConfigureAwait(false); throw; @@ -83,11 +104,13 @@ async IAsyncEnumerator Core(CancellationToken cancellationToken) if (moveNextWinner == firstMoveNext) { winner = firstEnumerator; + secondCancelToken.Cancel(); disposeLoser = AwaitMoveNextAsyncAndDispose(secondMoveNext, secondEnumerator); } else { winner = secondEnumerator; + firstCancelToken.Cancel(); disposeLoser = AwaitMoveNextAsyncAndDispose(firstMoveNext, firstEnumerator); } @@ -110,6 +133,8 @@ await using (winner.ConfigureAwait(false)) } finally { + bothRegistry.Dispose(); + // // REVIEW: This behavior differs from the original implementation in that we never discard any in flight // asynchronous operations. If an exception occurs while enumerating the winner, it can be @@ -143,12 +168,24 @@ async IAsyncEnumerator Core(CancellationToken cancellationToken) var enumerators = new IAsyncEnumerator[n]; var moveNexts = new Task[n]; + var individualTokenSources = new CancellationTokenSource[n]; + for (var i = 0; i < n; i++) + { + individualTokenSources[i] = new CancellationTokenSource(); + } + var allIndividualDispose = cancellationToken.Register(() => + { + foreach (var tokenSource in individualTokenSources) + { + tokenSource.Cancel(); + } + }); try { for (var i = 0; i < n; i++) { - var enumerator = sources[i].GetAsyncEnumerator(cancellationToken); + var enumerator = sources[i].GetAsyncEnumerator(individualTokenSources[i].Token); enumerators[i] = enumerator; moveNexts[i] = enumerator.MoveNextAsync().AsTask(); @@ -161,9 +198,14 @@ async IAsyncEnumerator Core(CancellationToken cancellationToken) for (var i = 0; i < n; i++) { cleanup[i] = AwaitMoveNextAsyncAndDispose(moveNexts[i], enumerators[i]); + + individualTokenSources[i].Cancel(); } await Task.WhenAll(cleanup).ConfigureAwait(false); + + allIndividualDispose.Dispose(); + throw; } @@ -189,6 +231,7 @@ async IAsyncEnumerator Core(CancellationToken cancellationToken) { if (i != winnerIndex) { + individualTokenSources[i].Cancel(); var loserCleanupTask = AwaitMoveNextAsyncAndDispose(moveNexts[i], enumerators[i]); loserCleanupTasks.Add(loserCleanupTask); } @@ -215,6 +258,8 @@ await using (winner.ConfigureAwait(false)) } finally { + allIndividualDispose.Dispose(); + await cleanupLosers.ConfigureAwait(false); } } @@ -236,7 +281,14 @@ await using (enumerator.ConfigureAwait(false)) { if (moveNextAsync != null) { - await moveNextAsync.ConfigureAwait(false); + try + { + await moveNextAsync.ConfigureAwait(false); + } + catch (TaskCanceledException) + { + // ignored because of cancelling the non-winners + } } } } From def72d3b581897ee17645def2675ea2c8a50f516 Mon Sep 17 00:00:00 2001 From: akarnokd Date: Wed, 5 Jun 2019 14:08:35 +0200 Subject: [PATCH 02/13] Make dispose order consistent across the overloads --- .../System.Interactive.Async/System/Linq/Operators/Amb.cs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Ix.NET/Source/System.Interactive.Async/System/Linq/Operators/Amb.cs b/Ix.NET/Source/System.Interactive.Async/System/Linq/Operators/Amb.cs index 289a1966ea..a5d6df9589 100644 --- a/Ix.NET/Source/System.Interactive.Async/System/Linq/Operators/Amb.cs +++ b/Ix.NET/Source/System.Interactive.Async/System/Linq/Operators/Amb.cs @@ -202,10 +202,10 @@ async IAsyncEnumerator Core(CancellationToken cancellationToken) individualTokenSources[i].Cancel(); } - await Task.WhenAll(cleanup).ConfigureAwait(false); - allIndividualDispose.Dispose(); + await Task.WhenAll(cleanup).ConfigureAwait(false); + throw; } From f1daa4f495bd41aaeaebb3b1eedd4987953ba186 Mon Sep 17 00:00:00 2001 From: akarnokd Date: Thu, 6 Jun 2019 09:02:02 +0200 Subject: [PATCH 03/13] Update cleanup logic --- .../System/Linq/Operators/Amb.cs | 110 ++++++++++++++++++ .../System/Linq/Operators/Amb.cs | 56 +++------ 2 files changed, 127 insertions(+), 39 deletions(-) diff --git a/Ix.NET/Source/System.Interactive.Async.Tests/System/Linq/Operators/Amb.cs b/Ix.NET/Source/System.Interactive.Async.Tests/System/Linq/Operators/Amb.cs index 476263c879..6f4336d078 100644 --- a/Ix.NET/Source/System.Interactive.Async.Tests/System/Linq/Operators/Amb.cs +++ b/Ix.NET/Source/System.Interactive.Async.Tests/System/Linq/Operators/Amb.cs @@ -5,6 +5,7 @@ using System; using System.Collections.Generic; using System.Linq; +using System.Threading; using System.Threading.Tasks; using Xunit; @@ -228,5 +229,114 @@ public async Task Amb_Many_Enum_Last_Wins() await xs.DisposeAsync(); } } + + + [Fact] + public async Task Amb_First_GetAsyncEnumerator_Crashes() + { + var source = new FailingGetAsyncEnumerator().Amb(AsyncEnumerableEx.Never()); + + var xs = source.GetAsyncEnumerator(); + + try + { + await xs.MoveNextAsync(); + + Assert.False(true, "Should not have gotten here"); + } + catch (InvalidOperationException) + { + // we expect this + } + finally + { + await xs.DisposeAsync(); + } + } + + [Fact] + public async Task Amb_Second_GetAsyncEnumerator_Crashes() + { + var source = AsyncEnumerableEx.Never().Amb(new FailingGetAsyncEnumerator()); + + var xs = source.GetAsyncEnumerator(); + + try + { + await xs.MoveNextAsync(); + + Assert.False(true, "Should not have gotten here"); + } + catch (InvalidOperationException) + { + // we expect this + } + finally + { + await xs.DisposeAsync(); + } + } + + [Fact] + public async Task Amb_Many_First_GetAsyncEnumerator_Crashes() + { + var source = AsyncEnumerableEx.Amb( + new FailingGetAsyncEnumerator(), + AsyncEnumerableEx.Never(), + AsyncEnumerableEx.Never() + ); + + var xs = source.GetAsyncEnumerator(); + + try + { + await xs.MoveNextAsync(); + + Assert.False(true, "Should not have gotten here"); + } + catch (InvalidOperationException) + { + // we expect this + } + finally + { + await xs.DisposeAsync(); + } + } + + [Fact] + public async Task Amb_Many_Last_GetAsyncEnumerator_Crashes() + { + var source = AsyncEnumerableEx.Amb( + AsyncEnumerableEx.Never(), + AsyncEnumerableEx.Never(), + new FailingGetAsyncEnumerator() + ); + + var xs = source.GetAsyncEnumerator(); + + try + { + await xs.MoveNextAsync(); + + Assert.False(true, "Should not have gotten here"); + } + catch (InvalidOperationException) + { + // we expect this + } + finally + { + await xs.DisposeAsync(); + } + } + + private class FailingGetAsyncEnumerator : IAsyncEnumerable + { + public IAsyncEnumerator GetAsyncEnumerator(CancellationToken cancellationToken = default) + { + throw new InvalidOperationException(); + } + } } } diff --git a/Ix.NET/Source/System.Interactive.Async/System/Linq/Operators/Amb.cs b/Ix.NET/Source/System.Interactive.Async/System/Linq/Operators/Amb.cs index a5d6df9589..cc5cf82aa2 100644 --- a/Ix.NET/Source/System.Interactive.Async/System/Linq/Operators/Amb.cs +++ b/Ix.NET/Source/System.Interactive.Async/System/Linq/Operators/Amb.cs @@ -32,19 +32,8 @@ async IAsyncEnumerator Core(CancellationToken cancellationToken) // i.e., see Never() // - var firstCancelToken = new CancellationTokenSource(); - var secondCancelToken = new CancellationTokenSource(); - - // - // The incoming cancellationToken should still be able to cancel both - // - - var bothRegistry = cancellationToken.Register(() => - { - firstCancelToken.Cancel(); - secondCancelToken.Cancel(); - }); - + var firstCancelToken = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); + var secondCancelToken = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); try { @@ -68,16 +57,18 @@ async IAsyncEnumerator Core(CancellationToken cancellationToken) } catch { + secondCancelToken.Cancel(); + firstCancelToken.Cancel(); + // NB: AwaitMoveNextAsyncAndDispose checks for null for both arguments, reducing the need for many null // checks over here. var cleanup = new[] { - AwaitMoveNextAsyncAndDispose(secondMoveNext, secondEnumerator), - AwaitMoveNextAsyncAndDispose(firstMoveNext, firstEnumerator) + AwaitMoveNextAsyncAndDispose(secondMoveNext, secondEnumerator, secondCancelToken.Token), + AwaitMoveNextAsyncAndDispose(firstMoveNext, firstEnumerator, firstCancelToken.Token) }; - bothRegistry.Dispose(); await Task.WhenAll(cleanup).ConfigureAwait(false); @@ -105,13 +96,13 @@ async IAsyncEnumerator Core(CancellationToken cancellationToken) { winner = firstEnumerator; secondCancelToken.Cancel(); - disposeLoser = AwaitMoveNextAsyncAndDispose(secondMoveNext, secondEnumerator); + disposeLoser = AwaitMoveNextAsyncAndDispose(secondMoveNext, secondEnumerator, secondCancelToken.Token); } else { winner = secondEnumerator; firstCancelToken.Cancel(); - disposeLoser = AwaitMoveNextAsyncAndDispose(firstMoveNext, firstEnumerator); + disposeLoser = AwaitMoveNextAsyncAndDispose(firstMoveNext, firstEnumerator, firstCancelToken.Token); } try @@ -133,8 +124,6 @@ await using (winner.ConfigureAwait(false)) } finally { - bothRegistry.Dispose(); - // // REVIEW: This behavior differs from the original implementation in that we never discard any in flight // asynchronous operations. If an exception occurs while enumerating the winner, it can be @@ -171,15 +160,8 @@ async IAsyncEnumerator Core(CancellationToken cancellationToken) var individualTokenSources = new CancellationTokenSource[n]; for (var i = 0; i < n; i++) { - individualTokenSources[i] = new CancellationTokenSource(); + individualTokenSources[i] = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); } - var allIndividualDispose = cancellationToken.Register(() => - { - foreach (var tokenSource in individualTokenSources) - { - tokenSource.Cancel(); - } - }); try { @@ -195,14 +177,12 @@ async IAsyncEnumerator Core(CancellationToken cancellationToken) { var cleanup = new Task[n]; - for (var i = 0; i < n; i++) + for (var i = n - 1; i >= 0; i--) { - cleanup[i] = AwaitMoveNextAsyncAndDispose(moveNexts[i], enumerators[i]); - individualTokenSources[i].Cancel(); - } - allIndividualDispose.Dispose(); + cleanup[i] = AwaitMoveNextAsyncAndDispose(moveNexts[i], enumerators[i], individualTokenSources[i].Token); + } await Task.WhenAll(cleanup).ConfigureAwait(false); @@ -227,12 +207,12 @@ async IAsyncEnumerator Core(CancellationToken cancellationToken) var loserCleanupTasks = new List(n - 1); - for (var i = 0; i < n; i++) + for (var i = n - 1; i >= 0; i--) { if (i != winnerIndex) { individualTokenSources[i].Cancel(); - var loserCleanupTask = AwaitMoveNextAsyncAndDispose(moveNexts[i], enumerators[i]); + var loserCleanupTask = AwaitMoveNextAsyncAndDispose(moveNexts[i], enumerators[i], individualTokenSources[i].Token); loserCleanupTasks.Add(loserCleanupTask); } } @@ -258,8 +238,6 @@ await using (winner.ConfigureAwait(false)) } finally { - allIndividualDispose.Dispose(); - await cleanupLosers.ConfigureAwait(false); } } @@ -273,7 +251,7 @@ public static IAsyncEnumerable Amb(this IEnumerable(Task? moveNextAsync, IAsyncEnumerator? enumerator) + private static async Task AwaitMoveNextAsyncAndDispose(Task? moveNextAsync, IAsyncEnumerator? enumerator, CancellationToken token) { if (enumerator != null) { @@ -285,7 +263,7 @@ await using (enumerator.ConfigureAwait(false)) { await moveNextAsync.ConfigureAwait(false); } - catch (TaskCanceledException) + catch (TaskCanceledException tce) // when (tce.CancellationToken == token) { // ignored because of cancelling the non-winners } From e821150a262f51b11bb9ec0ac290f783acc64324 Mon Sep 17 00:00:00 2001 From: akarnokd Date: Thu, 6 Jun 2019 09:38:39 +0200 Subject: [PATCH 04/13] Include the cancellationtoken in Never and Amb disposing --- .../System.Interactive.Async/System/Linq/Operators/Amb.cs | 2 +- .../System.Interactive.Async/System/Linq/Operators/Never.cs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/Ix.NET/Source/System.Interactive.Async/System/Linq/Operators/Amb.cs b/Ix.NET/Source/System.Interactive.Async/System/Linq/Operators/Amb.cs index cc5cf82aa2..32c1f0ba5a 100644 --- a/Ix.NET/Source/System.Interactive.Async/System/Linq/Operators/Amb.cs +++ b/Ix.NET/Source/System.Interactive.Async/System/Linq/Operators/Amb.cs @@ -263,7 +263,7 @@ await using (enumerator.ConfigureAwait(false)) { await moveNextAsync.ConfigureAwait(false); } - catch (TaskCanceledException tce) // when (tce.CancellationToken == token) + catch (TaskCanceledException tce) when (tce.CancellationToken == token) { // ignored because of cancelling the non-winners } diff --git a/Ix.NET/Source/System.Interactive.Async/System/Linq/Operators/Never.cs b/Ix.NET/Source/System.Interactive.Async/System/Linq/Operators/Never.cs index cdc81ba038..969e2177d9 100644 --- a/Ix.NET/Source/System.Interactive.Async/System/Linq/Operators/Never.cs +++ b/Ix.NET/Source/System.Interactive.Async/System/Linq/Operators/Never.cs @@ -49,7 +49,7 @@ public ValueTask MoveNextAsync() _once = true; var task = new TaskCompletionSource(); - _registration = _token.Register(state => ((TaskCompletionSource)state).SetCanceled(), task); + _registration = _token.Register(state => ((TaskCompletionSource)state).TrySetCanceled(_token), task); return new ValueTask(task.Task); } } From 8aab0f3ce4f74baea1183e2392d84d93d8dd83ba Mon Sep 17 00:00:00 2001 From: akarnokd Date: Thu, 6 Jun 2019 20:49:25 +0200 Subject: [PATCH 05/13] Suppress any TaskCanceledException again --- .../System/Linq/Operators/Amb.cs | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/Ix.NET/Source/System.Interactive.Async/System/Linq/Operators/Amb.cs b/Ix.NET/Source/System.Interactive.Async/System/Linq/Operators/Amb.cs index 32c1f0ba5a..6274d8a8ad 100644 --- a/Ix.NET/Source/System.Interactive.Async/System/Linq/Operators/Amb.cs +++ b/Ix.NET/Source/System.Interactive.Async/System/Linq/Operators/Amb.cs @@ -65,8 +65,8 @@ async IAsyncEnumerator Core(CancellationToken cancellationToken) var cleanup = new[] { - AwaitMoveNextAsyncAndDispose(secondMoveNext, secondEnumerator, secondCancelToken.Token), - AwaitMoveNextAsyncAndDispose(firstMoveNext, firstEnumerator, firstCancelToken.Token) + AwaitMoveNextAsyncAndDispose(secondMoveNext, secondEnumerator), + AwaitMoveNextAsyncAndDispose(firstMoveNext, firstEnumerator) }; @@ -96,13 +96,13 @@ async IAsyncEnumerator Core(CancellationToken cancellationToken) { winner = firstEnumerator; secondCancelToken.Cancel(); - disposeLoser = AwaitMoveNextAsyncAndDispose(secondMoveNext, secondEnumerator, secondCancelToken.Token); + disposeLoser = AwaitMoveNextAsyncAndDispose(secondMoveNext, secondEnumerator); } else { winner = secondEnumerator; firstCancelToken.Cancel(); - disposeLoser = AwaitMoveNextAsyncAndDispose(firstMoveNext, firstEnumerator, firstCancelToken.Token); + disposeLoser = AwaitMoveNextAsyncAndDispose(firstMoveNext, firstEnumerator); } try @@ -181,7 +181,7 @@ async IAsyncEnumerator Core(CancellationToken cancellationToken) { individualTokenSources[i].Cancel(); - cleanup[i] = AwaitMoveNextAsyncAndDispose(moveNexts[i], enumerators[i], individualTokenSources[i].Token); + cleanup[i] = AwaitMoveNextAsyncAndDispose(moveNexts[i], enumerators[i]); } await Task.WhenAll(cleanup).ConfigureAwait(false); @@ -212,7 +212,7 @@ async IAsyncEnumerator Core(CancellationToken cancellationToken) if (i != winnerIndex) { individualTokenSources[i].Cancel(); - var loserCleanupTask = AwaitMoveNextAsyncAndDispose(moveNexts[i], enumerators[i], individualTokenSources[i].Token); + var loserCleanupTask = AwaitMoveNextAsyncAndDispose(moveNexts[i], enumerators[i]); loserCleanupTasks.Add(loserCleanupTask); } } @@ -251,7 +251,7 @@ public static IAsyncEnumerable Amb(this IEnumerable(Task? moveNextAsync, IAsyncEnumerator? enumerator, CancellationToken token) + private static async Task AwaitMoveNextAsyncAndDispose(Task? moveNextAsync, IAsyncEnumerator? enumerator) { if (enumerator != null) { @@ -263,7 +263,7 @@ await using (enumerator.ConfigureAwait(false)) { await moveNextAsync.ConfigureAwait(false); } - catch (TaskCanceledException tce) when (tce.CancellationToken == token) + catch (TaskCanceledException) { // ignored because of cancelling the non-winners } From dd6adb6fd32f1de9a578614af8595193790ad6ae Mon Sep 17 00:00:00 2001 From: akarnokd Date: Thu, 6 Jun 2019 21:20:18 +0200 Subject: [PATCH 06/13] IxAsync.Timeout: propagate timeout cancellation to main src --- .../System/Linq/Operators/Timeout.cs | 107 ++++++++++++++++++ .../System/Linq/Operators/Timeout.cs | 9 +- 2 files changed, 114 insertions(+), 2 deletions(-) create mode 100644 Ix.NET/Source/System.Interactive.Async.Tests/System/Linq/Operators/Timeout.cs diff --git a/Ix.NET/Source/System.Interactive.Async.Tests/System/Linq/Operators/Timeout.cs b/Ix.NET/Source/System.Interactive.Async.Tests/System/Linq/Operators/Timeout.cs new file mode 100644 index 0000000000..95f5f2ab6e --- /dev/null +++ b/Ix.NET/Source/System.Interactive.Async.Tests/System/Linq/Operators/Timeout.cs @@ -0,0 +1,107 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the Apache 2.0 License. +// See the LICENSE file in the project root for more information. + +using System; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using Xunit; + +namespace Tests +{ + public class Timeout : AsyncEnumerableExTests + { + [Fact] + public async Task Timeout_Never() + { + var source = AsyncEnumerableEx.Never().Timeout(TimeSpan.FromMilliseconds(100)); + + var en = source.GetAsyncEnumerator(); + + try + { + await en.MoveNextAsync(); + + Assert.False(true, "MoveNextAsync should have thrown"); + } + catch (TimeoutException) + { + // expected + } + finally + { + await en.DisposeAsync(); + } + } + + [Fact] + public async Task Timeout_Delayed_Main() + { + var source = AsyncEnumerable.Range(1, 5) + .SelectAwait(async v => + { + await Task.Delay(300); + return v; + }) + .Timeout(TimeSpan.FromMilliseconds(100)); + + var en = source.GetAsyncEnumerator(); + + try + { + await en.MoveNextAsync(); + + Assert.False(true, "MoveNextAsync should have thrown"); + } + catch (TimeoutException) + { + // expected + } + finally + { + await en.DisposeAsync(); + } + } + + [Fact] + public async Task Timeout_Delayed_Main_Canceled() + { + var tcs = new TaskCompletionSource(); + + var source = AsyncEnumerable.Range(1, 5) + .SelectAwaitWithCancellation(async (v, ct) => + { + try + { + await Task.Delay(500, ct); + } + catch (TaskCanceledException) + { + tcs.SetResult(0); + } + return v; + }) + .Timeout(TimeSpan.FromMilliseconds(250)); + + var en = source.GetAsyncEnumerator(); + + try + { + await en.MoveNextAsync(); + + Assert.False(true, "MoveNextAsync should have thrown"); + } + catch (TimeoutException) + { + // expected + } + finally + { + await en.DisposeAsync(); + } + + Assert.Equal(0, await tcs.Task); + } + } +} diff --git a/Ix.NET/Source/System.Interactive.Async/System/Linq/Operators/Timeout.cs b/Ix.NET/Source/System.Interactive.Async/System/Linq/Operators/Timeout.cs index 717db0c564..03025b64cf 100644 --- a/Ix.NET/Source/System.Interactive.Async/System/Linq/Operators/Timeout.cs +++ b/Ix.NET/Source/System.Interactive.Async/System/Linq/Operators/Timeout.cs @@ -32,6 +32,8 @@ private sealed class TimeoutAsyncIterator : AsyncIterator private Task? _loserTask; + private CancellationTokenSource? _sourceCTS; + public TimeoutAsyncIterator(IAsyncEnumerable source, TimeSpan timeout) { Debug.Assert(source != null); @@ -67,7 +69,8 @@ protected override async ValueTask MoveNextCore() switch (_state) { case AsyncIteratorState.Allocated: - _enumerator = _source.GetAsyncEnumerator(_cancellationToken); + _sourceCTS = CancellationTokenSource.CreateLinkedTokenSource(_cancellationToken); + _enumerator = _source.GetAsyncEnumerator(_sourceCTS.Token); _state = AsyncIteratorState.Iterating; goto case AsyncIteratorState.Iterating; @@ -77,7 +80,7 @@ protected override async ValueTask MoveNextCore() if (!moveNext.IsCompleted) { - using var delayCts = new CancellationTokenSource(); + using var delayCts = CancellationTokenSource.CreateLinkedTokenSource(_cancellationToken); var delay = Task.Delay(_timeout, delayCts.Token); @@ -101,6 +104,8 @@ protected override async ValueTask MoveNextCore() _loserTask = next.ContinueWith((_, state) => ((IAsyncDisposable)state).DisposeAsync().AsTask(), _enumerator); + _sourceCTS!.Cancel(); + throw new TimeoutException(); } From 017bcc0869b3510369705acf58eadf96b8567cf1 Mon Sep 17 00:00:00 2001 From: akarnokd Date: Thu, 6 Jun 2019 21:24:59 +0200 Subject: [PATCH 07/13] Null out _sourceCTS --- .../System/Linq/Operators/Timeout.cs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/Ix.NET/Source/System.Interactive.Async/System/Linq/Operators/Timeout.cs b/Ix.NET/Source/System.Interactive.Async/System/Linq/Operators/Timeout.cs index 03025b64cf..dd97b84968 100644 --- a/Ix.NET/Source/System.Interactive.Async/System/Linq/Operators/Timeout.cs +++ b/Ix.NET/Source/System.Interactive.Async/System/Linq/Operators/Timeout.cs @@ -60,6 +60,11 @@ public override async ValueTask DisposeAsync() await _enumerator.DisposeAsync().ConfigureAwait(false); _enumerator = null; } + if (_sourceCTS != null) + { + _sourceCTS.Dispose(); + _sourceCTS = null; + } await base.DisposeAsync().ConfigureAwait(false); } From 47c597de2c2f8bc0ed835221175ee9967eb3f3d9 Mon Sep 17 00:00:00 2001 From: akarnokd Date: Thu, 6 Jun 2019 21:27:19 +0200 Subject: [PATCH 08/13] Add test for double timeout, exercises the cancel paths --- .../System/Linq/Operators/Timeout.cs | 25 +++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/Ix.NET/Source/System.Interactive.Async.Tests/System/Linq/Operators/Timeout.cs b/Ix.NET/Source/System.Interactive.Async.Tests/System/Linq/Operators/Timeout.cs index 95f5f2ab6e..aa2dbffa23 100644 --- a/Ix.NET/Source/System.Interactive.Async.Tests/System/Linq/Operators/Timeout.cs +++ b/Ix.NET/Source/System.Interactive.Async.Tests/System/Linq/Operators/Timeout.cs @@ -35,6 +35,31 @@ public async Task Timeout_Never() } } + [Fact] + public async Task Timeout_Double_Never() + { + var source = AsyncEnumerableEx.Never() + .Timeout(TimeSpan.FromMilliseconds(300)) + .Timeout(TimeSpan.FromMilliseconds(100)); + + var en = source.GetAsyncEnumerator(); + + try + { + await en.MoveNextAsync(); + + Assert.False(true, "MoveNextAsync should have thrown"); + } + catch (TimeoutException) + { + // expected + } + finally + { + await en.DisposeAsync(); + } + } + [Fact] public async Task Timeout_Delayed_Main() { From fe6309d5ba123320a7a142f9658eb2490ca48858 Mon Sep 17 00:00:00 2001 From: "dependabot-preview[bot]" <27856297+dependabot-preview[bot]@users.noreply.github.com> Date: Fri, 15 Nov 2019 05:55:16 +0000 Subject: [PATCH 09/13] Bump ApprovalTests from 4.2.2 to 4.3.0 in /Rx.NET/Source Bumps [ApprovalTests](https://github.com/approvals/ApprovalTests.Net) from 4.2.2 to 4.3.0. - [Release notes](https://github.com/approvals/ApprovalTests.Net/releases) - [Commits](https://github.com/approvals/ApprovalTests.Net/commits) Signed-off-by: dependabot-preview[bot] --- .../Tests.System.Reactive.ApiApprovals.csproj | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Rx.NET/Source/tests/Tests.System.Reactive.ApiApprovals/Tests.System.Reactive.ApiApprovals.csproj b/Rx.NET/Source/tests/Tests.System.Reactive.ApiApprovals/Tests.System.Reactive.ApiApprovals.csproj index b1d9859757..5374d7d390 100644 --- a/Rx.NET/Source/tests/Tests.System.Reactive.ApiApprovals/Tests.System.Reactive.ApiApprovals.csproj +++ b/Rx.NET/Source/tests/Tests.System.Reactive.ApiApprovals/Tests.System.Reactive.ApiApprovals.csproj @@ -31,7 +31,7 @@ - + From fa3cdc6af2f74a55001cd48bd3786d69bc186c92 Mon Sep 17 00:00:00 2001 From: Eugene Baranov <220614+eugbaranov@users.noreply.github.com> Date: Fri, 19 Jul 2019 00:49:15 +0100 Subject: [PATCH 10/13] Disposing EventLoopScheduler with in-flight items (closes #286) --- .../Concurrency/EventLoopScheduler.cs | 12 ++++++++++-- .../Tests/Concurrency/EventLoopSchedulerTest.cs | 14 ++++++++++++++ 2 files changed, 24 insertions(+), 2 deletions(-) diff --git a/Rx.NET/Source/src/System.Reactive/Concurrency/EventLoopScheduler.cs b/Rx.NET/Source/src/System.Reactive/Concurrency/EventLoopScheduler.cs index ed68a66d14..0c0e62e217 100644 --- a/Rx.NET/Source/src/System.Reactive/Concurrency/EventLoopScheduler.cs +++ b/Rx.NET/Source/src/System.Reactive/Concurrency/EventLoopScheduler.cs @@ -153,7 +153,7 @@ public override IDisposable Schedule(TState state, TimeSpan dueTime, Fun { if (_disposed) { - throw new ObjectDisposedException(""); + throw new ObjectDisposedException(nameof(EventLoopScheduler)); } if (dueTime <= TimeSpan.Zero) @@ -351,7 +351,15 @@ private void Run() { if (!item.IsCanceled) { - item.Invoke(); + try + { + item.Invoke(); + } + catch (ObjectDisposedException ex) when (nameof(EventLoopScheduler).Equals(ex.ObjectName)) + { + // Since we are not inside the lock at this point + // the scheduler can be disposed before the item had a chance to run + } } } } diff --git a/Rx.NET/Source/tests/Tests.System.Reactive/Tests/Concurrency/EventLoopSchedulerTest.cs b/Rx.NET/Source/tests/Tests.System.Reactive/Tests/Concurrency/EventLoopSchedulerTest.cs index 24288732ea..44a0a26159 100644 --- a/Rx.NET/Source/tests/Tests.System.Reactive/Tests/Concurrency/EventLoopSchedulerTest.cs +++ b/Rx.NET/Source/tests/Tests.System.Reactive/Tests/Concurrency/EventLoopSchedulerTest.cs @@ -7,6 +7,7 @@ using System.Diagnostics; using System.Reactive.Concurrency; using System.Reactive.Disposables; +using System.Reactive.Linq; using System.Threading; using Microsoft.Reactive.Testing; using Xunit; @@ -41,6 +42,19 @@ public void EventLoop_Now() Assert.True(res.Seconds < 1); } + [Fact] + public void EventLoop_DisposeWithInFlightActions() + { + using (var scheduler = new EventLoopScheduler()) + using (var subscription = Observable + .Range(1, 10) + .ObserveOn(scheduler) + .Subscribe(_ => Thread.Sleep(50))) + { + Thread.Sleep(50); + } + } + [Fact] public void EventLoop_ScheduleAction() { From a28476adc29afc52184766458f85ce5ec1e8410d Mon Sep 17 00:00:00 2001 From: "dependabot-preview[bot]" <27856297+dependabot-preview[bot]@users.noreply.github.com> Date: Mon, 18 Nov 2019 05:51:24 +0000 Subject: [PATCH 11/13] Bump Nerdbank.GitVersioning from 3.0.26 to 3.0.28 in /Ix.NET/Source Bumps [Nerdbank.GitVersioning](https://github.com/AArnott/Nerdbank.GitVersioning) from 3.0.26 to 3.0.28. - [Release notes](https://github.com/AArnott/Nerdbank.GitVersioning/releases) - [Commits](https://github.com/AArnott/Nerdbank.GitVersioning/commits) Signed-off-by: dependabot-preview[bot] --- Ix.NET/Source/Directory.build.props | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Ix.NET/Source/Directory.build.props b/Ix.NET/Source/Directory.build.props index a604effcfe..5059f8b1d4 100644 --- a/Ix.NET/Source/Directory.build.props +++ b/Ix.NET/Source/Directory.build.props @@ -23,7 +23,7 @@ - + From 36b5e495f23c669fc0fe7a055b07f894b049192e Mon Sep 17 00:00:00 2001 From: "dependabot-preview[bot]" <27856297+dependabot-preview[bot]@users.noreply.github.com> Date: Mon, 18 Nov 2019 06:01:41 +0000 Subject: [PATCH 12/13] Bump Nerdbank.GitVersioning from 3.0.26 to 3.0.28 in /Rx.NET/Source Bumps [Nerdbank.GitVersioning](https://github.com/AArnott/Nerdbank.GitVersioning) from 3.0.26 to 3.0.28. - [Release notes](https://github.com/AArnott/Nerdbank.GitVersioning/releases) - [Commits](https://github.com/AArnott/Nerdbank.GitVersioning/commits) Signed-off-by: dependabot-preview[bot] --- Rx.NET/Source/Directory.build.props | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Rx.NET/Source/Directory.build.props b/Rx.NET/Source/Directory.build.props index 101cc3a79b..f7177a820e 100644 --- a/Rx.NET/Source/Directory.build.props +++ b/Rx.NET/Source/Directory.build.props @@ -25,7 +25,7 @@ - + From 13bc43ed35f9f351c797eee16495250adcbcf6e4 Mon Sep 17 00:00:00 2001 From: "dependabot-preview[bot]" <27856297+dependabot-preview[bot]@users.noreply.github.com> Date: Tue, 19 Nov 2019 05:50:21 +0000 Subject: [PATCH 13/13] Bump ApprovalTests from 4.3.0 to 4.4.0 in /Rx.NET/Source Bumps [ApprovalTests](https://github.com/approvals/ApprovalTests.Net) from 4.3.0 to 4.4.0. - [Release notes](https://github.com/approvals/ApprovalTests.Net/releases) - [Commits](https://github.com/approvals/ApprovalTests.Net/compare/4.3.0...4.4.0) Signed-off-by: dependabot-preview[bot] --- .../Tests.System.Reactive.ApiApprovals.csproj | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Rx.NET/Source/tests/Tests.System.Reactive.ApiApprovals/Tests.System.Reactive.ApiApprovals.csproj b/Rx.NET/Source/tests/Tests.System.Reactive.ApiApprovals/Tests.System.Reactive.ApiApprovals.csproj index 5374d7d390..ec91a0a353 100644 --- a/Rx.NET/Source/tests/Tests.System.Reactive.ApiApprovals/Tests.System.Reactive.ApiApprovals.csproj +++ b/Rx.NET/Source/tests/Tests.System.Reactive.ApiApprovals/Tests.System.Reactive.ApiApprovals.csproj @@ -31,7 +31,7 @@ - +