diff --git a/Rx.NET/Source/src/System.Reactive/Linq/Observable/Generate.cs b/Rx.NET/Source/src/System.Reactive/Linq/Observable/Generate.cs index 438d8f6eae..80c3dced10 100644 --- a/Rx.NET/Source/src/System.Reactive/Linq/Observable/Generate.cs +++ b/Rx.NET/Source/src/System.Reactive/Linq/Observable/Generate.cs @@ -195,9 +195,19 @@ public _(Absolute parent, IObserver observer) private bool _hasResult; private TResult _result; + private IDisposable _timerDisposable; + public void Run(IScheduler outerScheduler, TState initialState) { - SetUpstream(outerScheduler.Schedule((@this: this, initialState), (scheduler, tuple) => tuple.@this.InvokeRec(scheduler, tuple.initialState))); + var timer = new SingleAssignmentDisposable(); + Disposable.TrySetMultiple(ref _timerDisposable, timer); + timer.Disposable = outerScheduler.Schedule((@this: this, initialState), (scheduler, tuple) => tuple.@this.InvokeRec(scheduler, tuple.initialState)); + } + + protected override void Dispose(bool disposing) + { + Disposable.TryDispose(ref _timerDisposable); + base.Dispose(disposing); } private IDisposable InvokeRec(IScheduler self, TState state) @@ -240,7 +250,11 @@ private IDisposable InvokeRec(IScheduler self, TState state) return Disposable.Empty; } - return self.Schedule((@this: this, state), time, (scheduler, tuple) => tuple.@this.InvokeRec(scheduler, tuple.state)); + var timer = new SingleAssignmentDisposable(); + Disposable.TrySetMultiple(ref _timerDisposable, timer); + timer.Disposable = self.Schedule((@this: this, state), time, (scheduler, tuple) => tuple.@this.InvokeRec(scheduler, tuple.state)); + + return Disposable.Empty; } } } @@ -275,6 +289,7 @@ internal sealed class _ : IdentitySink private readonly Func _resultSelector; private readonly Func _timeSelector; + public _(Relative parent, IObserver observer) : base(observer) { @@ -290,9 +305,19 @@ public _(Relative parent, IObserver observer) private bool _hasResult; private TResult _result; + private IDisposable _timerDisposable; + public void Run(IScheduler outerScheduler, TState initialState) { - SetUpstream(outerScheduler.Schedule((@this: this, initialState), (scheduler, tuple) => tuple.@this.InvokeRec(scheduler, tuple.initialState))); + var timer = new SingleAssignmentDisposable(); + Disposable.TrySetMultiple(ref _timerDisposable, timer); + timer.Disposable = outerScheduler.Schedule((@this: this, initialState), (scheduler, tuple) => tuple.@this.InvokeRec(scheduler, tuple.initialState)); + } + + protected override void Dispose(bool disposing) + { + Disposable.TryDispose(ref _timerDisposable); + base.Dispose(disposing); } private IDisposable InvokeRec(IScheduler self, TState state) @@ -335,10 +360,13 @@ private IDisposable InvokeRec(IScheduler self, TState state) return Disposable.Empty; } - return self.Schedule((@this: this, state), time, (scheduler, tuple) => tuple.@this.InvokeRec(scheduler, tuple.state)); + var timer = new SingleAssignmentDisposable(); + Disposable.TrySetMultiple(ref _timerDisposable, timer); + timer.Disposable = self.Schedule((@this: this, state), time, (scheduler, tuple) => tuple.@this.InvokeRec(scheduler, tuple.state)); + + return Disposable.Empty; } } } } } - diff --git a/Rx.NET/Source/tests/Tests.System.Reactive/Tests/Linq/Observable/GenerateTest.cs b/Rx.NET/Source/tests/Tests.System.Reactive/Tests/Linq/Observable/GenerateTest.cs index b575d5d728..e1b44478bd 100644 --- a/Rx.NET/Source/tests/Tests.System.Reactive/Tests/Linq/Observable/GenerateTest.cs +++ b/Rx.NET/Source/tests/Tests.System.Reactive/Tests/Linq/Observable/GenerateTest.cs @@ -9,6 +9,7 @@ using System.Reactive.Linq; using System.Runtime.CompilerServices; using System.Threading; +using System.Threading.Tasks; using Microsoft.Reactive.Testing; using ReactiveTests.Dummies; using Xunit; @@ -454,6 +455,40 @@ public void Generate_DateTimeOffset_DefaultScheduler() Observable.Generate(0, x => x < 10, x => x + 1, x => x, x => DateTimeOffset.Now.AddMilliseconds(x)).AssertEqual(Observable.Generate(0, x => x < 10, x => x + 1, x => x, x => DateTimeOffset.Now.AddMilliseconds(x), DefaultScheduler.Instance)); } + [Fact] + public void Generate_TimeSpan_DisposeLater() + { + var count = 0; + var d = Observable.Generate(0, i => true, i => i + 1, i => i, _ => TimeSpan.Zero) + .WithLatestFrom(Observable.Return(1).Concat(Observable.Never()), (a, b) => a) + .Subscribe(v => Volatile.Write(ref count, v)); + + while (Volatile.Read(ref count) < 10000) + { + Thread.Sleep(10); + } + + d.Dispose(); + } + + [Fact] + public void Generate_DateTimeOffset_DisposeLater() + { + var count = 0; + + var d = Observable.Generate(0, i => true, i => i + 1, i => i, _ => DateTimeOffset.Now) + .WithLatestFrom(Observable.Return(1).Concat(Observable.Never()), (a, b) => a) + .Subscribe(v => Volatile.Write(ref count, v)); + + while (Volatile.Read(ref count) < 10000) + { + Thread.Sleep(10); + } + + d.Dispose(); + } + + #endregion } }