Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Proposal: Fixes #286 - EventLoopScheduler - unexpected Exception after Dispose #288

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -82,6 +82,11 @@ public sealed class EventLoopScheduler : LocalScheduler, ISchedulerPeriodic, IDi
/// </summary>
private bool _disposed;

/// <summary>
/// Field to cache the unique instance id. Use <see cref="GetInstanceId"/> to obtain the instance id.
/// </summary>
private string _cachedInstanceId;

#endregion

#region Constructors
Expand Down Expand Up @@ -166,7 +171,7 @@ public override IDisposable Schedule<TState>(TState state, TimeSpan dueTime, Fun
lock (_gate)
{
if (_disposed)
throw new ObjectDisposedException("");
throw new ObjectDisposedException(GetInstanceId());

if (dueTime <= TimeSpan.Zero)
{
Expand Down Expand Up @@ -346,8 +351,23 @@ private void Run()
{
foreach (var item in ready)
{
if (!item.IsCanceled)
item.Invoke();
if (!item.IsCanceled)
{
try
{
item.Invoke();
}
catch (ObjectDisposedException ex)
{
/* Check if the scheduler has been disposed with
* non-empty working queue.
*/
if (ex.ObjectName == GetInstanceId())
break;

throw;
}
}
}
}

Expand Down Expand Up @@ -382,6 +402,18 @@ private void Tick(object state)
}
}

private string GetInstanceId() {
if (_cachedInstanceId != null)
return _cachedInstanceId;

var instance_id = GetType().Name
+ "/"
+ Guid.NewGuid();

Interlocked.CompareExchange(ref _cachedInstanceId, instance_id, null);
return _cachedInstanceId;
}

#endregion
}
}
Expand Up @@ -7,6 +7,8 @@
using System.Diagnostics;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;
using Microsoft.Reactive.Testing;
using Xunit;
Expand Down Expand Up @@ -135,6 +137,44 @@ public void EventLoop_SchedulerDisposed()
Assert.Equal(2, d);
}

[Fact]
public void EventLoop_SchedulerDisposedDuringLongRunningAction() {

var handledEvents = new List<string>();
var longRunningActionStarted = new ManualResetEvent(false);
var schedulerHasBeenDisposed = new ManualResetEvent(false);
var eventStream = new Subject<string>();
var maxWaitTime = TimeSpan.FromSeconds(5);

var subscription = Observable.Using(
resourceFactory: () => new EventLoopScheduler(),
observableFactory: scheduler => eventStream
.ObserveOn(scheduler)
.Do(@event => {
handledEvents.Add(@event);
longRunningActionStarted.Set();
schedulerHasBeenDisposed.WaitOne(maxWaitTime);

}))
.Subscribe();

eventStream.OnNext("#1 long running action");
eventStream.OnNext("#2 long running action");
longRunningActionStarted.WaitOne(maxWaitTime);

subscription.Dispose();
schedulerHasBeenDisposed.Set();

/* Wait for EventLoopScheduler's worker thread:
* without fix https://github.com/Reactive-Extensions/Rx.NET/issues/286
* this will throw an unhandled exception => test runner dies.
*/
longRunningActionStarted.WaitOne(TimeSpan.FromSeconds(1));

Assert.Contains("#1 long running action", handledEvents);
Assert.DoesNotContain("#2 long running action", handledEvents);
}

[Fact]
public void EventLoop_ScheduleTimeOrderedActions()
{
Expand Down