Skip to content

Commit

Permalink
Merge pull request #1079 from akarnokd/ZipWithEnumerableDisposeFix
Browse files Browse the repository at this point in the history
Serialize the dispose of Zip(IEnumerable) with MoveNext/Current
  • Loading branch information
danielcweber committed Nov 19, 2019
2 parents fc17494 + b9321cd commit c61438e
Show file tree
Hide file tree
Showing 2 changed files with 148 additions and 13 deletions.
51 changes: 38 additions & 13 deletions Rx.NET/Source/src/System.Reactive/Linq/Observable/Zip.cs
Expand Up @@ -274,6 +274,8 @@ public _(Func<TFirst, TSecond, TResult> resultSelector, IObserver<TResult> obser
_resultSelector = resultSelector;
}

int _enumerationInProgress;

private IEnumerator<TSecond> _rightEnumerator;

private static readonly IEnumerator<TSecond> DisposedEnumerator = MakeDisposedEnumerator();
Expand Down Expand Up @@ -315,37 +317,60 @@ protected override void Dispose(bool disposing)
{
if (disposing)
{
Interlocked.Exchange(ref _rightEnumerator, DisposedEnumerator)?.Dispose();
if (Interlocked.Increment(ref _enumerationInProgress) == 1)
{
Interlocked.Exchange(ref _rightEnumerator, DisposedEnumerator)?.Dispose();
}
}
base.Dispose(disposing);
}

public override void OnNext(TFirst value)
{
bool hasNext;
try
var currentEnumerator = Volatile.Read(ref _rightEnumerator);
if (currentEnumerator == DisposedEnumerator)
{
hasNext = _rightEnumerator.MoveNext();
return;
}
catch (Exception ex)
if (Interlocked.Increment(ref _enumerationInProgress) != 1)
{
ForwardOnError(ex);
return;
}

if (hasNext)
bool hasNext;
TSecond right = default;
var wasDisposed = false;
try
{
TSecond right;
try
{
right = _rightEnumerator.Current;
hasNext = currentEnumerator.MoveNext();
if (hasNext)
{
right = currentEnumerator.Current;
}
}
catch (Exception ex)
finally
{
ForwardOnError(ex);
return;
if (Interlocked.Decrement(ref _enumerationInProgress) != 0)
{
Interlocked.Exchange(ref _rightEnumerator, DisposedEnumerator)?.Dispose();
wasDisposed = true;
}
}
}
catch (Exception ex)
{
ForwardOnError(ex);
return;
}

if (wasDisposed)
{
return;
}

if (hasNext)
{
TResult result;
try
{
Expand Down
Expand Up @@ -3,10 +3,13 @@
// See the LICENSE file in the project root for more information.

using System;
using System.Collections;
using System.Collections.Generic;
using System.Linq;
using System.Reactive;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;
using Microsoft.Reactive.Testing;
using ReactiveTests.Dummies;
Expand Down Expand Up @@ -4054,6 +4057,113 @@ public void ZipWithEnumerable_SelectorThrows()
);
}

[Fact]
public void ZipWithEnumerable_NoAsyncDisposeOnMoveNext()
{
var source = new Subject<int>();

var disposable = new SingleAssignmentDisposable();

var other = new MoveNextDisposeDetectEnumerable(disposable, true);

disposable.Disposable = source.Zip(other, (a, b) => a + b).Subscribe();

source.OnNext(1);

Assert.True(other.IsDisposed);
Assert.False(other.DisposedWhileMoveNext);
Assert.False(other.DisposedWhileCurrent);
}

[Fact]
public void ZipWithEnumerable_NoAsyncDisposeOnCurrent()
{
var source = new Subject<int>();

var disposable = new SingleAssignmentDisposable();

var other = new MoveNextDisposeDetectEnumerable(disposable, false);

disposable.Disposable = source.Zip(other, (a, b) => a + b).Subscribe();

source.OnNext(1);

Assert.True(other.IsDisposed);
Assert.False(other.DisposedWhileMoveNext);
Assert.False(other.DisposedWhileCurrent);
}

private class MoveNextDisposeDetectEnumerable : IEnumerable<int>, IEnumerator<int>
{
readonly IDisposable _disposable;

readonly bool _disposeOnMoveNext;

private bool _moveNextRunning;

private bool _currentRunning;

internal bool DisposedWhileMoveNext;

internal bool DisposedWhileCurrent;

internal bool IsDisposed;

internal MoveNextDisposeDetectEnumerable(IDisposable disposable, bool disposeOnMoveNext)
{
_disposable = disposable;
_disposeOnMoveNext = disposeOnMoveNext;
}
public int Current
{
get
{
_currentRunning = true;
if (!_disposeOnMoveNext)
{
_disposable.Dispose();
}
_currentRunning = false;
return 0;
}
}

object IEnumerator.Current => Current;

public void Dispose()
{
DisposedWhileMoveNext = _moveNextRunning;
DisposedWhileCurrent = _currentRunning;
IsDisposed = true;
}

public IEnumerator<int> GetEnumerator()
{
return this;
}

public bool MoveNext()
{
_moveNextRunning = true;
if (_disposeOnMoveNext)
{
_disposable.Dispose();
}
_moveNextRunning = false;
return true;
}

public void Reset()
{
throw new NotSupportedException();
}

IEnumerator IEnumerable.GetEnumerator()
{
return this;
}
}

private IEnumerable<int> EnumerableNever(ManualResetEvent evt)
{
evt.WaitOne();
Expand Down

0 comments on commit c61438e

Please sign in to comment.