EventLoopScheduler: unexpected Exception after Dispose #286

Closed
ghost opened this issue Oct 25, 2016 · 14 comments · Fixed by #969

Comments

@ghost

ghost commented Oct 25, 2016

Calling Dispose on an EventLoopScheduler that still has at least one item in its work queue causes an ObjectDisposedException. The exception is thrown on the scheduler's worker thread, which gives us no chance to catch it.

Code to reproduce the problem:

using System;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;

class Program
{
    public static void Main() {
        var event_stream = new Subject<string>();

        // The scheduler's lifetime is tied to the subscription via Observable.Using
        var subscription = Observable.Using(
            resourceFactory:    () => new EventLoopScheduler(),
            observableFactory:  scheduler => event_stream
                .ObserveOn(scheduler)
                .Do(LongRunningAction))
            .Subscribe();

        event_stream.OnNext("event 1");

        Thread.Sleep(TimeSpan.FromSeconds(1));
        subscription.Dispose(); // Scheduler is still busy!

        Console.ReadLine();
    }

    // Simulates slow processing on the event loop thread
    private static void LongRunningAction(string event_text) {
        Thread.Sleep(TimeSpan.FromSeconds(2));
        Console.WriteLine(event_text);
    }
}

I opened a question on Stack Overflow (which contains a workaround) some time ago. The workaround does not seem to cover all possible cases; I still get this unhandled exception from time to time:

System.ObjectDisposedException: Cannot access a disposed object.
   at System.Reactive.Concurrency.EventLoopScheduler.Schedule[TState](TState state, TimeSpan dueTime, Func`3 action)
   at System.Reactive.Concurrency.LocalScheduler.Schedule[TState](TState state, Func`3 action)
   at System.Reactive.Concurrency.Scheduler.<>c__DisplayClass50`1.<InvokeRec1>b__4e(TState state2)
   at System.Reactive.ScheduledObserver`1.Run(Object state, Action`1 recurse)
   at System.Reactive.Concurrency.Scheduler.<>c__DisplayClass50`1.<InvokeRec1>b__4d(TState state1)
   at System.Reactive.Concurrency.Scheduler.<>c__DisplayClass50`1.<>c__DisplayClass52.<InvokeRec1>b__4f(IScheduler scheduler1, TState state3)
   at System.Reactive.Concurrency.ScheduledItem`2.InvokeCore()
   at System.Reactive.Concurrency.EventLoopScheduler.Run()
   at System.Threading.ThreadHelper.ThreadStart_Context(Object state)
   at System.Threading.ExecutionContext.RunInternal(ExecutionContext executionContext, ContextCallback callback, Object state, Boolean preserveSyncCtx)
   at System.Threading.ExecutionContext.Run(ExecutionContext executionContext, ContextCallback callback, Object state, Boolean preserveSyncCtx)
   at System.Threading.ExecutionContext.Run(ExecutionContext executionContext, ContextCallback callback, Object state)
   at System.Threading.ThreadHelper.ThreadStart()

ghost changed the title from "EventLoopScheduler: unexpected behavior after Dispose - ObjectDisposedException" to "EventLoopScheduler: unexpected Exception after Dispose" on Oct 25, 2016

ghost pushed a commit to danm-de/Rx.NET that referenced this issue Oct 31, 2016
…n after Dispose

- Added unique id as objectName when creating ObjectDisposedException
- EventLoopScheduler.Run() -> catch & handle ObjectDisposedException

@HebuSan

HebuSan commented May 1, 2018

Hi,
I'm suffering from the same issue in both personal and professional (trading application) projects.
Even after reading the Stack Overflow link provided above, I still wish it could be fixed.

Exposing Dispose on something like an EventLoopScheduler makes sense.
Regardless of whether the underlying thread is "supposed" to be "suspended", the Dispose method is available,
and knowing it can literally blow up your code when you "simply" want to use it seems weird...

If this fix, or another one, is not accepted, could you please provide an "official" way to avoid this?

Even though I use the ELS in specific places, shared for specific dispatching purposes, I know the need to properly
dispose of everything, including the underlying thread, can arise, especially in an application running almost 24 hours a day. We would really appreciate some feedback on this one.

Thanks for all your great work.

Update: @DanielMue: for my usage, I think I can work around the issue by scheduling the dispose on the scheduler itself. (I am in a situation where I can be sure to stop enqueuing tasks in the scheduler, and can therefore ensure this dispose task will be the last one. At least that is what I think, and I could not reproduce the crash by doing so.)

While updating this post, I noticed that, on Stack Overflow, you also say: "To workaround this exception, I currently wrap the EventLoopScheduler and call scheduler.Schedule(() => scheduler.Dispose()) on wrapper.Dispose()".
-> Sounds similar to what I found.
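
For reference, a minimal sketch of that wrapper idea, under assumptions: the class name DeferredDisposeEventLoop and its members are made up for illustration and are not part of Rx.NET. It only queues the scheduler's own Dispose call behind the work already enqueued, so it can only help if nothing is scheduled afterwards.

```csharp
using System;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;

// Hypothetical wrapper (not part of Rx.NET): owns an EventLoopScheduler and
// disposes it from the scheduler's own thread instead of the caller's thread.
public sealed class DeferredDisposeEventLoop : IDisposable
{
    private readonly EventLoopScheduler _scheduler = new EventLoopScheduler();
    private readonly IDisposable _disposal;

    public DeferredDisposeEventLoop()
    {
        // Disposable.Create runs its action at most once, so a second Dispose()
        // call never tries to schedule onto an already disposed scheduler.
        _disposal = Disposable.Create(
            () => _scheduler.Schedule(() => _scheduler.Dispose()));
    }

    // Hand this to ObserveOn(...) and similar operators.
    public IScheduler Scheduler => _scheduler;

    // Enqueues the real Dispose as a work item on the event loop.
    public void Dispose() => _disposal.Dispose();
}
```

Anything scheduled after the queued Dispose has run will still hit a disposed scheduler, so this is a workaround for one ordering of events rather than a general fix.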

If so, can you confirm that this "workaround", no matter how weird it is, always worked for you (no more exceptions)?

Thx.

@ghost
Author

ghost commented May 2, 2018

Hi @HebuSan,

Unfortunately it did not work in all situations. The exception was very hard to reproduce and we did no further investigation. Our projects currently use a modified EventLoopScheduler (commit 4c0f9bf).

To avoid wrong usage, we've implemented a service interface (injected):

public interface ISchedulerService {
    // ... snip ...

    /// <summary>
    /// Constructs an observable sequence that depends on a <see cref="EventLoopScheduler"/>, whose lifetime is tied to the resulting observable sequence's lifetime.
    /// </summary>
    /// <typeparam name="T">The type of the elements in the produced sequence.</typeparam>
    /// <param name="observable_factory"></param>
    /// <exception cref="T:System.ArgumentNullException"><paramref name="observable_factory"/> is null.</exception>
    /// <returns>An observable sequence whose lifetime controls the lifetime of the dependent resource object.</returns>
    IObservable<T> UsingEventLoop<T>(Func<IScheduler, IObservable<T>> observable_factory);
}

Usage:

var externObservable = CreateAnObservable(..);

subscription = scheduler_service
    .UsingEventLoop(
        scheduler => externObservable
            .ObserveOn(scheduler)
            .Do(something))
    .Do(somethingElse)
    .Subscribe();

I can post the code on GitHub if needed for reference.

@HebuSan

HebuSan commented May 2, 2018

Hi @DanielMue ,

So far, I could not reproduce the crash anymore with the dispose-scheduling trick, but I will try to play with your fix a bit. It sounds cleaner anyway.

Thanks a lot for your answer!

@Enigmativity

The issue here is that you end up with a race condition because you are creating the EventLoopScheduler in an Observable.Using. The thing is that I don't see the point of doing so. The EventLoopScheduler is there to allow a bunch of threads to schedule themselves on a single thread and to ensure serial execution - it's like putting a global lock around all code run on the EventLoopScheduler. A single observable more or less does this anyway. There's little need to use an EventLoopScheduler.

Can you explain why you're trying to use it like this?

@ghost
Author

ghost commented May 22, 2018

Hi @Enigmativity,

We start & stop a bunch of device drivers (each in its own thread) that continuously emit events. The output requires us to ensure a serial execution. The drivers MUST NOT be blocked because of further processing (back-pressure). The EventLoopScheduler helps us to decouple the post-processing from event generation.

Using this example

var outputPipe = driver1.Events
                .Merge(driver2.Events)
                .Merge(driverN.Events)
                .Select(Transform)
                .Do(..);

..it would block each driver during the execution of Transform(..).

The simple solution was to queue the events:

var outputPipe = driver1.Events
                .Merge(driver2.Events)
                .Merge(driverN.Events)
                .ObserveOn(eventLoopScheduler)
                .Select(Transform)
                .Do(..);

Do you think we can achieve the same without EventLoopScheduler?

Maybe our solution is not optimal. Anyway, I think the current behavior of EventLoopScheduler.Dispose() is unexpected and unusable (see the numerous questions on Stack Overflow and elsewhere).

@Enigmativity

You should find that both queries you've just shown me serialize the calls to Transform - that's part of the contract with Rx. Should you have a "rogue" query that breaks the contract then use the .Synchronize() operator to make it serialize.

You should only need to use an EventLoopScheduler if you have multiple queries that run independently and concurrently that you want to serialize - and then you'd share a single instance with multiple observables. However, your query shows that you're creating an EventLoopScheduler for a single query so that shouldn't be the case.

If you do need to stick with an EventLoopScheduler for whatever reason then it's a simple matter of creating the EventLoopScheduler inside a .Create and dispose of it like this:

var query = Observable.Create<int>(o =>
{
	var els = new EventLoopScheduler();
	return
		Observable
			.Return(42)
			.Finally(() => els.Schedule(() => els.Dispose()))
			.Subscribe(o);
});

query.Subscribe();

@ghost
Author

ghost commented May 22, 2018

Hi @Enigmativity,

thanks for your answer.

Maybe I forgot to explain that all driver instances are running independently and MUST NOT block on Observable.OnNext. That was the reason for us to use EventLoopScheduler. In case there are too many events (for a short period of time), it just adds the events into a queue and schedules them for later processing. This approach avoids back-pressure (of the slow Transform call) up to the driver instances.

Sample code to explain our scenario:
https://gist.github.com/danielmue/67f070fe1e7f26fb8f670482e0997125
(please uncomment //.ObserveOn(new EventLoopScheduler()) to see the difference)

Your proposal works and is similar to the already described workaround.

Although it is really interesting to discuss different solutions, I did not originally open the issue to find out the proper usage of EventLoopScheduler :-). The issue is there because users currently get an unhandled exception 💥 after calling Dispose(). Since it implements IDisposable, I would expect it to stop the queue/scheduling and clean up resources.

@HebuSan

HebuSan commented Jun 1, 2018

Hi @Enigmativity,

I also suffered from similar issues and discovered that the only way to solve them was to schedule the Dispose on the scheduler.

Using .Finally is really nice (and way easier to read). My workaround was a bit "dirtier" (wrapping it using Disposable.Create), making the fix less "obvious" to someone reading the code.

Even if "it makes the code work", like @DanielMue I find it strange that you somehow "have to figure out how to make the proper call to the IDisposable interface".

And that calling it in a different way (without scheduling it) makes everything crash.

To me, IDisposable interfaces should be working the same way all the time.

I could not test @DanielMue's fix, but if it works, why not accept it?

@Enigmativity

@HebuSan - When you say "To me, IDisposable interfaces should be working the same way all the time." you need to understand that when working with multiple threads and concurrency, things that otherwise should work can fail. Even value++ can fail when using multiple threads.

There are many times that you have to write your code the right way to get it to work in certain circumstances. This is no different.

@HebuSan

HebuSan commented Jun 5, 2018

@Enigmativity,

Thx for answering.

I don't think your value++ is a good example at all. Comparing such a low-level operation, known to be defenseless against multithreading issues (Interlocked.Increment can do the job), with an interface implementation sounds strange to me.

I'll try to explain it more clearly; sorry for my bad English.

Most (in fact all) of the ES logic behind the IScheduler interface is "protected" against threads.
There are gates pretty much everywhere, which is perfectly normal. Nowhere is it said that developers should handle the multithreading concerns of the ES's IScheduler implementation "on their side" of the code. No. This is done internally.

When you schedule an Action with the ES, you can safely call it knowing that the ES implementation of the IScheduler interface will handle race conditions.

Moreover, the logic behind the ES's IDisposable implementation uses the same gate, and it already interacts with the ES's IScheduler implementation (_disposed is used pretty much everywhere). So to me this tends to show that the ES implementation (normally) takes care to handle all its logic (from both the IScheduler and IDisposable interfaces) in a consistent way, no matter how many threads are involved.

The fact that @DanielMue could fix this so "easily" (no offense; I mean just by patching internally, without modifying interfaces or adding weird code) shows it can be done, and that it could just be a matter of a forgotten use case.

I say "could" because the exception was thrown there intentionally. So IF this was done on purpose, there is indeed something I don't understand. But if so, to me it is not a "simple" multithreading issue, as (IF I understand correctly) you suggest with the value++ case.

I admit my English may not be good enough, and saying "To me, IDisposable interfaces should be working the same way all the time." without further explanation may sound strange.

Knowing that IDisposable interfaces can be used in many ways with Rx (within CompositeDisposable, ...), I really think it is safer to ensure such matters are handled on the implementation side of the interface, rather than having to remember that this specific implementation of IDisposable can blow up my code.

I hope this (longer) answer can help either to get the PR validated, or at least to get a clear explanation of why it was done this way.

The Stack Overflow link given above tends to prove it would be good to have either this PR validated (to confirm this is a bug) or a clear answer regarding the IDisposable usage of the ES.

Because even with people like Lee Campbell in the thread, no clear answer came out of the discussion (or at least none I could clearly understand).

@BADF00D

BADF00D commented Jun 7, 2018

Maybe there are different ideas about the meaning of different objects / methods.

For me:
What is the purpose of an IScheduler?
To schedule an action in a specific way. In the case of EventLoopScheduler this means shifting the actions to a specific thread and processing them one by one in the order they arrived.

What is the purpose of the Dispose method?
Stop whatever you are doing and clean up your resources. When I call Dispose on an Rx stream, I expect the stream to stop immediately. When I call Dispose on an ES, I expect the loop to break immediately (or maybe right after the ES has finished working on the current item). But I never expect a Dispose method to throw an exception, because that behavior is unspecified. What does it mean?

  • Call me again later?
  • You have to shut down the app, because there might be a memory leak?

Maybe @Enigmativity can tell me if any of my expectations are wrong?

@Enigmativity

@BADF00D - You're thinking about this too simplistically. This isn't a single thread running here. It's multi-threading and multi-threading is hard. I think this is a case where observables and schedulers don't work unless you code them correctly.

This is no different to knowing when to dispose a database connection if you've got an IQueryable<T> that has yet to execute. You just have to know the right way to do things.

@eugbaranov
Contributor

eugbaranov commented Jul 18, 2019

I got bitten by this unexpected behavior as well: lots of things were not getting disposed because EventLoopScheduler was blowing up.

My minimal repro:

using (var scheduler = new EventLoopScheduler())
using (var subscription = Observable.Range(1, 10).ObserveOn(scheduler).Subscribe(x =>
{
	Thread.Sleep(500);
	Console.WriteLine(x);
}))
{
	Thread.Sleep(100);
}

It blows up with an ObjectDisposedException:

at System.Reactive.Concurrency.EventLoopScheduler.Schedule[TState](TState state, TimeSpan dueTime, Func`3 action)
at System.Reactive.ObserveOnObserverNew`1.DrainShortRunning(IScheduler recursiveScheduler)
at System.Reactive.Concurrency.EventLoopScheduler.Run()
at System.Threading.ExecutionContext.RunInternal(ExecutionContext executionContext, ContextCallback callback, Object state, Boolean preserveSyncCtx)
at System.Threading.ExecutionContext.Run(ExecutionContext executionContext, ContextCallback callback, Object state, Boolean preserveSyncCtx)
at System.Threading.ExecutionContext.Run(ExecutionContext executionContext, ContextCallback callback, Object state)
at System.Threading.ThreadHelper.ThreadStart()

@HebuSan

HebuSan commented Nov 20, 2019

Thx !
