diff --git a/Ix.NET/Source/System.Linq.Async.Tests/System/Linq/Operators/ToObservable.cs b/Ix.NET/Source/System.Linq.Async.Tests/System/Linq/Operators/ToObservable.cs index ba6f5cc2be..d8d20d6e04 100644 --- a/Ix.NET/Source/System.Linq.Async.Tests/System/Linq/Operators/ToObservable.cs +++ b/Ix.NET/Source/System.Linq.Async.Tests/System/Linq/Operators/ToObservable.cs @@ -108,7 +108,7 @@ public void ToObservable3() } [Fact] - public void ToObservable4() + public void ToObservable_ThrowOnMoveNext() { using var evt = new ManualResetEvent(false); @@ -139,6 +139,37 @@ public void ToObservable4() Assert.Equal(ex1, ex_); } + [Fact] + public void ToObservable_ThrowOnCurrent() + { + var ex1 = new Exception("Bang!"); + var ex_ = default(Exception); + var fail = false; + + var ae = AsyncEnumerable.Create( + _ => new ThrowOnCurrentAsyncEnumerator(ex1) + ); + + ae.ToObservable() + .Subscribe(new MyObserver( + x => + { + fail = true; + }, + ex => + { + ex_ = ex; + }, + () => + { + fail = true; + } + )); + + Assert.False(fail); + Assert.Equal(ex1, ex_); + } + [Fact] public void ToObservable_DisposesEnumeratorOnCompletion() { @@ -269,6 +300,34 @@ public void ToObservable_DesNotCallMoveNextAgainWhenSubscriptionIsDisposed() Assert.Equal(1, moveNextCount); Assert.False(fail); } + + [Fact] + public void ToObservable_SupportsLargeEnumerable() + { + using var evt = new ManualResetEvent(false); + + var fail = false; + + var xs = AsyncEnumerable.Range(0, 10000).ToObservable(); + xs.Subscribe(new MyObserver( + x => + { + // ok + }, + ex => + { + fail = true; + evt.Set(); + }, + () => + { + evt.Set(); + } + )); + + evt.WaitOne(); + Assert.False(fail); + } private sealed class MyObserver : IObserver { @@ -289,5 +348,18 @@ public MyObserver(Action onNext, Action onError, Action onComplete public void OnNext(T value) => _onNext(value); } + + private sealed class ThrowOnCurrentAsyncEnumerator : IAsyncEnumerator + { + readonly private Exception _exception; + public ThrowOnCurrentAsyncEnumerator(Exception ex) + { + _exception = ex; + } + + public int Current => throw _exception; + public ValueTask DisposeAsync() => default; + public ValueTask MoveNextAsync() => new ValueTask(true); + } } } diff --git a/Ix.NET/Source/System.Linq.Async/System/Linq/Operators/ToObservable.cs b/Ix.NET/Source/System.Linq.Async/System/Linq/Operators/ToObservable.cs index f2e74d95bd..9ea713e281 100644 --- a/Ix.NET/Source/System.Linq.Async/System/Linq/Operators/ToObservable.cs +++ b/Ix.NET/Source/System.Linq.Async/System/Linq/Operators/ToObservable.cs @@ -28,51 +28,50 @@ public ToObservableObservable(IAsyncEnumerable source) public IDisposable Subscribe(IObserver observer) { var ctd = new CancellationTokenDisposable(); - var e = _source.GetAsyncEnumerator(ctd.Token); async void Core() { - bool hasNext; - try + await using (var e = _source.GetAsyncEnumerator(ctd.Token)) { - hasNext = await e.MoveNextAsync().ConfigureAwait(false); - } - catch (Exception ex) - { - if (!ctd.Token.IsCancellationRequested) + do { - observer.OnError(ex); - await e.DisposeAsync().ConfigureAwait(false); - } + bool hasNext; + var value = default(T)!; - return; - } + try + { + hasNext = await e.MoveNextAsync().ConfigureAwait(false); + if (hasNext) + { + value = e.Current; + } + } + catch (Exception ex) + { + if (!ctd.Token.IsCancellationRequested) + { + observer.OnError(ex); + } - if (hasNext) - { - observer.OnNext(e.Current); + return; + } - if (!ctd.Token.IsCancellationRequested) - { - Core(); - } + if (!hasNext) + { + observer.OnCompleted(); + return; + } - // In case cancellation is requested, this could only have happened - // by disposing the returned composite disposable (see below). - // In that case, e will be disposed too, so there is no need to dispose e here. - } - else - { - observer.OnCompleted(); - await e.DisposeAsync().ConfigureAwait(false); + observer.OnNext(value); + } + while (!ctd.Token.IsCancellationRequested); } } + // Fire and forget Core(); - // REVIEW: Safety of concurrent dispose operation; fire-and-forget nature of dispose? - - return Disposable.Create(ctd, Disposable.Create(() => { e.DisposeAsync(); })); + return ctd; } } }