Skip to content

Commit

Permalink
refactor: Slight clean up of the approach, and some detailed comments
Browse files Browse the repository at this point in the history
  • Loading branch information
benlesh committed Aug 13, 2021
1 parent c61e57c commit 5d0e41e
Showing 1 changed file with 43 additions and 31 deletions.
74 changes: 43 additions & 31 deletions src/internal/operators/observeOn.ts
@@ -1,6 +1,4 @@
/** @prettier */
import { Subscriber } from '../Subscriber';
import { Subscription } from '../Subscription';
import { MonoTypeOperatorFunction, SchedulerLike } from '../types';
import { operate } from '../util/lift';
import { OperatorSubscriber } from './OperatorSubscriber';
Expand Down Expand Up @@ -52,44 +50,58 @@ import { OperatorSubscriber } from './OperatorSubscriber';
*
* @see {@link delay}
*
* @param {SchedulerLike} scheduler Scheduler that will be used to reschedule notifications from source Observable.
* @param {number} [delay] Number of milliseconds that states with what delay every notification should be rescheduled.
* @param scheduler Scheduler that will be used to reschedule notifications from source Observable.
* @param delay Number of milliseconds that states with what delay every notification should be rescheduled.
* @return A function that returns an Observable that emits the same
* notifications as the source Observable, but with provided scheduler.
*/
export function observeOn<T>(scheduler: SchedulerLike, delay: number = 0): MonoTypeOperatorFunction<T> {
function dispatch(sub: Subscriber<any>, act: () => void) {
const context: { subscription: Subscription | null; sync: boolean } = { sync: false, subscription: null };
context.subscription = scheduler.schedule(
(state) => {
act();
const { subscription } = state;
state.subscription = null;

if (subscription) {
subscription.unsubscribe();
sub.remove(subscription);
export function observeOn<T>(scheduler: SchedulerLike, delay = 0): MonoTypeOperatorFunction<T> {
return operate((source, subscriber) => {
/**
* Executes work with the provided scheduler and provided delay.
* This exists primarily to manage the Actions being scheduled to make
* sure they are removed from the parent Subscription.
* Actions will be retained in memory until the parent Subscription finalizes
* because they could be rescheduled at any time. We know the
* actions in this operator are NOT going to be rescheduled, so
* we want to make sure they're removed as soon as possible.
* @param execute The work to schedule with the scheduler provided
*/
const schedule = (execute: () => void) => {
let syncUnsub = false;
const actionSubs = scheduler.schedule(() => {
if (actionSubs) {
// The action fired asynchronously, so we have a subscription
// we can unsubscribe before continuing. Unsubscription will
// remove the Action/Subscription from the parent (subscriber).
actionSubs.unsubscribe();
} else {
state.sync = true;
// The action fired synchronously, so we don't have a
// subscription we can unsubscribe just yet. Flag that
// we want to unsubscribe when we do get it.
syncUnsub = true;
}
},
delay,
context
);
// Execute the work required.
execute();
}, delay);

if (syncUnsub) {
// The action above fired synchronously, so we can tear it down.
actionSubs.unsubscribe();
} else {
// The action hasn't fired yet. It's asynchronous. So we should
// add it to our subscriber, which is the parent Subscription,
// so it is unsubscribed if our consumer unsubscribes.
subscriber.add(actionSubs);
}
};

if (!context.sync) {
sub.add(context.subscription);
} else {
context.subscription.unsubscribe();
}
}
return operate((source, subscriber) => {
source.subscribe(
new OperatorSubscriber(
subscriber,
(value) => dispatch(subscriber, () => subscriber.next(value)),
() => dispatch(subscriber, () => subscriber.complete()),
(err) => dispatch(subscriber, () => subscriber.error(err))
(value) => schedule(() => subscriber.next(value)),
() => schedule(() => subscriber.complete()),
(err) => schedule(() => subscriber.error(err))
)
);
});
Expand Down

0 comments on commit 5d0e41e

Please sign in to comment.