Skip to content

Commit

Permalink
Merge pull request #915 from quinmars/fix-contract-in-ToObservable
Browse files Browse the repository at this point in the history
Ix: Do not dispose the enumerator while enumerating in the ToObservable operator.
  • Loading branch information
danielcweber committed Nov 20, 2019
2 parents c61438e + fa33d38 commit 46d757b
Show file tree
Hide file tree
Showing 2 changed files with 103 additions and 32 deletions.
Expand Up @@ -108,7 +108,7 @@ public void ToObservable3()
}

[Fact]
public void ToObservable4()
public void ToObservable_ThrowOnMoveNext()
{
using var evt = new ManualResetEvent(false);

Expand Down Expand Up @@ -139,6 +139,37 @@ public void ToObservable4()
Assert.Equal(ex1, ex_);
}

[Fact]
public void ToObservable_ThrowOnCurrent()
{
var ex1 = new Exception("Bang!");
var ex_ = default(Exception);
var fail = false;

var ae = AsyncEnumerable.Create(
_ => new ThrowOnCurrentAsyncEnumerator(ex1)
);

ae.ToObservable()
.Subscribe(new MyObserver<int>(
x =>
{
fail = true;
},
ex =>
{
ex_ = ex;
},
() =>
{
fail = true;
}
));

Assert.False(fail);
Assert.Equal(ex1, ex_);
}

[Fact]
public void ToObservable_DisposesEnumeratorOnCompletion()
{
Expand Down Expand Up @@ -269,6 +300,34 @@ public void ToObservable_DesNotCallMoveNextAgainWhenSubscriptionIsDisposed()
Assert.Equal(1, moveNextCount);
Assert.False(fail);
}

[Fact]
public void ToObservable_SupportsLargeEnumerable()
{
using var evt = new ManualResetEvent(false);

var fail = false;

var xs = AsyncEnumerable.Range(0, 10000).ToObservable();
xs.Subscribe(new MyObserver<int>(
x =>
{
// ok
},
ex =>
{
fail = true;
evt.Set();
},
() =>
{
evt.Set();
}
));

evt.WaitOne();
Assert.False(fail);
}

private sealed class MyObserver<T> : IObserver<T>
{
Expand All @@ -289,5 +348,18 @@ public MyObserver(Action<T> onNext, Action<Exception> onError, Action onComplete

public void OnNext(T value) => _onNext(value);
}

private sealed class ThrowOnCurrentAsyncEnumerator : IAsyncEnumerator<int>
{
readonly private Exception _exception;
public ThrowOnCurrentAsyncEnumerator(Exception ex)
{
_exception = ex;
}

public int Current => throw _exception;
public ValueTask DisposeAsync() => default;
public ValueTask<bool> MoveNextAsync() => new ValueTask<bool>(true);
}
}
}
Expand Up @@ -28,51 +28,50 @@ public ToObservableObservable(IAsyncEnumerable<T> source)
public IDisposable Subscribe(IObserver<T> observer)
{
var ctd = new CancellationTokenDisposable();
var e = _source.GetAsyncEnumerator(ctd.Token);

async void Core()
{
bool hasNext;
try
await using (var e = _source.GetAsyncEnumerator(ctd.Token))
{
hasNext = await e.MoveNextAsync().ConfigureAwait(false);
}
catch (Exception ex)
{
if (!ctd.Token.IsCancellationRequested)
do
{
observer.OnError(ex);
await e.DisposeAsync().ConfigureAwait(false);
}
bool hasNext;
var value = default(T)!;

return;
}
try
{
hasNext = await e.MoveNextAsync().ConfigureAwait(false);
if (hasNext)
{
value = e.Current;
}
}
catch (Exception ex)
{
if (!ctd.Token.IsCancellationRequested)
{
observer.OnError(ex);
}

if (hasNext)
{
observer.OnNext(e.Current);
return;
}

if (!ctd.Token.IsCancellationRequested)
{
Core();
}
if (!hasNext)
{
observer.OnCompleted();
return;
}

// In case cancellation is requested, this could only have happened
// by disposing the returned composite disposable (see below).
// In that case, e will be disposed too, so there is no need to dispose e here.
}
else
{
observer.OnCompleted();
await e.DisposeAsync().ConfigureAwait(false);
observer.OnNext(value);
}
while (!ctd.Token.IsCancellationRequested);
}
}

// Fire and forget
Core();

// REVIEW: Safety of concurrent dispose operation; fire-and-forget nature of dispose?

return Disposable.Create(ctd, Disposable.Create(() => { e.DisposeAsync(); }));
return ctd;
}
}
}
Expand Down

0 comments on commit 46d757b

Please sign in to comment.