diff --git a/src/internal/operators/observeOn.ts b/src/internal/operators/observeOn.ts index 0eeb52becc..049c8b0326 100644 --- a/src/internal/operators/observeOn.ts +++ b/src/internal/operators/observeOn.ts @@ -1,6 +1,4 @@ /** @prettier */ -import { Subscriber } from '../Subscriber'; -import { Subscription } from '../Subscription'; import { MonoTypeOperatorFunction, SchedulerLike } from '../types'; import { operate } from '../util/lift'; import { OperatorSubscriber } from './OperatorSubscriber'; @@ -52,44 +50,58 @@ import { OperatorSubscriber } from './OperatorSubscriber'; * * @see {@link delay} * - * @param {SchedulerLike} scheduler Scheduler that will be used to reschedule notifications from source Observable. - * @param {number} [delay] Number of milliseconds that states with what delay every notification should be rescheduled. + * @param scheduler Scheduler that will be used to reschedule notifications from source Observable. + * @param delay Number of milliseconds that states with what delay every notification should be rescheduled. * @return A function that returns an Observable that emits the same * notifications as the source Observable, but with provided scheduler. */ -export function observeOn(scheduler: SchedulerLike, delay: number = 0): MonoTypeOperatorFunction { - function dispatch(sub: Subscriber, act: () => void) { - const context: { subscription: Subscription | null; sync: boolean } = { sync: false, subscription: null }; - context.subscription = scheduler.schedule( - (state) => { - act(); - const { subscription } = state; - state.subscription = null; - - if (subscription) { - subscription.unsubscribe(); - sub.remove(subscription); +export function observeOn(scheduler: SchedulerLike, delay = 0): MonoTypeOperatorFunction { + return operate((source, subscriber) => { + /** + * Executes work with the provided scheduler and provided delay. + * This exists primarily to manage the Actions being scheduled to make + * sure they are removed from the parent Subscription. + * Actions will be retained in memory until the parent Subscription finalizes + * because they could be rescheduled at any time. We know the + * actions in this operator are NOT going to be rescheduled, so + * we want to make sure they're removed as soon as possible. + * @param execute The work to schedule with the scheduler provided + */ + const schedule = (execute: () => void) => { + let syncUnsub = false; + const actionSubs = scheduler.schedule(() => { + if (actionSubs) { + // The action fired asynchronously, so we have a subscription + // we can unsubscribe before continuing. Unsubscription will + // remove the Action/Subscription from the parent (subscriber). + actionSubs.unsubscribe(); } else { - state.sync = true; + // The action fired synchronously, so we don't have a + // subscription we can unsubscribe just yet. Flag that + // we want to unsubscribe when we do get it. + syncUnsub = true; } - }, - delay, - context - ); + // Execute the work required. + execute(); + }, delay); + + if (syncUnsub) { + // The action above fired synchronously, so we can tear it down. + actionSubs.unsubscribe(); + } else { + // The action hasn't fired yet. It's asynchronous. So we should + // add it to our subscriber, which is the parent Subscription, + // so it is unsubscribed if our consumer unsubscribes. + subscriber.add(actionSubs); + } + }; - if (!context.sync) { - sub.add(context.subscription); - } else { - context.subscription.unsubscribe(); - } - } - return operate((source, subscriber) => { source.subscribe( new OperatorSubscriber( subscriber, - (value) => dispatch(subscriber, () => subscriber.next(value)), - () => dispatch(subscriber, () => subscriber.complete()), - (err) => dispatch(subscriber, () => subscriber.error(err)) + (value) => schedule(() => subscriber.next(value)), + () => schedule(() => subscriber.complete()), + (err) => schedule(() => subscriber.error(err)) ) ); });