From cdd5e2717ac35341dd89f9ed6f31395f22addc1c Mon Sep 17 00:00:00 2001 From: Peter Wehrfritz Date: Wed, 5 Jun 2019 23:58:47 +0200 Subject: [PATCH 1/6] Do not dispose the enumerator while enumerating in the ToObservable operator. --- .../System/Linq/Operators/ToObservable.cs | 40 +++++++------------ 1 file changed, 15 insertions(+), 25 deletions(-) diff --git a/Ix.NET/Source/System.Linq.Async/System/Linq/Operators/ToObservable.cs b/Ix.NET/Source/System.Linq.Async/System/Linq/Operators/ToObservable.cs index f2e74d95bd..88f2396f7b 100644 --- a/Ix.NET/Source/System.Linq.Async/System/Linq/Operators/ToObservable.cs +++ b/Ix.NET/Source/System.Linq.Async/System/Linq/Operators/ToObservable.cs @@ -3,6 +3,7 @@ // See the LICENSE file in the project root for more information. using System.Collections.Generic; +using System.Threading.Tasks; namespace System.Linq { @@ -28,51 +29,40 @@ public ToObservableObservable(IAsyncEnumerable source) public IDisposable Subscribe(IObserver observer) { var ctd = new CancellationTokenDisposable(); - var e = _source.GetAsyncEnumerator(ctd.Token); async void Core() { - bool hasNext; try { - hasNext = await e.MoveNextAsync().ConfigureAwait(false); - } - catch (Exception ex) - { - if (!ctd.Token.IsCancellationRequested) + await foreach (var element in _source.WithCancellation(ctd.Token).ConfigureAwait(false)) { - observer.OnError(ex); - await e.DisposeAsync().ConfigureAwait(false); - } + observer.OnNext(element); - return; + if (ctd.Token.IsCancellationRequested) + { + return; + } + } } - - if (hasNext) + catch (Exception error) { - observer.OnNext(e.Current); - if (!ctd.Token.IsCancellationRequested) { - Core(); + observer.OnError(error); } - - // In case cancellation is requested, this could only have happened - // by disposing the returned composite disposable (see below). - // In that case, e will be disposed too, so there is no need to dispose e here. + return; } - else + + if (!ctd.Token.IsCancellationRequested) { observer.OnCompleted(); - await e.DisposeAsync().ConfigureAwait(false); } } + // Fire and forget Core(); - // REVIEW: Safety of concurrent dispose operation; fire-and-forget nature of dispose? - - return Disposable.Create(ctd, Disposable.Create(() => { e.DisposeAsync(); })); + return ctd; } } } From de239616ff3d74d3a0985d72509cf6b0f65e50b8 Mon Sep 17 00:00:00 2001 From: Peter Wehrfritz Date: Thu, 6 Jun 2019 08:27:21 +0200 Subject: [PATCH 2/6] A failing observer shall never be given its own exception. --- .../System/Linq/Operators/ToObservable.cs | 40 ++++++++++--------- 1 file changed, 22 insertions(+), 18 deletions(-) diff --git a/Ix.NET/Source/System.Linq.Async/System/Linq/Operators/ToObservable.cs b/Ix.NET/Source/System.Linq.Async/System/Linq/Operators/ToObservable.cs index 88f2396f7b..1a70f43911 100644 --- a/Ix.NET/Source/System.Linq.Async/System/Linq/Operators/ToObservable.cs +++ b/Ix.NET/Source/System.Linq.Async/System/Linq/Operators/ToObservable.cs @@ -3,7 +3,6 @@ // See the LICENSE file in the project root for more information. using System.Collections.Generic; -using System.Threading.Tasks; namespace System.Linq { @@ -32,30 +31,35 @@ public IDisposable Subscribe(IObserver observer) async void Core() { - try + // REVIEW: fire-and-forget DisposeAsync? + await using (var e = _source.GetAsyncEnumerator(ctd.Token)) { - await foreach (var element in _source.WithCancellation(ctd.Token).ConfigureAwait(false)) + do { - observer.OnNext(element); + bool hasNext; + try + { + hasNext = await e.MoveNextAsync().ConfigureAwait(false); + } + catch (Exception ex) + { + if (!ctd.Token.IsCancellationRequested) + { + observer.OnError(ex); + } - if (ctd.Token.IsCancellationRequested) + return; + } + + if (!hasNext) { + observer.OnCompleted(); return; } - } - } - catch (Exception error) - { - if (!ctd.Token.IsCancellationRequested) - { - observer.OnError(error); - } - return; - } - if (!ctd.Token.IsCancellationRequested) - { - observer.OnCompleted(); + observer.OnNext(e.Current); + } + while (!ctd.Token.IsCancellationRequested); } } From 7b146d4d0a8e4981b783dc04123afad06f7de73a Mon Sep 17 00:00:00 2001 From: Peter Wehrfritz Date: Thu, 6 Jun 2019 21:17:16 +0200 Subject: [PATCH 3/6] Add a throw on Current test for ToObservable --- .../System/Linq/Operators/ToObservable.cs | 46 ++++++++++++++++++- 1 file changed, 45 insertions(+), 1 deletion(-) diff --git a/Ix.NET/Source/System.Linq.Async.Tests/System/Linq/Operators/ToObservable.cs b/Ix.NET/Source/System.Linq.Async.Tests/System/Linq/Operators/ToObservable.cs index 27dc41a24f..966a23e1d2 100644 --- a/Ix.NET/Source/System.Linq.Async.Tests/System/Linq/Operators/ToObservable.cs +++ b/Ix.NET/Source/System.Linq.Async.Tests/System/Linq/Operators/ToObservable.cs @@ -105,7 +105,7 @@ public void ToObservable3() } [Fact] - public void ToObservable4() + public void ToObservable_ThrowOnMoveNext() { var ex1 = new Exception("Bang!"); var ex_ = default(Exception); @@ -135,6 +135,37 @@ public void ToObservable4() Assert.Equal(ex1, ex_); } + [Fact] + public void ToObservable_ThrowOnCurrent() + { + var ex1 = new Exception("Bang!"); + var ex_ = default(Exception); + var fail = false; + + var ae = AsyncEnumerable.Create( + _ => new ThrowOnCurrentAsyncEnumerator(ex1) + ); + + ae.ToObservable() + .Subscribe(new MyObserver( + x => + { + fail = true; + }, + ex => + { + ex_ = ex; + }, + () => + { + fail = true; + } + )); + + Assert.False(fail); + Assert.Equal(ex1, ex_); + } + [Fact] public void ToObservable_DisposesEnumeratorOnCompletion() { @@ -280,5 +311,18 @@ public MyObserver(Action onNext, Action onError, Action onComplete public void OnNext(T value) => _onNext(value); } + + private sealed class ThrowOnCurrentAsyncEnumerator : IAsyncEnumerator + { + readonly private Exception _exception; + public ThrowOnCurrentAsyncEnumerator(Exception ex) + { + _exception = ex; + } + + public int Current => throw _exception; + public ValueTask DisposeAsync() => default; + public ValueTask MoveNextAsync() => new ValueTask(true); + } } } From 8fea9a888f0f4026268bb37bf188a8fbfa792af7 Mon Sep 17 00:00:00 2001 From: Peter Wehrfritz Date: Thu, 6 Jun 2019 21:21:09 +0200 Subject: [PATCH 4/6] Forward an exception thrown by Current to the observer --- .../System/Linq/Operators/ToObservable.cs | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/Ix.NET/Source/System.Linq.Async/System/Linq/Operators/ToObservable.cs b/Ix.NET/Source/System.Linq.Async/System/Linq/Operators/ToObservable.cs index 1a70f43911..cab4cd42ac 100644 --- a/Ix.NET/Source/System.Linq.Async/System/Linq/Operators/ToObservable.cs +++ b/Ix.NET/Source/System.Linq.Async/System/Linq/Operators/ToObservable.cs @@ -31,7 +31,6 @@ public IDisposable Subscribe(IObserver observer) async void Core() { - // REVIEW: fire-and-forget DisposeAsync? await using (var e = _source.GetAsyncEnumerator(ctd.Token)) { do @@ -57,7 +56,17 @@ await using (var e = _source.GetAsyncEnumerator(ctd.Token)) return; } - observer.OnNext(e.Current); + T v; + try + { + v= e.Current; + } + catch (Exception ex) + { + observer.OnError(ex); + return; + } + observer.OnNext(v); } while (!ctd.Token.IsCancellationRequested); } From f1e9933715143b57714ddabaf3ac51454fc719d2 Mon Sep 17 00:00:00 2001 From: Peter Wehrfritz Date: Thu, 6 Jun 2019 22:49:10 +0200 Subject: [PATCH 5/6] Use a combined `try ... catch` for MoveNext & Current --- .../System/Linq/Operators/ToObservable.cs | 18 +++++++----------- 1 file changed, 7 insertions(+), 11 deletions(-) diff --git a/Ix.NET/Source/System.Linq.Async/System/Linq/Operators/ToObservable.cs b/Ix.NET/Source/System.Linq.Async/System/Linq/Operators/ToObservable.cs index cab4cd42ac..9ea713e281 100644 --- a/Ix.NET/Source/System.Linq.Async/System/Linq/Operators/ToObservable.cs +++ b/Ix.NET/Source/System.Linq.Async/System/Linq/Operators/ToObservable.cs @@ -36,9 +36,15 @@ await using (var e = _source.GetAsyncEnumerator(ctd.Token)) do { bool hasNext; + var value = default(T)!; + try { hasNext = await e.MoveNextAsync().ConfigureAwait(false); + if (hasNext) + { + value = e.Current; + } } catch (Exception ex) { @@ -56,17 +62,7 @@ await using (var e = _source.GetAsyncEnumerator(ctd.Token)) return; } - T v; - try - { - v= e.Current; - } - catch (Exception ex) - { - observer.OnError(ex); - return; - } - observer.OnNext(v); + observer.OnNext(value); } while (!ctd.Token.IsCancellationRequested); } From 2709284b554d1ffdfdd6b3c0d92e49ef1740651f Mon Sep 17 00:00:00 2001 From: Jens Chr Date: Wed, 20 Nov 2019 09:52:42 +0100 Subject: [PATCH 6/6] Add test for support of large enumerables --- .../System/Linq/Operators/ToObservable.cs | 28 +++++++++++++++++++ 1 file changed, 28 insertions(+) diff --git a/Ix.NET/Source/System.Linq.Async.Tests/System/Linq/Operators/ToObservable.cs b/Ix.NET/Source/System.Linq.Async.Tests/System/Linq/Operators/ToObservable.cs index ce0701d91e..d8d20d6e04 100644 --- a/Ix.NET/Source/System.Linq.Async.Tests/System/Linq/Operators/ToObservable.cs +++ b/Ix.NET/Source/System.Linq.Async.Tests/System/Linq/Operators/ToObservable.cs @@ -300,6 +300,34 @@ public void ToObservable_DesNotCallMoveNextAgainWhenSubscriptionIsDisposed() Assert.Equal(1, moveNextCount); Assert.False(fail); } + + [Fact] + public void ToObservable_SupportsLargeEnumerable() + { + using var evt = new ManualResetEvent(false); + + var fail = false; + + var xs = AsyncEnumerable.Range(0, 10000).ToObservable(); + xs.Subscribe(new MyObserver( + x => + { + // ok + }, + ex => + { + fail = true; + evt.Set(); + }, + () => + { + evt.Set(); + } + )); + + evt.WaitOne(); + Assert.False(fail); + } private sealed class MyObserver : IObserver {