From f1f0dec8d672b5071a4293dbe12e93ca75809118 Mon Sep 17 00:00:00 2001 From: Jens Chr Date: Wed, 20 Nov 2019 09:52:42 +0100 Subject: [PATCH 1/2] Add test for support of large enumerables --- .../System/Linq/Operators/ToObservable.cs | 28 +++++++++++++++++++ 1 file changed, 28 insertions(+) diff --git a/Ix.NET/Source/System.Linq.Async.Tests/System/Linq/Operators/ToObservable.cs b/Ix.NET/Source/System.Linq.Async.Tests/System/Linq/Operators/ToObservable.cs index ba6f5cc2be..796b22132a 100644 --- a/Ix.NET/Source/System.Linq.Async.Tests/System/Linq/Operators/ToObservable.cs +++ b/Ix.NET/Source/System.Linq.Async.Tests/System/Linq/Operators/ToObservable.cs @@ -269,6 +269,34 @@ public void ToObservable_DesNotCallMoveNextAgainWhenSubscriptionIsDisposed() Assert.Equal(1, moveNextCount); Assert.False(fail); } + + [Fact] + public void ToObservable_SupportsLargeEnumerable() + { + using var evt = new ManualResetEvent(false); + + var fail = false; + + var xs = AsyncEnumerable.Range(0, 10000).ToObservable(); + xs.Subscribe(new MyObserver( + x => + { + // ok + }, + ex => + { + fail = true; + evt.Set(); + }, + () => + { + evt.Set(); + } + )); + + evt.WaitOne(); + Assert.False(fail); + } private sealed class MyObserver : IObserver { From 09033f5190119163aa187038cefb2b47cf2a68e5 Mon Sep 17 00:00:00 2001 From: Jens Chr Date: Wed, 20 Nov 2019 09:53:00 +0100 Subject: [PATCH 2/2] Replace tail recursion with while loop --- .../System/Linq/Operators/ToObservable.cs | 53 ++++++++++--------- 1 file changed, 29 insertions(+), 24 deletions(-) diff --git a/Ix.NET/Source/System.Linq.Async/System/Linq/Operators/ToObservable.cs b/Ix.NET/Source/System.Linq.Async/System/Linq/Operators/ToObservable.cs index f2e74d95bd..90f4face95 100644 --- a/Ix.NET/Source/System.Linq.Async/System/Linq/Operators/ToObservable.cs +++ b/Ix.NET/Source/System.Linq.Async/System/Linq/Operators/ToObservable.cs @@ -32,39 +32,44 @@ public IDisposable Subscribe(IObserver observer) async void Core() { - bool hasNext; - try + while (true) { - hasNext = await e.MoveNextAsync().ConfigureAwait(false); - } - catch (Exception ex) - { - if (!ctd.Token.IsCancellationRequested) + bool hasNext; + try { - observer.OnError(ex); - await e.DisposeAsync().ConfigureAwait(false); + hasNext = await e.MoveNextAsync().ConfigureAwait(false); } + catch (Exception ex) + { + if (!ctd.Token.IsCancellationRequested) + { + observer.OnError(ex); + await e.DisposeAsync().ConfigureAwait(false); + } - return; - } + return; + } - if (hasNext) - { - observer.OnNext(e.Current); + if (hasNext) + { + observer.OnNext(e.Current); + + if (!ctd.Token.IsCancellationRequested) + { + continue; + } - if (!ctd.Token.IsCancellationRequested) + // In case cancellation is requested, this could only have happened + // by disposing the returned composite disposable (see below). + // In that case, e will be disposed too, so there is no need to dispose e here. + } + else { - Core(); + observer.OnCompleted(); + await e.DisposeAsync().ConfigureAwait(false); } - // In case cancellation is requested, this could only have happened - // by disposing the returned composite disposable (see below). - // In that case, e will be disposed too, so there is no need to dispose e here. - } - else - { - observer.OnCompleted(); - await e.DisposeAsync().ConfigureAwait(false); + break; } }