From 14a7043981f938fc4d38cabc4d0145b569ffe599 Mon Sep 17 00:00:00 2001 From: akarnokd Date: Thu, 14 Nov 2019 20:16:16 +0100 Subject: [PATCH 1/3] Serialize the dispose of Zip(IEnumerable) with MoveNext/Current --- .../System.Reactive/Linq/Observable/Zip.cs | 52 ++++++--- .../Tests/Linq/Observable/ZipTest.cs | 110 ++++++++++++++++++ 2 files changed, 149 insertions(+), 13 deletions(-) diff --git a/Rx.NET/Source/src/System.Reactive/Linq/Observable/Zip.cs b/Rx.NET/Source/src/System.Reactive/Linq/Observable/Zip.cs index e8258aa6b6..9162fadd26 100644 --- a/Rx.NET/Source/src/System.Reactive/Linq/Observable/Zip.cs +++ b/Rx.NET/Source/src/System.Reactive/Linq/Observable/Zip.cs @@ -274,6 +274,8 @@ public _(Func resultSelector, IObserver obser _resultSelector = resultSelector; } + int _enumerationInProgress; + private IEnumerator _rightEnumerator; private static readonly IEnumerator DisposedEnumerator = MakeDisposedEnumerator(); @@ -315,37 +317,61 @@ protected override void Dispose(bool disposing) { if (disposing) { - Interlocked.Exchange(ref _rightEnumerator, DisposedEnumerator)?.Dispose(); + if (Interlocked.Increment(ref _enumerationInProgress) == 1) + { + Interlocked.Exchange(ref _rightEnumerator, DisposedEnumerator)?.Dispose(); + } } base.Dispose(disposing); } public override void OnNext(TFirst value) { - bool hasNext; - try + var currentEnumerator = Volatile.Read(ref _rightEnumerator); + if (currentEnumerator == DisposedEnumerator) { - hasNext = _rightEnumerator.MoveNext(); + return; } - catch (Exception ex) + if (Interlocked.Increment(ref _enumerationInProgress) != 1) { - ForwardOnError(ex); + currentEnumerator.Dispose(); return; } - - if (hasNext) + bool hasNext; + TSecond right = default; + var wasDisposed = false; + try { - TSecond right; try { - right = _rightEnumerator.Current; + hasNext = currentEnumerator.MoveNext(); + if (hasNext) + { + right = currentEnumerator.Current; + } } - catch (Exception ex) + finally { - ForwardOnError(ex); - return; + if (Interlocked.Decrement(ref _enumerationInProgress) != 0) + { + currentEnumerator.Dispose(); + wasDisposed = true; + } } + } + catch (Exception ex) + { + ForwardOnError(ex); + return; + } + if (wasDisposed) + { + return; + } + + if (hasNext) + { TResult result; try { diff --git a/Rx.NET/Source/tests/Tests.System.Reactive/Tests/Linq/Observable/ZipTest.cs b/Rx.NET/Source/tests/Tests.System.Reactive/Tests/Linq/Observable/ZipTest.cs index e8091e3b29..beb804a51c 100644 --- a/Rx.NET/Source/tests/Tests.System.Reactive/Tests/Linq/Observable/ZipTest.cs +++ b/Rx.NET/Source/tests/Tests.System.Reactive/Tests/Linq/Observable/ZipTest.cs @@ -3,10 +3,13 @@ // See the LICENSE file in the project root for more information. using System; +using System.Collections; using System.Collections.Generic; using System.Linq; using System.Reactive; +using System.Reactive.Disposables; using System.Reactive.Linq; +using System.Reactive.Subjects; using System.Threading; using Microsoft.Reactive.Testing; using ReactiveTests.Dummies; @@ -4054,6 +4057,113 @@ public void ZipWithEnumerable_SelectorThrows() ); } + [Fact] + public void ZipWithEnumerable_NoAsyncDisposeOnMoveNext() + { + var source = new Subject(); + + var disposable = new SingleAssignmentDisposable(); + + var other = new MoveNextDisposeDetectEnumerable(disposable, true); + + disposable.Disposable = source.Zip(other, (a, b) => a + b).Subscribe(); + + source.OnNext(1); + + Assert.True(other.IsDisposed); + Assert.False(other.DisposedWhileMoveNext); + Assert.False(other.DisposedWhileCurrent); + } + + [Fact] + public void ZipWithEnumerable_NoAsyncDisposeOnCurrent() + { + var source = new Subject(); + + var disposable = new SingleAssignmentDisposable(); + + var other = new MoveNextDisposeDetectEnumerable(disposable, false); + + disposable.Disposable = source.Zip(other, (a, b) => a + b).Subscribe(); + + source.OnNext(1); + + Assert.True(other.IsDisposed); + Assert.False(other.DisposedWhileMoveNext); + Assert.False(other.DisposedWhileCurrent); + } + + private class MoveNextDisposeDetectEnumerable : IEnumerable, IEnumerator + { + readonly IDisposable _disposable; + + readonly bool _disposeOnMoveNext; + + private bool _moveNextRunning; + + private bool _currentRunning; + + internal bool DisposedWhileMoveNext; + + internal bool DisposedWhileCurrent; + + internal bool IsDisposed; + + internal MoveNextDisposeDetectEnumerable(IDisposable disposable, bool disposeOnMoveNext) + { + _disposable = disposable; + _disposeOnMoveNext = disposeOnMoveNext; + } + public int Current + { + get + { + _currentRunning = true; + if (!_disposeOnMoveNext) + { + _disposable.Dispose(); + } + _currentRunning = false; + return 0; + } + } + + object IEnumerator.Current => Current; + + public void Dispose() + { + DisposedWhileMoveNext = _moveNextRunning; + DisposedWhileCurrent = _currentRunning; + IsDisposed = true; + } + + public IEnumerator GetEnumerator() + { + return this; + } + + public bool MoveNext() + { + _moveNextRunning = true; + if (_disposeOnMoveNext) + { + _disposable.Dispose(); + } + _moveNextRunning = false; + return true; + } + + public void Reset() + { + throw new NotSupportedException(); + } + + IEnumerator IEnumerable.GetEnumerator() + { + return this; + } + } + private IEnumerable EnumerableNever(ManualResetEvent evt) { evt.WaitOne(); From 628cba83829706b118ff000a438a5c16df20f52c Mon Sep 17 00:00:00 2001 From: akarnokd Date: Thu, 14 Nov 2019 20:24:01 +0100 Subject: [PATCH 2/3] Remove unnecessary dispose call. --- Rx.NET/Source/src/System.Reactive/Linq/Observable/Zip.cs | 1 - 1 file changed, 1 deletion(-) diff --git a/Rx.NET/Source/src/System.Reactive/Linq/Observable/Zip.cs b/Rx.NET/Source/src/System.Reactive/Linq/Observable/Zip.cs index 9162fadd26..c2d523b340 100644 --- a/Rx.NET/Source/src/System.Reactive/Linq/Observable/Zip.cs +++ b/Rx.NET/Source/src/System.Reactive/Linq/Observable/Zip.cs @@ -334,7 +334,6 @@ public override void OnNext(TFirst value) } if (Interlocked.Increment(ref _enumerationInProgress) != 1) { - currentEnumerator.Dispose(); return; } bool hasNext; From 87aa2b0a155c2d78bc6f9bc14c0cf6956eac94f7 Mon Sep 17 00:00:00 2001 From: akarnokd Date: Thu, 14 Nov 2019 20:26:50 +0100 Subject: [PATCH 3/3] Clear the enumerator in OnNext --- Rx.NET/Source/src/System.Reactive/Linq/Observable/Zip.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Rx.NET/Source/src/System.Reactive/Linq/Observable/Zip.cs b/Rx.NET/Source/src/System.Reactive/Linq/Observable/Zip.cs index c2d523b340..ab11ada92e 100644 --- a/Rx.NET/Source/src/System.Reactive/Linq/Observable/Zip.cs +++ b/Rx.NET/Source/src/System.Reactive/Linq/Observable/Zip.cs @@ -353,7 +353,7 @@ public override void OnNext(TFirst value) { if (Interlocked.Decrement(ref _enumerationInProgress) != 0) { - currentEnumerator.Dispose(); + Interlocked.Exchange(ref _rightEnumerator, DisposedEnumerator)?.Dispose(); wasDisposed = true; } }