diff --git a/Rx.NET/Source/System.Reactive.PlatformServices/Reactive/Concurrency/EventLoopScheduler.cs b/Rx.NET/Source/System.Reactive.PlatformServices/Reactive/Concurrency/EventLoopScheduler.cs index 1e5b1e4b44..ff52ed17ce 100644 --- a/Rx.NET/Source/System.Reactive.PlatformServices/Reactive/Concurrency/EventLoopScheduler.cs +++ b/Rx.NET/Source/System.Reactive.PlatformServices/Reactive/Concurrency/EventLoopScheduler.cs @@ -82,6 +82,11 @@ public sealed class EventLoopScheduler : LocalScheduler, ISchedulerPeriodic, IDi /// private bool _disposed; + /// + /// Field to cache the unique instance id. Use to obtain the instance id. + /// + private string _cachedInstanceId; + #endregion #region Constructors @@ -166,7 +171,7 @@ public override IDisposable Schedule(TState state, TimeSpan dueTime, Fun lock (_gate) { if (_disposed) - throw new ObjectDisposedException(""); + throw new ObjectDisposedException(GetInstanceId()); if (dueTime <= TimeSpan.Zero) { @@ -346,8 +351,23 @@ private void Run() { foreach (var item in ready) { - if (!item.IsCanceled) - item.Invoke(); + if (!item.IsCanceled) + { + try + { + item.Invoke(); + } + catch (ObjectDisposedException ex) + { + /* Check if the scheduler has been disposed with + * non-empty working queue. + */ + if (ex.ObjectName == GetInstanceId()) + break; + + throw; + } + } } } @@ -382,6 +402,18 @@ private void Tick(object state) } } + private string GetInstanceId() { + if (_cachedInstanceId != null) + return _cachedInstanceId; + + var instance_id = GetType().Name + + "/" + + Guid.NewGuid(); + + Interlocked.CompareExchange(ref _cachedInstanceId, instance_id, null); + return _cachedInstanceId; + } + #endregion } } \ No newline at end of file diff --git a/Rx.NET/Source/Tests.System.Reactive/Tests/Concurrency/EventLoopSchedulerTest.cs b/Rx.NET/Source/Tests.System.Reactive/Tests/Concurrency/EventLoopSchedulerTest.cs index 803deb34f1..d5dd29b715 100644 --- a/Rx.NET/Source/Tests.System.Reactive/Tests/Concurrency/EventLoopSchedulerTest.cs +++ b/Rx.NET/Source/Tests.System.Reactive/Tests/Concurrency/EventLoopSchedulerTest.cs @@ -7,6 +7,8 @@ using System.Diagnostics; using System.Reactive.Concurrency; using System.Reactive.Disposables; +using System.Reactive.Linq; +using System.Reactive.Subjects; using System.Threading; using Microsoft.Reactive.Testing; using Xunit; @@ -135,6 +137,44 @@ public void EventLoop_SchedulerDisposed() Assert.Equal(2, d); } + [Fact] + public void EventLoop_SchedulerDisposedDuringLongRunningAction() { + + var handledEvents = new List(); + var longRunningActionStarted = new ManualResetEvent(false); + var schedulerHasBeenDisposed = new ManualResetEvent(false); + var eventStream = new Subject(); + var maxWaitTime = TimeSpan.FromSeconds(5); + + var subscription = Observable.Using( + resourceFactory: () => new EventLoopScheduler(), + observableFactory: scheduler => eventStream + .ObserveOn(scheduler) + .Do(@event => { + handledEvents.Add(@event); + longRunningActionStarted.Set(); + schedulerHasBeenDisposed.WaitOne(maxWaitTime); + + })) + .Subscribe(); + + eventStream.OnNext("#1 long running action"); + eventStream.OnNext("#2 long running action"); + longRunningActionStarted.WaitOne(maxWaitTime); + + subscription.Dispose(); + schedulerHasBeenDisposed.Set(); + + /* Wait for EventLoopScheduler's worker thread: + * without fix https://github.com/Reactive-Extensions/Rx.NET/issues/286 + * this will throw an unhandled exception => test runner dies. + */ + longRunningActionStarted.WaitOne(TimeSpan.FromSeconds(1)); + + Assert.Contains("#1 long running action", handledEvents); + Assert.DoesNotContain("#2 long running action", handledEvents); + } + [Fact] public void EventLoop_ScheduleTimeOrderedActions() {