Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Serialize the dispose of Zip(IEnumerable) with MoveNext/Current #1079

Merged
merged 4 commits into from Nov 19, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
51 changes: 38 additions & 13 deletions Rx.NET/Source/src/System.Reactive/Linq/Observable/Zip.cs
Expand Up @@ -274,6 +274,8 @@ public _(Func<TFirst, TSecond, TResult> resultSelector, IObserver<TResult> obser
_resultSelector = resultSelector;
}

int _enumerationInProgress;

private IEnumerator<TSecond> _rightEnumerator;

private static readonly IEnumerator<TSecond> DisposedEnumerator = MakeDisposedEnumerator();
Expand Down Expand Up @@ -315,37 +317,60 @@ protected override void Dispose(bool disposing)
{
if (disposing)
{
Interlocked.Exchange(ref _rightEnumerator, DisposedEnumerator)?.Dispose();
if (Interlocked.Increment(ref _enumerationInProgress) == 1)
{
Interlocked.Exchange(ref _rightEnumerator, DisposedEnumerator)?.Dispose();
}
}
base.Dispose(disposing);
}

public override void OnNext(TFirst value)
{
bool hasNext;
try
var currentEnumerator = Volatile.Read(ref _rightEnumerator);
if (currentEnumerator == DisposedEnumerator)
{
hasNext = _rightEnumerator.MoveNext();
return;
}
catch (Exception ex)
if (Interlocked.Increment(ref _enumerationInProgress) != 1)
{
ForwardOnError(ex);
return;
}

if (hasNext)
bool hasNext;
TSecond right = default;
var wasDisposed = false;
try
{
TSecond right;
try
{
right = _rightEnumerator.Current;
hasNext = currentEnumerator.MoveNext();
if (hasNext)
{
right = currentEnumerator.Current;
}
}
catch (Exception ex)
finally
{
ForwardOnError(ex);
return;
if (Interlocked.Decrement(ref _enumerationInProgress) != 0)
{
Interlocked.Exchange(ref _rightEnumerator, DisposedEnumerator)?.Dispose();
wasDisposed = true;
}
}
}
catch (Exception ex)
{
ForwardOnError(ex);
return;
}

if (wasDisposed)
{
return;
}

if (hasNext)
{
TResult result;
try
{
Expand Down
Expand Up @@ -3,10 +3,13 @@
// See the LICENSE file in the project root for more information.

using System;
using System.Collections;
using System.Collections.Generic;
using System.Linq;
using System.Reactive;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;
using Microsoft.Reactive.Testing;
using ReactiveTests.Dummies;
Expand Down Expand Up @@ -4054,6 +4057,113 @@ public void ZipWithEnumerable_SelectorThrows()
);
}

[Fact]
public void ZipWithEnumerable_NoAsyncDisposeOnMoveNext()
{
var source = new Subject<int>();

var disposable = new SingleAssignmentDisposable();

var other = new MoveNextDisposeDetectEnumerable(disposable, true);

disposable.Disposable = source.Zip(other, (a, b) => a + b).Subscribe();

source.OnNext(1);

Assert.True(other.IsDisposed);
Assert.False(other.DisposedWhileMoveNext);
Assert.False(other.DisposedWhileCurrent);
}

[Fact]
public void ZipWithEnumerable_NoAsyncDisposeOnCurrent()
{
var source = new Subject<int>();

var disposable = new SingleAssignmentDisposable();

var other = new MoveNextDisposeDetectEnumerable(disposable, false);

disposable.Disposable = source.Zip(other, (a, b) => a + b).Subscribe();

source.OnNext(1);

Assert.True(other.IsDisposed);
Assert.False(other.DisposedWhileMoveNext);
Assert.False(other.DisposedWhileCurrent);
}

private class MoveNextDisposeDetectEnumerable : IEnumerable<int>, IEnumerator<int>
{
readonly IDisposable _disposable;

readonly bool _disposeOnMoveNext;

private bool _moveNextRunning;

private bool _currentRunning;

internal bool DisposedWhileMoveNext;

internal bool DisposedWhileCurrent;

internal bool IsDisposed;

internal MoveNextDisposeDetectEnumerable(IDisposable disposable, bool disposeOnMoveNext)
{
_disposable = disposable;
_disposeOnMoveNext = disposeOnMoveNext;
}
public int Current
{
get
{
_currentRunning = true;
if (!_disposeOnMoveNext)
{
_disposable.Dispose();
}
_currentRunning = false;
return 0;
}
}

object IEnumerator.Current => Current;

public void Dispose()
{
DisposedWhileMoveNext = _moveNextRunning;
DisposedWhileCurrent = _currentRunning;
IsDisposed = true;
}

public IEnumerator<int> GetEnumerator()
{
return this;
}

public bool MoveNext()
{
_moveNextRunning = true;
if (_disposeOnMoveNext)
{
_disposable.Dispose();
}
_moveNextRunning = false;
return true;
}

public void Reset()
{
throw new NotSupportedException();
}

IEnumerator IEnumerable.GetEnumerator()
{
return this;
}
}

private IEnumerable<int> EnumerableNever(ManualResetEvent evt)
{
evt.WaitOne();
Expand Down