diff --git a/src/internal/observable/combineLatest.ts b/src/internal/observable/combineLatest.ts index c573f02c8e4..b207405fe9e 100644 --- a/src/internal/observable/combineLatest.ts +++ b/src/internal/observable/combineLatest.ts @@ -10,6 +10,7 @@ import { popResultSelector, popScheduler } from '../util/args'; import { createObject } from '../util/createObject'; import { OperatorSubscriber } from '../operators/OperatorSubscriber'; import { AnyCatcher } from '../AnyCatcher'; +import { executeSchedule } from '../util/executeSchedule'; // combineLatest(any) // We put this first because we need to catch cases where the user has supplied @@ -293,7 +294,7 @@ export function combineLatestInit( */ function maybeSchedule(scheduler: SchedulerLike | undefined, execute: () => void, subscription: Subscription) { if (scheduler) { - subscription.add(scheduler.schedule(execute)); + executeSchedule(subscription, scheduler, execute); } else { execute(); } diff --git a/src/internal/operators/bufferTime.ts b/src/internal/operators/bufferTime.ts index f7af350fc49..cab6dd2b21e 100644 --- a/src/internal/operators/bufferTime.ts +++ b/src/internal/operators/bufferTime.ts @@ -5,6 +5,7 @@ import { OperatorSubscriber } from './OperatorSubscriber'; import { arrRemove } from '../util/arrRemove'; import { asyncScheduler } from '../scheduler/async'; import { popScheduler } from '../util/args'; +import { executeSchedule } from '../util/executeSchedule'; /* tslint:disable:max-line-length */ export function bufferTime(bufferTimeSpan: number, scheduler?: SchedulerLike): OperatorFunction; @@ -117,21 +118,18 @@ export function bufferTime(bufferTimeSpan: number, ...otherArgs: any[]): Oper subs, }; bufferRecords.push(record); - subs.add(scheduler.schedule(() => emit(record), bufferTimeSpan)); + executeSchedule(subs, scheduler, () => emit(record), bufferTimeSpan); } }; - bufferCreationInterval !== null && bufferCreationInterval >= 0 - ? // The user passed both a bufferTimeSpan (required), and a creation interval - // That means we need to start new buffers on the interval, and those buffers need - // to wait the required time span before emitting. - subscriber.add( - scheduler.schedule(function () { - startBuffer(); - !this.closed && subscriber.add(this.schedule(null, bufferCreationInterval)); - }, bufferCreationInterval) - ) - : (restartOnEmit = true); + if (bufferCreationInterval !== null && bufferCreationInterval >= 0) { + // The user passed both a bufferTimeSpan (required), and a creation interval + // That means we need to start new buffers on the interval, and those buffers need + // to wait the required time span before emitting. + executeSchedule(subscriber, scheduler, startBuffer, bufferCreationInterval, true); + } else { + restartOnEmit = true; + } startBuffer(); diff --git a/src/internal/operators/mergeInternals.ts b/src/internal/operators/mergeInternals.ts index fc7d1a613e7..be995949389 100644 --- a/src/internal/operators/mergeInternals.ts +++ b/src/internal/operators/mergeInternals.ts @@ -2,6 +2,7 @@ import { Observable } from '../Observable'; import { innerFrom } from '../observable/from'; import { Subscriber } from '../Subscriber'; import { ObservableInput, SchedulerLike } from '../types'; +import { executeSchedule } from '../util/executeSchedule'; import { OperatorSubscriber } from './OperatorSubscriber'; /** @@ -114,7 +115,11 @@ export function mergeInternals( // Particularly for `expand`, we need to check to see if a scheduler was provided // for when we want to start our inner subscription. Otherwise, we just start // are next inner subscription. - innerSubScheduler ? subscriber.add(innerSubScheduler.schedule(() => doInnerSub(bufferedValue))) : doInnerSub(bufferedValue); + if (innerSubScheduler) { + executeSchedule(subscriber, innerSubScheduler, () => doInnerSub(bufferedValue)); + } else { + doInnerSub(bufferedValue); + } } // Check to see if we can complete, and complete if so. checkComplete(); diff --git a/src/internal/operators/observeOn.ts b/src/internal/operators/observeOn.ts index 049c8b03268..2c27dbcec9d 100644 --- a/src/internal/operators/observeOn.ts +++ b/src/internal/operators/observeOn.ts @@ -1,5 +1,6 @@ /** @prettier */ import { MonoTypeOperatorFunction, SchedulerLike } from '../types'; +import { executeSchedule } from '../util/executeSchedule'; import { operate } from '../util/lift'; import { OperatorSubscriber } from './OperatorSubscriber'; @@ -57,51 +58,12 @@ import { OperatorSubscriber } from './OperatorSubscriber'; */ export function observeOn(scheduler: SchedulerLike, delay = 0): MonoTypeOperatorFunction { return operate((source, subscriber) => { - /** - * Executes work with the provided scheduler and provided delay. - * This exists primarily to manage the Actions being scheduled to make - * sure they are removed from the parent Subscription. - * Actions will be retained in memory until the parent Subscription finalizes - * because they could be rescheduled at any time. We know the - * actions in this operator are NOT going to be rescheduled, so - * we want to make sure they're removed as soon as possible. - * @param execute The work to schedule with the scheduler provided - */ - const schedule = (execute: () => void) => { - let syncUnsub = false; - const actionSubs = scheduler.schedule(() => { - if (actionSubs) { - // The action fired asynchronously, so we have a subscription - // we can unsubscribe before continuing. Unsubscription will - // remove the Action/Subscription from the parent (subscriber). - actionSubs.unsubscribe(); - } else { - // The action fired synchronously, so we don't have a - // subscription we can unsubscribe just yet. Flag that - // we want to unsubscribe when we do get it. - syncUnsub = true; - } - // Execute the work required. - execute(); - }, delay); - - if (syncUnsub) { - // The action above fired synchronously, so we can tear it down. - actionSubs.unsubscribe(); - } else { - // The action hasn't fired yet. It's asynchronous. So we should - // add it to our subscriber, which is the parent Subscription, - // so it is unsubscribed if our consumer unsubscribes. - subscriber.add(actionSubs); - } - }; - source.subscribe( new OperatorSubscriber( subscriber, - (value) => schedule(() => subscriber.next(value)), - () => schedule(() => subscriber.complete()), - (err) => schedule(() => subscriber.error(err)) + (value) => executeSchedule(subscriber, scheduler, () => subscriber.next(value), delay), + () => executeSchedule(subscriber, scheduler, () => subscriber.complete(), delay), + (err) => executeSchedule(subscriber, scheduler, () => subscriber.error(err), delay) ) ); }); diff --git a/src/internal/operators/timeout.ts b/src/internal/operators/timeout.ts index 8e33cde9b20..1d17e40a845 100644 --- a/src/internal/operators/timeout.ts +++ b/src/internal/operators/timeout.ts @@ -6,8 +6,8 @@ import { operate } from '../util/lift'; import { Observable } from '../Observable'; import { innerFrom } from '../observable/from'; import { createErrorClass } from '../util/createErrorClass'; -import { caughtSchedule } from '../util/caughtSchedule'; import { OperatorSubscriber } from './OperatorSubscriber'; +import { executeSchedule } from '../util/executeSchedule'; export interface TimeoutConfig = ObservableInput, M = unknown> { /** @@ -342,18 +342,22 @@ export function timeout, M>( // tell how many values we have seen so far. let seen = 0; const startTimer = (delay: number) => { - timerSubscription = caughtSchedule( + timerSubscription = executeSchedule( subscriber, scheduler, () => { - originalSourceSubscription.unsubscribe(); - innerFrom( - _with!({ - meta, - lastValue, - seen, - }) - ).subscribe(subscriber); + try { + originalSourceSubscription.unsubscribe(); + innerFrom( + _with!({ + meta, + lastValue, + seen, + }) + ).subscribe(subscriber); + } catch (err) { + subscriber.error(err); + } }, delay ); diff --git a/src/internal/operators/windowTime.ts b/src/internal/operators/windowTime.ts index d5c8354b126..4301e616ad4 100644 --- a/src/internal/operators/windowTime.ts +++ b/src/internal/operators/windowTime.ts @@ -7,6 +7,7 @@ import { operate } from '../util/lift'; import { OperatorSubscriber } from './OperatorSubscriber'; import { arrRemove } from '../util/arrRemove'; import { popScheduler } from '../util/args'; +import { executeSchedule } from '../util/executeSchedule'; export function windowTime(windowTimeSpan: number, scheduler?: SchedulerLike): OperatorFunction>; export function windowTime( @@ -136,21 +137,18 @@ export function windowTime(windowTimeSpan: number, ...otherArgs: any[]): Oper }; windowRecords.push(record); subscriber.next(window.asObservable()); - subs.add(scheduler.schedule(() => closeWindow(record), windowTimeSpan)); + executeSchedule(subs, scheduler, () => closeWindow(record), windowTimeSpan); } }; - windowCreationInterval !== null && windowCreationInterval >= 0 - ? // The user passed both a windowTimeSpan (required), and a creation interval - // That means we need to start new window on the interval, and those windows need - // to wait the required time span before completing. - subscriber.add( - scheduler.schedule(function () { - startWindow(); - !this.closed && subscriber.add(this.schedule(null, windowCreationInterval)); - }, windowCreationInterval) - ) - : (restartOnClose = true); + if (windowCreationInterval !== null && windowCreationInterval >= 0) { + // The user passed both a windowTimeSpan (required), and a creation interval + // That means we need to start new window on the interval, and those windows need + // to wait the required time span before completing. + executeSchedule(subscriber, scheduler, startWindow, windowCreationInterval, true); + } else { + restartOnClose = true; + } startWindow(); diff --git a/src/internal/scheduled/scheduleAsyncIterable.ts b/src/internal/scheduled/scheduleAsyncIterable.ts index ffadfbf2937..b5b44abffe0 100644 --- a/src/internal/scheduled/scheduleAsyncIterable.ts +++ b/src/internal/scheduled/scheduleAsyncIterable.ts @@ -1,28 +1,32 @@ import { SchedulerLike } from '../types'; import { Observable } from '../Observable'; import { Subscription } from '../Subscription'; +import { executeSchedule } from '../util/executeSchedule'; export function scheduleAsyncIterable(input: AsyncIterable, scheduler: SchedulerLike) { if (!input) { throw new Error('Iterable cannot be null'); } - return new Observable(subscriber => { - const sub = new Subscription(); - sub.add( - scheduler.schedule(() => { - const iterator = input[Symbol.asyncIterator](); - sub.add(scheduler.schedule(function () { - iterator.next().then(result => { + return new Observable((subscriber) => { + executeSchedule(subscriber, scheduler, () => { + const iterator = input[Symbol.asyncIterator](); + executeSchedule( + subscriber, + scheduler, + () => { + iterator.next().then((result) => { if (result.done) { + // This will remove the subscriptions from + // the parent subscription. subscriber.complete(); } else { subscriber.next(result.value); - this.schedule(); } }); - })); - }) - ); - return sub; + }, + 0, + true + ); + }); }); } diff --git a/src/internal/scheduled/scheduleIterable.ts b/src/internal/scheduled/scheduleIterable.ts index 71c3ff17d3d..4b1e1d3fdcf 100644 --- a/src/internal/scheduled/scheduleIterable.ts +++ b/src/internal/scheduled/scheduleIterable.ts @@ -2,7 +2,7 @@ import { Observable } from '../Observable'; import { SchedulerLike } from '../types'; import { iterator as Symbol_iterator } from '../symbol/iterator'; import { isFunction } from '../util/isFunction'; -import { caughtSchedule } from '../util/caughtSchedule'; +import { executeSchedule } from '../util/executeSchedule'; /** * Used in {@link scheduled} to create an observable from an Iterable. @@ -16,15 +16,25 @@ export function scheduleIterable(input: Iterable, scheduler: SchedulerLike // Schedule the initial creation of the iterator from // the iterable. This is so the code in the iterable is // not called until the scheduled job fires. - subscriber.add( - scheduler.schedule(() => { - // Create the iterator. - iterator = (input as any)[Symbol_iterator](); + executeSchedule(subscriber, scheduler, () => { + // Create the iterator. + iterator = (input as any)[Symbol_iterator](); + + executeSchedule( + subscriber, + scheduler, + () => { + let value: T; + let done: boolean | undefined; + try { + // Pull the value out of the iterator + ({ value, done } = iterator.next()); + } catch (err) { + // We got an error while pulling from the iterator + subscriber.error(err); + return; + } - // Schedule the first iteration and emission. - caughtSchedule(subscriber, scheduler, function () { - // Pull the value out of the iterator - const { value, done } = iterator.next(); if (done) { // If it is "done" we just complete. This mimics the // behavior of JavaScript's `for..of` consumption of @@ -34,13 +44,12 @@ export function scheduleIterable(input: Iterable, scheduler: SchedulerLike } else { // The iterable is not done, emit the value. subscriber.next(value); - // Reschedule. This will cause this function to be - // called again on the same scheduled delay. - this.schedule(); } - }); - }) - ); + }, + 0, + true + ); + }); // During teardown, if we see this iterator has a `return` method, // then we know it is a Generator, and not just an Iterator. So we call diff --git a/src/internal/scheduled/scheduleObservable.ts b/src/internal/scheduled/scheduleObservable.ts index 97d6c0d5707..0e724f5a1fe 100644 --- a/src/internal/scheduled/scheduleObservable.ts +++ b/src/internal/scheduled/scheduleObservable.ts @@ -1,19 +1,8 @@ -import { Observable } from '../Observable'; -import { Subscription } from '../Subscription'; -import { observable as Symbol_observable } from '../symbol/observable'; -import { InteropObservable, SchedulerLike, Subscribable } from '../types'; +import { innerFrom } from '../observable/from'; +import { observeOn } from '../operators/observeOn'; +import { subscribeOn } from '../operators/subscribeOn'; +import { InteropObservable, SchedulerLike } from '../types'; export function scheduleObservable(input: InteropObservable, scheduler: SchedulerLike) { - return new Observable(subscriber => { - const sub = new Subscription(); - sub.add(scheduler.schedule(() => { - const observable: Subscribable = (input as any)[Symbol_observable](); - sub.add(observable.subscribe({ - next(value) { sub.add(scheduler.schedule(() => subscriber.next(value))); }, - error(err) { sub.add(scheduler.schedule(() => subscriber.error(err))); }, - complete() { sub.add(scheduler.schedule(() => subscriber.complete())); }, - })); - })); - return sub; - }); + return innerFrom(input).pipe(subscribeOn(scheduler), observeOn(scheduler)); } diff --git a/src/internal/scheduled/schedulePromise.ts b/src/internal/scheduled/schedulePromise.ts index 00da90278bb..e04b7a10f64 100644 --- a/src/internal/scheduled/schedulePromise.ts +++ b/src/internal/scheduled/schedulePromise.ts @@ -1,22 +1,8 @@ -import { Observable } from '../Observable'; +import { innerFrom } from '../observable/from'; +import { observeOn } from '../operators/observeOn'; +import { subscribeOn } from '../operators/subscribeOn'; import { SchedulerLike } from '../types'; export function schedulePromise(input: PromiseLike, scheduler: SchedulerLike) { - return new Observable((subscriber) => { - return scheduler.schedule(() => - input.then( - (value) => { - subscriber.add( - scheduler.schedule(() => { - subscriber.next(value); - subscriber.add(scheduler.schedule(() => subscriber.complete())); - }) - ); - }, - (err) => { - subscriber.add(scheduler.schedule(() => subscriber.error(err))); - } - ) - ); - }); + return innerFrom(input).pipe(subscribeOn(scheduler), observeOn(scheduler)); } diff --git a/src/internal/util/caughtSchedule.ts b/src/internal/util/caughtSchedule.ts deleted file mode 100644 index 5ff9bdcd4ca..00000000000 --- a/src/internal/util/caughtSchedule.ts +++ /dev/null @@ -1,20 +0,0 @@ -import { Subscriber } from '../Subscriber'; -import { Subscription } from '../Subscription'; -import { SchedulerAction, SchedulerLike } from '../types'; - -export function caughtSchedule( - subscriber: Subscriber, - scheduler: SchedulerLike, - execute: (this: SchedulerAction) => void, - delay = 0 -): Subscription { - const subscription = scheduler.schedule(function () { - try { - execute.call(this); - } catch (err) { - subscriber.error(err); - } - }, delay); - subscriber.add(subscription); - return subscription; -} diff --git a/src/internal/util/executeSchedule.ts b/src/internal/util/executeSchedule.ts new file mode 100644 index 00000000000..1bcb9900ff9 --- /dev/null +++ b/src/internal/util/executeSchedule.ts @@ -0,0 +1,44 @@ +import { Subscription } from '../Subscription'; +import { SchedulerAction, SchedulerLike } from '../types'; + +export function executeSchedule( + parentSubscription: Subscription, + scheduler: SchedulerLike, + work: () => void, + delay: number, + repeat: true +): void; +export function executeSchedule( + parentSubscription: Subscription, + scheduler: SchedulerLike, + work: () => void, + delay?: number, + repeat?: false +): Subscription; + +export function executeSchedule( + parentSubscription: Subscription, + scheduler: SchedulerLike, + work: () => void, + delay = 0, + repeat = false +): Subscription | void { + const scheduleSubscription = scheduler.schedule(function (this: SchedulerAction) { + work(); + if (repeat) { + parentSubscription.add(this.schedule(null, delay)); + } else { + this.unsubscribe(); + } + }, delay); + + parentSubscription.add(scheduleSubscription); + + if (!repeat) { + // Because user-land scheduler implementations are unlikely to properly reuse + // Actions for repeat scheduling, we can't trust that the returned subscription + // will control repeat subscription scenarios. So we're trying to avoid using them + // incorrectly within this library. + return scheduleSubscription; + } +}