Observable.Interval().Take() hangs with ImmediateScheduler #1983
Comments
This is the limitation of the |
@akarnokd wrote:
I've got two reasons for thinking that this is an inaccurate diagnosis of this problem:
So this isn't a case of "Can we fix this?". (The answer is: "Yes we can.") The question is "should we?" I'm inclined to say that no, we should not fix this. I will now explain why I believe the fundamental issue here is the way in which The problem occurs because It might seem like that's an insurmountable problem with the And we can exploit that to make the
And just to be clear, this continues to provide immediate semantics if you choose the

public void Run(Periodic parent, TimeSpan dueTime)
{
    //
    // Optimize for the case of Observable.Interval.
    //
    if (dueTime == _period)
    {
        // Experimental hack to ensure the following:
        //  1) that we are able to return an IDisposable before doing anything
        //     (even if the caller told us to use the ImmediateScheduler)
        //     so that in cases where this is nested inside subscription
        //     via some Producer-based downstream operator (e.g. Take),
        //     the downstream has a chance to call SetUpstream before we
        //     start our periodic timer.
        //  2) that when we kick off the SchedulePeriodic against the
        //     ImmediateScheduler, we force it into a position where it
        //     has to defer the start of that timer, meaning that we're
        //     able to get hold of the IDisposable that will enable us
        //     to cancel it.
        CurrentThreadScheduler.Instance.Schedule(
            (This: this, parent, dueTime),
            static (innerCtScheduler, p) => {
                // In cases where subscription has come in through a downstream Producer-based sink
                // this callback will effectively run inside Producer<TTarget, TSink>.SubscribeRaw via the
                // CurrentThreadScheduler trampoline. (And if there was no trampoline already above
                // us on the stack, CurrentThreadScheduler.Instance.Schedule will just invoke
                // us synchronously.)
                //
                return p.parent._scheduler.Schedule(
                    p.This,
                    p.dueTime,
                    static (IScheduler innerScheduler, _ @this) =>
                    {
                        // This callback is invoked by whatever scheduler the caller specified.
                        // If that's the ImmediateScheduler, we don't want to call SchedulePeriodic
                        // from here, because it won't return until it's done, meaning we will have
                        // no way of obtaining an IDisposable through which to cancel it.
                        //
                        // But we can force it to defer execution of the SchedulePeriodic work item
                        // by doubly-nesting through the innerScheduler.
                        //
                        // If the caller specified ImmediateScheduler, the innerScheduler here will
                        // be an ImmediateScheduler.AsyncLockScheduler. If we schedule a work item
                        // through that inner scheduler from this context, it will just invoke it
                        // immediately (because ImmediateScheduler always tries to do that). However,
                        // it detects re-entrancy, and if you're already in a work item scheduled
                        // by that inner scheduler and you try to schedule another work item, THAT
                        // work item does not run immediately (because if it did, there would be
                        // a high risk of stack overflow in recursive scheduling scenarios).
                        // So in those cases, the ImmediateScheduler.AsyncLockScheduler queues up
                        // the work (the queuing actually happens inside AsyncLock, which also
                        // supplies its own trampoline to drain the queue), meaning it returns
                        // immediately, enabling us to get hold of an IDisposable representing
                        // the periodic work item.
                        //
                        // So that way, we can set it as our Upstream. And since going through
                        // CurrentThreadScheduler.Instance.Schedule above means that any Producer-based
                        // downstream will have us as an upstream, this enables auto-shutdown
                        // (e.g., when a Take determines it has reached the end) to work, because
                        // we had a chance to set up all of the necessary upstreams before beginning
                        // to run the period work item.
                        //
                        // We still get immediate semantics when using the ImmediateScheduler -
                        // a call to the downstream operator's Subscribe won't return until
                        // completion. But because of the deferred kick-off here, all upstreams
                        // are in place, meaning the auto-teardown works, and Subscribe will
                        // return once we are done.
                        return innerScheduler.Schedule(@this, static (IScheduler innerScheduler, _ @this) =>
                        {
                            // Let's just recap how we got here.
                            // We used CurrentThreadScheduler.Instance.Schedule to kick off the
                            // outermost work item, meaning that if subscription occurred
                            // via a Producer-based downstream, the initial Subscribe call will
                            // have returned all the way up to the most downstream call to
                            // Producer.SubscribeRaw, and the outer work item will have been
                            // invoked by the CurrentThreadScheduler trampoline from inside
                            // that SubscribeRaw. Critically, this means that the corresponding
                            // Sink.Run that ultimately called this Run method will have completed
                            // its call to SetUpstream, because this Run method has already
                            // returned.
                            // Then we used the parent._scheduler to run a work item. If the
                            // caller specified ImmediateScheduler, that will have executed
                            // our work item synchronously. That work item then called
                            // innerScheduler.Schedule, and again if the caller specified
                            // ImmediateScheduler, that will also have run synchronously,
                            // and that's how we end up here. Here's a simplified version
                            // of the call stack that led us here:
                            //   Here
                            //   innerScheduler.Schedule (ImmediateScheduler.AsyncLockScheduler)
                            //   parent._scheduler.Schedule (ImmediateScheduler)
                            //   CurrentThreadScheduler Trampoline in outermost Producer-based downstream
                            //   Application call to Subscribe
                            //
                            // Two important points:
                            //  1) We're still inside the app's call to Subscribe, so this still looks
                            //     'immediate' to the app, which is what it asked for
                            //  2) The ImmediateScheduler.AsyncLockScheduler's AsyncLock is busy, so
                            //     any further nested scheduling will be queued (but will be executed
                            //     as soon as we return from this callback, so it still looks
                            //     immediate to the app)
                            //
                            // The consequence of 2 is that this call to SchedulePeriodic will queue
                            // up its work item, enabling us to stash the IDisposable representing
                            // that work item as our upstream before that work item begins, making
                            // it possible for us to cancel the periodic work item.
                            // (With most schedulers, SchedulePeriodic would have returned us an
                            // IDisposable without these contortions. It's only the ImmediateScheduler
                            // that requires this level of persuasion to make a periodic work item
                            // cancellable.)
                            IDisposable d = innerScheduler.SchedulePeriodic(@this, @this._period, static @this => @this.Tick());
                            @this.SetUpstream(d);
                            return d;
                        });
                    });
            });
    }
    else
    {
        SetUpstream(parent._scheduler.Schedule(this, dueTime, static (innerScheduler, @this) => @this.InvokeStart(innerScheduler)));
    }
}

And just to prove that it really does work, I ran the example shown at the top of this work item after making this change:
That illustrates that we still get the "immediate" semantics (the position of the |
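The re-entrancy trick that the code comments above rely on can be pictured with a toy sketch. This is not Rx.NET code: `ReentrancyQueueingScheduler`, `CancelFlag` and `TrampolineDemo` are hypothetical names, and the queue-when-busy rule only approximates what the comments say `ImmediateScheduler.AsyncLockScheduler` does. The point it tries to show is that a nested `Schedule` call returns a handle before the nested work runs, while everything still finishes inside the outermost call.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical toy scheduler (an assumption for illustration, not an Rx.NET type).
public sealed class ReentrancyQueueingScheduler
{
    private readonly Queue<Action> _queue = new Queue<Action>();
    private bool _busy;

    // Runs work at once when idle. When called from inside running work,
    // it only queues the work and returns, so the caller holds the handle
    // before the work has started.
    public IDisposable Schedule(Action work)
    {
        var handle = new CancelFlag();
        _queue.Enqueue(() => { if (!handle.Cancelled) work(); });
        if (_busy)
        {
            return handle; // re-entrant call: deferred until the drain loop reaches it
        }

        _busy = true;
        try
        {
            // Trampoline: drain everything queued by the work we run here.
            while (_queue.Count > 0)
            {
                _queue.Dequeue()();
            }
        }
        finally
        {
            _busy = false;
        }
        return handle;
    }

    private sealed class CancelFlag : IDisposable
    {
        public bool Cancelled { get; private set; }
        public void Dispose() => Cancelled = true;
    }
}

public static class TrampolineDemo
{
    // Returns the order in which things happened.
    public static List<string> Run(bool cancelInner)
    {
        var log = new List<string>();
        var scheduler = new ReentrancyQueueingScheduler();
        scheduler.Schedule(() =>
        {
            log.Add("outer start");
            IDisposable inner = scheduler.Schedule(() => log.Add("inner ran"));
            log.Add("outer has handle");
            if (cancelInner)
            {
                inner.Dispose(); // possible only because the inner work was deferred
            }
        });
        log.Add("Schedule returned");
        return log;
    }

    public static void Main()
    {
        // prints: outer start, outer has handle, inner ran, Schedule returned
        Console.WriteLine(string.Join(", ", Run(false)));
        // prints: outer start, outer has handle, Schedule returned
        Console.WriteLine(string.Join(", ", Run(true)));
    }
}
```

In this sketch the outermost `Schedule` call still does not return until all queued work has run, which is the sense in which the behaviour stays "immediate" for the caller.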
Library version: 6.0.0
This is arguably just an unsupported scenario, since `Observable.Interval` is designed to work with schedulers suitable for time-based operation, and `ImmediateScheduler` is not, in general, suitable for that.

However, there's a scenario that people might reasonably expect to work:
This initially seems to work, producing this output:
But it then hangs. The call to `Subscribe` never returns. And internally, the interval continues merrily ticking away, delivering notifications into a `Take` operator that is no longer listening.

Although the `ImmediateScheduler` is not designed for timed operations, it does still implement the time-based `IScheduler` methods, and as the output above shows, they do work. They block, not returning until the specified time elapses, because that's what `ImmediateScheduler` does. (Well, a `DateTimeOffset`-based scheduler for more than 10 seconds into the future is not handled immediately, but that's not relevant to this issue.) But they do call you when you asked them to.

It's not totally unreasonable to expect this code to work. `Take` normally unsubscribes from its source once it has received the specified number of elements. Its source here is the `Interval`, so you'd expect that to stop trying to schedule any further work, meaning the `ImmediateScheduler` would be able to return, since all scheduled work has completed.

But it turns out that in this scenario, `Take` does not unsubscribe from its source once it has received the 5th element. And that's because of this code here:

reactive/Rx.NET/Source/src/System.Reactive/Linq/Observable/Timer.cs
Lines 134 to 147 in 5903ac6
These calls to `SetUpstream` are how the `Timer` (which is what `Interval` is using internally) arranges to get shut down when its subscriber unsubscribes. Logically, it is setting itself as the upstream source for `Take` in this example. Normally, when `Take` hits its count, it calls `OnCompleted` on its subscriber, which then triggers a tear down of the subscription, at which point `Take` disposes its upstream, which is what's supposed to notify `Take`'s source that it can stop.

That normally all works fine, but it doesn't work with `ImmediateScheduler`, because those calls to `SetUpstream` can't happen until the call to `Schedule` returns. The argument to `SetUpstream` in each case here is a call to the `parent._scheduler`. But if that's an `ImmediateScheduler`, it won't return until it's done. So those calls to `SetUpstream` can't occur until after the scheduler determines that it has no more work to do, but it can't discover that there's no more work to do if the calls to `SetUpstream` haven't happened yet.

On the one hand, the basic problem here is that we're using a scheduler that doesn't work in the way `Interval` requires, so this could be dismissed as a non-bug.

On the other hand, this raises the question of whether a race condition exists in supported scenarios in which that `SetUpstream` call might not occur before a downstream sink tries to shut down the timer, but fails to do so because it did that before the call to `Schedule` returns. E.g., if we're using a timer-friendly scheduler, might it determine that it can actually execute the timed work immediately? Or more subtly, what if it schedules work on a queue, then the system bogs down, and the thread that called `Schedule` doesn't return up the stack fast enough, and the scheduled work manages to run and then tries to shut down the subscription, but the timer runs forever because the call to `SetUpstream` happened after the sink we're calling `SetUpstream` on has already stopped? We might be OK because it might be using a disposable that handles this race correctly, but we should at least check.

So it would be worth verifying that the possible race is handled correctly with non-immediate schedulers. It's also worth reviewing the rationale for the way the `SchedulePeriodic` extension method emulates periodic scheduling on schedulers that don't inherently support it. It was changed at some point in the past, and there's a lengthy and complex explanation of the thinking behind that change. But I think that fixing this for the immediate scheduler would be tricky because in the current design for periodic scheduling, the only way to cancel a periodically scheduled work item is to `Dispose` the object returned by `SchedulePeriodic`, and that's fundamentally incompatible with immediate scheduling. This is why the `ImmediateScheduler` is, in general, considered unsuitable for use in timer-based sources, so this is probably just going to be resolved as "by design".