Skip to content

Commit

Permalink
Proposal: Fixes dotnet#286 - EventLoopScheduler - unexpected Exceptio…
Browse files Browse the repository at this point in the history
…n after Dispose

- Added unique id as objectName when creating ObjectDisposedException
- EventLoopScheduler.Run() -> catch & handle ObjectDisposedException
  • Loading branch information
Daniel Mueller committed Oct 31, 2016
1 parent 5a0f1e4 commit 4c0f9bf
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 3 deletions.
Expand Up @@ -82,6 +82,11 @@ public sealed class EventLoopScheduler : LocalScheduler, ISchedulerPeriodic, IDi
/// </summary>
private bool _disposed;

/// <summary>
/// Field to cache the unique instance id. Use <see cref="GetInstanceId"/> to obtain the instance id.
/// </summary>
private string _cachedInstanceId;

#endregion

#region Constructors
Expand Down Expand Up @@ -166,7 +171,7 @@ public override IDisposable Schedule<TState>(TState state, TimeSpan dueTime, Fun
lock (_gate)
{
if (_disposed)
throw new ObjectDisposedException("");
throw new ObjectDisposedException(GetInstanceId());

if (dueTime <= TimeSpan.Zero)
{
Expand Down Expand Up @@ -346,8 +351,23 @@ private void Run()
{
foreach (var item in ready)
{
if (!item.IsCanceled)
item.Invoke();
if (!item.IsCanceled)
{
try
{
item.Invoke();
}
catch (ObjectDisposedException ex)
{
/* Check if the scheduler has been disposed with
* non-empty working queue.
*/
if (ex.ObjectName == GetInstanceId())
break;

throw;
}
}
}
}

Expand Down Expand Up @@ -382,6 +402,18 @@ private void Tick(object state)
}
}

private string GetInstanceId() {
if (_cachedInstanceId != null)
return _cachedInstanceId;

var instance_id = GetType().Name
+ "/"
+ Guid.NewGuid();

Interlocked.CompareExchange(ref _cachedInstanceId, instance_id, null);
return _cachedInstanceId;
}

#endregion
}
}
Expand Up @@ -7,6 +7,8 @@
using System.Diagnostics;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;
using Microsoft.Reactive.Testing;
using Xunit;
Expand Down Expand Up @@ -135,6 +137,44 @@ public void EventLoop_SchedulerDisposed()
Assert.Equal(2, d);
}

[Fact]
public void EventLoop_SchedulerDisposedDuringLongRunningAction() {

var handledEvents = new List<string>();
var longRunningActionStarted = new ManualResetEvent(false);
var schedulerHasBeenDisposed = new ManualResetEvent(false);
var eventStream = new Subject<string>();
var maxWaitTime = TimeSpan.FromSeconds(5);

var subscription = Observable.Using(
resourceFactory: () => new EventLoopScheduler(),
observableFactory: scheduler => eventStream
.ObserveOn(scheduler)
.Do(@event => {
handledEvents.Add(@event);
longRunningActionStarted.Set();
schedulerHasBeenDisposed.WaitOne(maxWaitTime);
}))
.Subscribe();

eventStream.OnNext("#1 long running action");
eventStream.OnNext("#2 long running action");
longRunningActionStarted.WaitOne(maxWaitTime);

subscription.Dispose();
schedulerHasBeenDisposed.Set();

/* Wait for EventLoopScheduler's worker thread:
* without fix https://github.com/Reactive-Extensions/Rx.NET/issues/286
* this will throw an unhandled exception => test runner dies.
*/
longRunningActionStarted.WaitOne(TimeSpan.FromSeconds(1));

Assert.Contains("#1 long running action", handledEvents);
Assert.DoesNotContain("#2 long running action", handledEvents);
}

[Fact]
public void EventLoop_ScheduleTimeOrderedActions()
{
Expand Down

0 comments on commit 4c0f9bf

Please sign in to comment.