Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix Generate (timed) crash upon disposing a long sequence #1067

Merged
merged 2 commits into from Nov 4, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
38 changes: 33 additions & 5 deletions Rx.NET/Source/src/System.Reactive/Linq/Observable/Generate.cs
Expand Up @@ -195,9 +195,19 @@ public _(Absolute parent, IObserver<TResult> observer)
private bool _hasResult;
private TResult _result;

private IDisposable _timerDisposable;

public void Run(IScheduler outerScheduler, TState initialState)
{
SetUpstream(outerScheduler.Schedule((@this: this, initialState), (scheduler, tuple) => tuple.@this.InvokeRec(scheduler, tuple.initialState)));
var timer = new SingleAssignmentDisposable();
Disposable.TrySetMultiple(ref _timerDisposable, timer);
timer.Disposable = outerScheduler.Schedule((@this: this, initialState), (scheduler, tuple) => tuple.@this.InvokeRec(scheduler, tuple.initialState));
}

protected override void Dispose(bool disposing)
{
Disposable.TryDispose(ref _timerDisposable);
base.Dispose(disposing);
}

private IDisposable InvokeRec(IScheduler self, TState state)
Expand Down Expand Up @@ -240,7 +250,11 @@ private IDisposable InvokeRec(IScheduler self, TState state)
return Disposable.Empty;
}

return self.Schedule((@this: this, state), time, (scheduler, tuple) => tuple.@this.InvokeRec(scheduler, tuple.state));
var timer = new SingleAssignmentDisposable();
Disposable.TrySetMultiple(ref _timerDisposable, timer);
timer.Disposable = self.Schedule((@this: this, state), time, (scheduler, tuple) => tuple.@this.InvokeRec(scheduler, tuple.state));

return Disposable.Empty;
}
}
}
Expand Down Expand Up @@ -275,6 +289,7 @@ internal sealed class _ : IdentitySink<TResult>
private readonly Func<TState, TResult> _resultSelector;
private readonly Func<TState, TimeSpan> _timeSelector;


public _(Relative parent, IObserver<TResult> observer)
: base(observer)
{
Expand All @@ -290,9 +305,19 @@ public _(Relative parent, IObserver<TResult> observer)
private bool _hasResult;
private TResult _result;

private IDisposable _timerDisposable;

public void Run(IScheduler outerScheduler, TState initialState)
{
SetUpstream(outerScheduler.Schedule((@this: this, initialState), (scheduler, tuple) => tuple.@this.InvokeRec(scheduler, tuple.initialState)));
var timer = new SingleAssignmentDisposable();
Disposable.TrySetMultiple(ref _timerDisposable, timer);
timer.Disposable = outerScheduler.Schedule((@this: this, initialState), (scheduler, tuple) => tuple.@this.InvokeRec(scheduler, tuple.initialState));
}

protected override void Dispose(bool disposing)
{
Disposable.TryDispose(ref _timerDisposable);
base.Dispose(disposing);
}

private IDisposable InvokeRec(IScheduler self, TState state)
Expand Down Expand Up @@ -335,10 +360,13 @@ private IDisposable InvokeRec(IScheduler self, TState state)
return Disposable.Empty;
}

return self.Schedule((@this: this, state), time, (scheduler, tuple) => tuple.@this.InvokeRec(scheduler, tuple.state));
var timer = new SingleAssignmentDisposable();
Disposable.TrySetMultiple(ref _timerDisposable, timer);
timer.Disposable = self.Schedule((@this: this, state), time, (scheduler, tuple) => tuple.@this.InvokeRec(scheduler, tuple.state));

return Disposable.Empty;
}
}
}
}
}

Expand Up @@ -9,6 +9,7 @@
using System.Reactive.Linq;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Reactive.Testing;
using ReactiveTests.Dummies;
using Xunit;
Expand Down Expand Up @@ -454,6 +455,40 @@ public void Generate_DateTimeOffset_DefaultScheduler()
Observable.Generate(0, x => x < 10, x => x + 1, x => x, x => DateTimeOffset.Now.AddMilliseconds(x)).AssertEqual(Observable.Generate(0, x => x < 10, x => x + 1, x => x, x => DateTimeOffset.Now.AddMilliseconds(x), DefaultScheduler.Instance));
}

[Fact]
public void Generate_TimeSpan_DisposeLater()
{
var count = 0;
var d = Observable.Generate(0, i => true, i => i + 1, i => i, _ => TimeSpan.Zero)
.WithLatestFrom(Observable.Return(1).Concat(Observable.Never<int>()), (a, b) => a)
.Subscribe(v => Volatile.Write(ref count, v));

while (Volatile.Read(ref count) < 10000)
{
Thread.Sleep(10);
}

d.Dispose();
}

[Fact]
public void Generate_DateTimeOffset_DisposeLater()
{
var count = 0;

var d = Observable.Generate(0, i => true, i => i + 1, i => i, _ => DateTimeOffset.Now)
.WithLatestFrom(Observable.Return(1).Concat(Observable.Never<int>()), (a, b) => a)
.Subscribe(v => Volatile.Write(ref count, v));

while (Volatile.Read(ref count) < 10000)
{
Thread.Sleep(10);
}

d.Dispose();
}


#endregion
}
}