diff --git a/api_guard/dist/types/index.d.ts b/api_guard/dist/types/index.d.ts index 71567243f0..abbd15a343 100644 --- a/api_guard/dist/types/index.d.ts +++ b/api_guard/dist/types/index.d.ts @@ -251,7 +251,7 @@ export declare function forkJoin>> }>; export declare function from>(input: O): Observable>; -export declare function from>(input: O, scheduler: SchedulerLike): Observable>; +export declare function from>(input: O, scheduler: SchedulerLike | undefined): Observable>; export declare function fromEvent(target: HasEventTargetAddRemove | ArrayLike>, eventName: string): Observable; export declare function fromEvent(target: HasEventTargetAddRemove | ArrayLike>, eventName: string, resultSelector: (event: T) => R): Observable; diff --git a/src/internal/observable/combineLatest.ts b/src/internal/observable/combineLatest.ts index c573f02c8e..b207405fe9 100644 --- a/src/internal/observable/combineLatest.ts +++ b/src/internal/observable/combineLatest.ts @@ -10,6 +10,7 @@ import { popResultSelector, popScheduler } from '../util/args'; import { createObject } from '../util/createObject'; import { OperatorSubscriber } from '../operators/OperatorSubscriber'; import { AnyCatcher } from '../AnyCatcher'; +import { executeSchedule } from '../util/executeSchedule'; // combineLatest(any) // We put this first because we need to catch cases where the user has supplied @@ -293,7 +294,7 @@ export function combineLatestInit( */ function maybeSchedule(scheduler: SchedulerLike | undefined, execute: () => void, subscription: Subscription) { if (scheduler) { - subscription.add(scheduler.schedule(execute)); + executeSchedule(subscription, scheduler, execute); } else { execute(); } diff --git a/src/internal/observable/concat.ts b/src/internal/observable/concat.ts index de34c49b92..230c1a7bb9 100644 --- a/src/internal/observable/concat.ts +++ b/src/internal/observable/concat.ts @@ -1,11 +1,13 @@ import { Observable } from '../Observable'; import { ObservableInputTuple, SchedulerLike } from '../types'; import { concatAll } from '../operators/concatAll'; -import { internalFromArray } from './fromArray'; import { popScheduler } from '../util/args'; +import { from } from './from'; export function concat(...inputs: [...ObservableInputTuple]): Observable; -export function concat(...inputsAndScheduler: [...ObservableInputTuple, SchedulerLike]): Observable; +export function concat( + ...inputsAndScheduler: [...ObservableInputTuple, SchedulerLike] +): Observable; /** * Creates an output Observable which sequentially emits all values from the first given @@ -113,5 +115,5 @@ export function concat(...inputsAndScheduler: [... * Observable subscription on. */ export function concat(...args: any[]): Observable { - return concatAll()(internalFromArray(args, popScheduler(args))); + return concatAll()(from(args, popScheduler(args))); } diff --git a/src/internal/observable/defer.ts b/src/internal/observable/defer.ts index 051142ccfa..417f867790 100644 --- a/src/internal/observable/defer.ts +++ b/src/internal/observable/defer.ts @@ -1,6 +1,6 @@ import { Observable } from '../Observable'; import { ObservedValueOf, ObservableInput } from '../types'; -import { innerFrom } from './from'; +import { innerFrom } from './innerFrom'; /** * Creates an Observable that, on subscribe, calls an Observable factory to diff --git a/src/internal/observable/dom/fetch.ts b/src/internal/observable/dom/fetch.ts index 5a32723b8f..bc04708823 100644 --- a/src/internal/observable/dom/fetch.ts +++ b/src/internal/observable/dom/fetch.ts @@ -1,6 +1,6 @@ import { OperatorSubscriber } from '../../operators/OperatorSubscriber'; import { Observable } from '../../Observable'; -import { innerFrom } from '../../observable/from'; +import { innerFrom } from '../../observable/innerFrom'; import { ObservableInput } from '../../types'; export function fromFetch( diff --git a/src/internal/observable/forkJoin.ts b/src/internal/observable/forkJoin.ts index af9ec88949..ab5a5a19e9 100644 --- a/src/internal/observable/forkJoin.ts +++ b/src/internal/observable/forkJoin.ts @@ -1,7 +1,7 @@ import { Observable } from '../Observable'; import { ObservedValueOf, ObservableInputTuple, ObservableInput } from '../types'; import { argsArgArrayOrObject } from '../util/argsArgArrayOrObject'; -import { innerFrom } from './from'; +import { innerFrom } from './innerFrom'; import { popResultSelector } from '../util/args'; import { OperatorSubscriber } from '../operators/OperatorSubscriber'; import { mapOneOrManyArgs } from '../util/mapOneOrManyArgs'; diff --git a/src/internal/observable/from.ts b/src/internal/observable/from.ts index 03fb1283d5..1bbea4dd92 100644 --- a/src/internal/observable/from.ts +++ b/src/internal/observable/from.ts @@ -1,22 +1,11 @@ -import { isArrayLike } from '../util/isArrayLike'; -import { isPromise } from '../util/isPromise'; -import { observable as Symbol_observable } from '../symbol/observable'; -import { Subscriber } from '../Subscriber'; - import { Observable } from '../Observable'; -import { ObservableInput, SchedulerLike, ObservedValueOf, ReadableStreamLike } from '../types'; +import { ObservableInput, SchedulerLike, ObservedValueOf } from '../types'; import { scheduled } from '../scheduled/scheduled'; -import { isFunction } from '../util/isFunction'; -import { reportUnhandledError } from '../util/reportUnhandledError'; -import { isInteropObservable } from '../util/isInteropObservable'; -import { isAsyncIterable } from '../util/isAsyncIterable'; -import { createInvalidObservableTypeError } from '../util/throwUnobservableError'; -import { isIterable } from '../util/isIterable'; -import { isReadableStreamLike, readableStreamLikeToAsyncGenerator } from '../util/isReadableStreamLike'; +import { innerFrom } from './innerFrom'; export function from>(input: O): Observable>; /** @deprecated The `scheduler` parameter will be removed in v8. Use `scheduled`. Details: https://rxjs.dev/deprecations/scheduler-argument */ -export function from>(input: O, scheduler: SchedulerLike): Observable>; +export function from>(input: O, scheduler: SchedulerLike | undefined): Observable>; /** * Creates an Observable from an Array, an array-like object, a Promise, an iterable object, or an Observable-like object. @@ -118,124 +107,3 @@ export function from>(input: O, scheduler: Schedu export function from(input: ObservableInput, scheduler?: SchedulerLike): Observable { return scheduler ? scheduled(input, scheduler) : innerFrom(input); } - -// TODO: Use this throughout the library, rather than the `from` above, to avoid -// the unnecessary scheduling check and reduce bundled sizes of operators that use `from`. -// TODO: Eventually, this just becomes `from`, as we don't have the deprecated scheduled path anymore. -export function innerFrom(input: ObservableInput): Observable { - if (input instanceof Observable) { - return input; - } - if (input != null) { - if (isInteropObservable(input)) { - return fromInteropObservable(input); - } - if (isArrayLike(input)) { - return fromArrayLike(input); - } - if (isPromise(input)) { - return fromPromise(input); - } - if (isAsyncIterable(input)) { - return fromAsyncIterable(input); - } - if (isIterable(input)) { - return fromIterable(input); - } - if (isReadableStreamLike(input)) { - return fromReadableStreamLike(input); - } - } - - throw createInvalidObservableTypeError(input); -} - -/** - * Creates an RxJS Observable from an object that implements `Symbol.observable`. - * @param obj An object that properly implements `Symbol.observable`. - */ -function fromInteropObservable(obj: any) { - return new Observable((subscriber: Subscriber) => { - const obs = obj[Symbol_observable](); - if (isFunction(obs.subscribe)) { - return obs.subscribe(subscriber); - } - // Should be caught by observable subscribe function error handling. - throw new TypeError('Provided object does not correctly implement Symbol.observable'); - }); -} - -/** - * Synchronously emits the values of an array like and completes. - * This is exported because there are creation functions and operators that need to - * make direct use of the same logic, and there's no reason to make them run through - * `from` conditionals because we *know* they're dealing with an array. - * @param array The array to emit values from - */ -export function fromArrayLike(array: ArrayLike) { - return new Observable((subscriber: Subscriber) => { - // Loop over the array and emit each value. Note two things here: - // 1. We're making sure that the subscriber is not closed on each loop. - // This is so we don't continue looping over a very large array after - // something like a `take`, `takeWhile`, or other synchronous unsubscription - // has already unsubscribed. - // 2. In this form, reentrant code can alter that array we're looping over. - // This is a known issue, but considered an edge case. The alternative would - // be to copy the array before executing the loop, but this has - // performance implications. - for (let i = 0; i < array.length && !subscriber.closed; i++) { - subscriber.next(array[i]); - } - subscriber.complete(); - }); -} - -function fromPromise(promise: PromiseLike) { - return new Observable((subscriber: Subscriber) => { - promise - .then( - (value) => { - if (!subscriber.closed) { - subscriber.next(value); - subscriber.complete(); - } - }, - (err: any) => subscriber.error(err) - ) - .then(null, reportUnhandledError); - }); -} - -function fromIterable(iterable: Iterable) { - return new Observable((subscriber: Subscriber) => { - for (const value of iterable) { - subscriber.next(value); - if (subscriber.closed) { - return; - } - } - subscriber.complete(); - }); -} - -function fromAsyncIterable(asyncIterable: AsyncIterable) { - return new Observable((subscriber: Subscriber) => { - process(asyncIterable, subscriber).catch((err) => subscriber.error(err)); - }); -} - -function fromReadableStreamLike(readableStream: ReadableStreamLike) { - return fromAsyncIterable(readableStreamLikeToAsyncGenerator(readableStream)); -} - -async function process(asyncIterable: AsyncIterable, subscriber: Subscriber) { - for await (const value of asyncIterable) { - subscriber.next(value); - // A side-effect may have closed our subscriber, - // check before the next iteration. - if (subscriber.closed) { - return; - } - } - subscriber.complete(); -} diff --git a/src/internal/observable/fromArray.ts b/src/internal/observable/fromArray.ts deleted file mode 100644 index b1bdc4104c..0000000000 --- a/src/internal/observable/fromArray.ts +++ /dev/null @@ -1,7 +0,0 @@ -import { SchedulerLike } from '../types'; -import { scheduleArray } from '../scheduled/scheduleArray'; -import { fromArrayLike } from './from'; - -export function internalFromArray(input: ArrayLike, scheduler?: SchedulerLike) { - return scheduler ? scheduleArray(input, scheduler) : fromArrayLike(input); -} diff --git a/src/internal/observable/fromEvent.ts b/src/internal/observable/fromEvent.ts index 59dae2a2c5..8044ff9da9 100644 --- a/src/internal/observable/fromEvent.ts +++ b/src/internal/observable/fromEvent.ts @@ -1,9 +1,9 @@ +import { innerFrom } from '../observable/innerFrom'; import { Observable } from '../Observable'; import { mergeMap } from '../operators/mergeMap'; import { isArrayLike } from '../util/isArrayLike'; import { isFunction } from '../util/isFunction'; import { mapOneOrManyArgs } from '../util/mapOneOrManyArgs'; -import { internalFromArray } from './fromArray'; // These constants are used to create handler registry functions using array mapping below. const nodeEventEmitterMethods = ['addListener', 'removeListener'] as const; @@ -267,7 +267,7 @@ export function fromEvent( if (!add) { if (isArrayLike(target)) { return mergeMap((subTarget: any) => fromEvent(subTarget, eventName, options as EventListenerOptions))( - internalFromArray(target) + innerFrom(target) ) as Observable; } } diff --git a/src/internal/observable/innerFrom.ts b/src/internal/observable/innerFrom.ts new file mode 100644 index 0000000000..13e792eee0 --- /dev/null +++ b/src/internal/observable/innerFrom.ts @@ -0,0 +1,131 @@ +import { isArrayLike } from '../util/isArrayLike'; +import { isPromise } from '../util/isPromise'; +import { Observable } from '../Observable'; +import { ObservableInput, ReadableStreamLike } from '../types'; +import { isInteropObservable } from '../util/isInteropObservable'; +import { isAsyncIterable } from '../util/isAsyncIterable'; +import { createInvalidObservableTypeError } from '../util/throwUnobservableError'; +import { isIterable } from '../util/isIterable'; +import { isReadableStreamLike, readableStreamLikeToAsyncGenerator } from '../util/isReadableStreamLike'; +import { Subscriber } from '../Subscriber'; +import { isFunction } from '../util/isFunction'; +import { reportUnhandledError } from '../util/reportUnhandledError'; +import { observable as Symbol_observable } from '../symbol/observable'; + +export function innerFrom(input: ObservableInput): Observable { + if (input instanceof Observable) { + return input; + } + if (input != null) { + if (isInteropObservable(input)) { + return fromInteropObservable(input); + } + if (isArrayLike(input)) { + return fromArrayLike(input); + } + if (isPromise(input)) { + return fromPromise(input); + } + if (isAsyncIterable(input)) { + return fromAsyncIterable(input); + } + if (isIterable(input)) { + return fromIterable(input); + } + if (isReadableStreamLike(input)) { + return fromReadableStreamLike(input); + } + } + + throw createInvalidObservableTypeError(input); +} + +/** + * Creates an RxJS Observable from an object that implements `Symbol.observable`. + * @param obj An object that properly implements `Symbol.observable`. + */ +export function fromInteropObservable(obj: any) { + return new Observable((subscriber: Subscriber) => { + const obs = obj[Symbol_observable](); + if (isFunction(obs.subscribe)) { + return obs.subscribe(subscriber); + } + // Should be caught by observable subscribe function error handling. + throw new TypeError('Provided object does not correctly implement Symbol.observable'); + }); +} + +/** + * Synchronously emits the values of an array like and completes. + * This is exported because there are creation functions and operators that need to + * make direct use of the same logic, and there's no reason to make them run through + * `from` conditionals because we *know* they're dealing with an array. + * @param array The array to emit values from + */ +export function fromArrayLike(array: ArrayLike) { + return new Observable((subscriber: Subscriber) => { + // Loop over the array and emit each value. Note two things here: + // 1. We're making sure that the subscriber is not closed on each loop. + // This is so we don't continue looping over a very large array after + // something like a `take`, `takeWhile`, or other synchronous unsubscription + // has already unsubscribed. + // 2. In this form, reentrant code can alter that array we're looping over. + // This is a known issue, but considered an edge case. The alternative would + // be to copy the array before executing the loop, but this has + // performance implications. + for (let i = 0; i < array.length && !subscriber.closed; i++) { + subscriber.next(array[i]); + } + subscriber.complete(); + }); +} + +export function fromPromise(promise: PromiseLike) { + return new Observable((subscriber: Subscriber) => { + promise + .then( + (value) => { + if (!subscriber.closed) { + subscriber.next(value); + subscriber.complete(); + } + }, + (err: any) => subscriber.error(err) + ) + .then(null, reportUnhandledError); + }); +} + +export function fromIterable(iterable: Iterable) { + return new Observable((subscriber: Subscriber) => { + for (const value of iterable) { + subscriber.next(value); + if (subscriber.closed) { + return; + } + } + subscriber.complete(); + }); +} + +export function fromAsyncIterable(asyncIterable: AsyncIterable) { + return new Observable((subscriber: Subscriber) => { + process(asyncIterable, subscriber).catch((err) => subscriber.error(err)); + }); +} + +export function fromReadableStreamLike(readableStream: ReadableStreamLike) { + return fromAsyncIterable(readableStreamLikeToAsyncGenerator(readableStream)); +} + +async function process(asyncIterable: AsyncIterable, subscriber: Subscriber) { + for await (const value of asyncIterable) { + subscriber.next(value); + // A side-effect may have closed our subscriber, + // check before the next iteration. + if (subscriber.closed) { + return; + } + } + subscriber.complete(); +} diff --git a/src/internal/observable/merge.ts b/src/internal/observable/merge.ts index 4e3ce849f7..268c03363b 100644 --- a/src/internal/observable/merge.ts +++ b/src/internal/observable/merge.ts @@ -1,10 +1,10 @@ import { Observable } from '../Observable'; import { ObservableInput, ObservableInputTuple, SchedulerLike } from '../types'; import { mergeAll } from '../operators/mergeAll'; -import { internalFromArray } from './fromArray'; -import { innerFrom } from './from'; +import { innerFrom } from './innerFrom'; import { EMPTY } from './empty'; import { popNumber, popScheduler } from '../util/args'; +import { from } from './from'; export function merge(...sources: [...ObservableInputTuple]): Observable; export function merge(...sourcesAndConcurrency: [...ObservableInputTuple, number?]): Observable; @@ -95,5 +95,5 @@ export function merge(...args: (ObservableInput | number | SchedulerLik ? // One source? Just return it. innerFrom(sources[0]) : // Merge all sources - mergeAll(concurrent)(internalFromArray(sources, scheduler)); + mergeAll(concurrent)(from(sources, scheduler)); } diff --git a/src/internal/observable/of.ts b/src/internal/observable/of.ts index f9e0461800..7f9fe21380 100644 --- a/src/internal/observable/of.ts +++ b/src/internal/observable/of.ts @@ -1,8 +1,7 @@ import { SchedulerLike, ValueFromArray } from '../types'; -import { internalFromArray } from './fromArray'; import { Observable } from '../Observable'; -import { scheduleArray } from '../scheduled/scheduleArray'; import { popScheduler } from '../util/args'; +import { from } from './from'; // Devs are more likely to pass null or undefined than they are a scheduler // without accompanying values. To make things easier for (naughty) devs who @@ -80,5 +79,5 @@ export function of(...values: A): Observable(...args: Array): Observable { const scheduler = popScheduler(args); - return scheduler ? scheduleArray(args as T[], scheduler) : internalFromArray(args as T[]); + return from(args as T[], scheduler); } diff --git a/src/internal/observable/partition.ts b/src/internal/observable/partition.ts index 6e9fee2866..1c3c9dfa9b 100644 --- a/src/internal/observable/partition.ts +++ b/src/internal/observable/partition.ts @@ -2,7 +2,7 @@ import { not } from '../util/not'; import { filter } from '../operators/filter'; import { ObservableInput } from '../types'; import { Observable } from '../Observable'; -import { innerFrom } from './from'; +import { innerFrom } from './innerFrom'; /** @deprecated Use a closure instead of a `thisArg`. Signatures accepting a `thisArg` will be removed in v8. */ export function partition( diff --git a/src/internal/observable/race.ts b/src/internal/observable/race.ts index d5fbcbde71..2093ad926e 100644 --- a/src/internal/observable/race.ts +++ b/src/internal/observable/race.ts @@ -1,5 +1,5 @@ import { Observable } from '../Observable'; -import { innerFrom } from './from'; +import { innerFrom } from './innerFrom'; import { Subscription } from '../Subscription'; import { ObservableInput, ObservableInputTuple } from '../types'; import { argsOrArgArray } from '../util/argsOrArgArray'; diff --git a/src/internal/observable/using.ts b/src/internal/observable/using.ts index 7303870343..437fed9c47 100644 --- a/src/internal/observable/using.ts +++ b/src/internal/observable/using.ts @@ -1,6 +1,6 @@ import { Observable } from '../Observable'; import { Unsubscribable, ObservableInput, ObservedValueOf } from '../types'; -import { innerFrom } from './from'; +import { innerFrom } from './innerFrom'; import { EMPTY } from './empty'; /** diff --git a/src/internal/observable/zip.ts b/src/internal/observable/zip.ts index 1b3973d91c..4301089cd4 100644 --- a/src/internal/observable/zip.ts +++ b/src/internal/observable/zip.ts @@ -1,6 +1,6 @@ import { Observable } from '../Observable'; import { ObservableInputTuple } from '../types'; -import { innerFrom } from './from'; +import { innerFrom } from './innerFrom'; import { argsOrArgArray } from '../util/argsOrArgArray'; import { EMPTY } from './empty'; import { OperatorSubscriber } from '../operators/OperatorSubscriber'; diff --git a/src/internal/operators/audit.ts b/src/internal/operators/audit.ts index cf08523520..0f610608ff 100644 --- a/src/internal/operators/audit.ts +++ b/src/internal/operators/audit.ts @@ -2,7 +2,7 @@ import { Subscriber } from '../Subscriber'; import { MonoTypeOperatorFunction, ObservableInput } from '../types'; import { operate } from '../util/lift'; -import { innerFrom } from '../observable/from'; +import { innerFrom } from '../observable/innerFrom'; import { OperatorSubscriber } from './OperatorSubscriber'; /** diff --git a/src/internal/operators/bufferTime.ts b/src/internal/operators/bufferTime.ts index f7af350fc4..cab6dd2b21 100644 --- a/src/internal/operators/bufferTime.ts +++ b/src/internal/operators/bufferTime.ts @@ -5,6 +5,7 @@ import { OperatorSubscriber } from './OperatorSubscriber'; import { arrRemove } from '../util/arrRemove'; import { asyncScheduler } from '../scheduler/async'; import { popScheduler } from '../util/args'; +import { executeSchedule } from '../util/executeSchedule'; /* tslint:disable:max-line-length */ export function bufferTime(bufferTimeSpan: number, scheduler?: SchedulerLike): OperatorFunction; @@ -117,21 +118,18 @@ export function bufferTime(bufferTimeSpan: number, ...otherArgs: any[]): Oper subs, }; bufferRecords.push(record); - subs.add(scheduler.schedule(() => emit(record), bufferTimeSpan)); + executeSchedule(subs, scheduler, () => emit(record), bufferTimeSpan); } }; - bufferCreationInterval !== null && bufferCreationInterval >= 0 - ? // The user passed both a bufferTimeSpan (required), and a creation interval - // That means we need to start new buffers on the interval, and those buffers need - // to wait the required time span before emitting. - subscriber.add( - scheduler.schedule(function () { - startBuffer(); - !this.closed && subscriber.add(this.schedule(null, bufferCreationInterval)); - }, bufferCreationInterval) - ) - : (restartOnEmit = true); + if (bufferCreationInterval !== null && bufferCreationInterval >= 0) { + // The user passed both a bufferTimeSpan (required), and a creation interval + // That means we need to start new buffers on the interval, and those buffers need + // to wait the required time span before emitting. + executeSchedule(subscriber, scheduler, startBuffer, bufferCreationInterval, true); + } else { + restartOnEmit = true; + } startBuffer(); diff --git a/src/internal/operators/bufferToggle.ts b/src/internal/operators/bufferToggle.ts index a78c154157..61d3613570 100644 --- a/src/internal/operators/bufferToggle.ts +++ b/src/internal/operators/bufferToggle.ts @@ -1,7 +1,7 @@ import { Subscription } from '../Subscription'; import { OperatorFunction, ObservableInput } from '../types'; import { operate } from '../util/lift'; -import { innerFrom } from '../observable/from'; +import { innerFrom } from '../observable/innerFrom'; import { OperatorSubscriber } from './OperatorSubscriber'; import { noop } from '../util/noop'; import { arrRemove } from '../util/arrRemove'; @@ -20,6 +20,7 @@ import { arrRemove } from '../util/arrRemove'; * Observable provided to `openings`, and closing and sending the buffers when * a Subscribable or Promise returned by the `closingSelector` function emits. * + * * ## Example * * Every other second, emit the click events from the next 500ms diff --git a/src/internal/operators/bufferWhen.ts b/src/internal/operators/bufferWhen.ts index 79df6cc9c1..fd8638e09b 100644 --- a/src/internal/operators/bufferWhen.ts +++ b/src/internal/operators/bufferWhen.ts @@ -3,7 +3,7 @@ import { ObservableInput, OperatorFunction } from '../types'; import { operate } from '../util/lift'; import { noop } from '../util/noop'; import { OperatorSubscriber } from './OperatorSubscriber'; -import { innerFrom } from '../observable/from'; +import { innerFrom } from '../observable/innerFrom'; /** * Buffers the source Observable values, using a factory function of closing diff --git a/src/internal/operators/catchError.ts b/src/internal/operators/catchError.ts index 1b9da6f904..7a69d49774 100644 --- a/src/internal/operators/catchError.ts +++ b/src/internal/operators/catchError.ts @@ -2,7 +2,7 @@ import { Observable } from '../Observable'; import { ObservableInput, OperatorFunction, ObservedValueOf } from '../types'; import { Subscription } from '../Subscription'; -import { innerFrom } from '../observable/from'; +import { innerFrom } from '../observable/innerFrom'; import { OperatorSubscriber } from './OperatorSubscriber'; import { operate } from '../util/lift'; diff --git a/src/internal/operators/concat.ts b/src/internal/operators/concat.ts index 0aeb147198..eadb595837 100644 --- a/src/internal/operators/concat.ts +++ b/src/internal/operators/concat.ts @@ -1,8 +1,8 @@ import { ObservableInputTuple, OperatorFunction, SchedulerLike } from '../types'; import { operate } from '../util/lift'; import { concatAll } from './concatAll'; -import { internalFromArray } from '../observable/fromArray'; import { popScheduler } from '../util/args'; +import { from } from '../observable/from'; /** @deprecated Replaced with {@link concatWith}. Will be removed in v8. */ export function concat(...sources: [...ObservableInputTuple]): OperatorFunction; @@ -17,6 +17,6 @@ export function concat( export function concat(...args: any[]): OperatorFunction { const scheduler = popScheduler(args); return operate((source, subscriber) => { - concatAll()(internalFromArray([source, ...args], scheduler)).subscribe(subscriber as any); + concatAll()(from([source, ...args], scheduler)).subscribe(subscriber); }); } diff --git a/src/internal/operators/debounce.ts b/src/internal/operators/debounce.ts index e7df6ed3bb..84f5223ba0 100644 --- a/src/internal/operators/debounce.ts +++ b/src/internal/operators/debounce.ts @@ -3,7 +3,7 @@ import { MonoTypeOperatorFunction, ObservableInput } from '../types'; import { operate } from '../util/lift'; import { noop } from '../util/noop'; import { OperatorSubscriber } from './OperatorSubscriber'; -import { innerFrom } from '../observable/from'; +import { innerFrom } from '../observable/innerFrom'; /** * Emits a notification from the source Observable only after a particular time span diff --git a/src/internal/operators/exhaustAll.ts b/src/internal/operators/exhaustAll.ts index 4548408559..d5ebd59fa4 100644 --- a/src/internal/operators/exhaustAll.ts +++ b/src/internal/operators/exhaustAll.ts @@ -1,7 +1,7 @@ import { Subscription } from '../Subscription'; import { OperatorFunction, ObservableInput, ObservedValueOf } from '../types'; import { operate } from '../util/lift'; -import { innerFrom } from '../observable/from'; +import { innerFrom } from '../observable/innerFrom'; import { OperatorSubscriber } from './OperatorSubscriber'; /** diff --git a/src/internal/operators/exhaustMap.ts b/src/internal/operators/exhaustMap.ts index 6a91568be1..b2ea51b6f4 100644 --- a/src/internal/operators/exhaustMap.ts +++ b/src/internal/operators/exhaustMap.ts @@ -2,7 +2,7 @@ import { Observable } from '../Observable'; import { Subscriber } from '../Subscriber'; import { ObservableInput, OperatorFunction, ObservedValueOf } from '../types'; import { map } from './map'; -import { innerFrom } from '../observable/from'; +import { innerFrom } from '../observable/innerFrom'; import { operate } from '../util/lift'; import { OperatorSubscriber } from './OperatorSubscriber'; diff --git a/src/internal/operators/groupBy.ts b/src/internal/operators/groupBy.ts index 30c7c46e4a..f31b495849 100644 --- a/src/internal/operators/groupBy.ts +++ b/src/internal/operators/groupBy.ts @@ -1,5 +1,5 @@ import { Observable } from '../Observable'; -import { innerFrom } from '../observable/from'; +import { innerFrom } from '../observable/innerFrom'; import { Subject } from '../Subject'; import { ObservableInput, Observer, OperatorFunction, SubjectLike } from '../types'; import { operate } from '../util/lift'; diff --git a/src/internal/operators/merge.ts b/src/internal/operators/merge.ts index 9e3e6c5721..d7cae4e3c0 100644 --- a/src/internal/operators/merge.ts +++ b/src/internal/operators/merge.ts @@ -1,9 +1,9 @@ import { ObservableInput, ObservableInputTuple, OperatorFunction, SchedulerLike } from '../types'; import { operate } from '../util/lift'; import { argsOrArgArray } from '../util/argsOrArgArray'; -import { internalFromArray } from '../observable/fromArray'; import { mergeAll } from './mergeAll'; import { popNumber, popScheduler } from '../util/args'; +import { from } from '../observable/from'; /** @deprecated Replaced with {@link mergeWith}. Will be removed in v8. */ export function merge(...sources: [...ObservableInputTuple]): OperatorFunction; @@ -26,6 +26,6 @@ export function merge(...args: unknown[]): OperatorFunction { args = argsOrArgArray(args); return operate((source, subscriber) => { - mergeAll(concurrent)(internalFromArray([source, ...(args as ObservableInput[])], scheduler)).subscribe(subscriber as any); + mergeAll(concurrent)(from([source, ...(args as ObservableInput[])], scheduler)).subscribe(subscriber); }); } diff --git a/src/internal/operators/mergeInternals.ts b/src/internal/operators/mergeInternals.ts index fc7d1a613e..bdb8a008ea 100644 --- a/src/internal/operators/mergeInternals.ts +++ b/src/internal/operators/mergeInternals.ts @@ -1,7 +1,8 @@ import { Observable } from '../Observable'; -import { innerFrom } from '../observable/from'; +import { innerFrom } from '../observable/innerFrom'; import { Subscriber } from '../Subscriber'; import { ObservableInput, SchedulerLike } from '../types'; +import { executeSchedule } from '../util/executeSchedule'; import { OperatorSubscriber } from './OperatorSubscriber'; /** @@ -114,7 +115,11 @@ export function mergeInternals( // Particularly for `expand`, we need to check to see if a scheduler was provided // for when we want to start our inner subscription. Otherwise, we just start // are next inner subscription. - innerSubScheduler ? subscriber.add(innerSubScheduler.schedule(() => doInnerSub(bufferedValue))) : doInnerSub(bufferedValue); + if (innerSubScheduler) { + executeSchedule(subscriber, innerSubScheduler, () => doInnerSub(bufferedValue)); + } else { + doInnerSub(bufferedValue); + } } // Check to see if we can complete, and complete if so. checkComplete(); diff --git a/src/internal/operators/mergeMap.ts b/src/internal/operators/mergeMap.ts index 5863c3342f..b159bda883 100644 --- a/src/internal/operators/mergeMap.ts +++ b/src/internal/operators/mergeMap.ts @@ -1,6 +1,6 @@ import { ObservableInput, OperatorFunction, ObservedValueOf } from '../types'; import { map } from './map'; -import { innerFrom } from '../observable/from'; +import { innerFrom } from '../observable/innerFrom'; import { operate } from '../util/lift'; import { mergeInternals } from './mergeInternals'; import { isFunction } from '../util/isFunction'; diff --git a/src/internal/operators/observeOn.ts b/src/internal/operators/observeOn.ts index 049c8b0326..2c27dbcec9 100644 --- a/src/internal/operators/observeOn.ts +++ b/src/internal/operators/observeOn.ts @@ -1,5 +1,6 @@ /** @prettier */ import { MonoTypeOperatorFunction, SchedulerLike } from '../types'; +import { executeSchedule } from '../util/executeSchedule'; import { operate } from '../util/lift'; import { OperatorSubscriber } from './OperatorSubscriber'; @@ -57,51 +58,12 @@ import { OperatorSubscriber } from './OperatorSubscriber'; */ export function observeOn(scheduler: SchedulerLike, delay = 0): MonoTypeOperatorFunction { return operate((source, subscriber) => { - /** - * Executes work with the provided scheduler and provided delay. - * This exists primarily to manage the Actions being scheduled to make - * sure they are removed from the parent Subscription. - * Actions will be retained in memory until the parent Subscription finalizes - * because they could be rescheduled at any time. We know the - * actions in this operator are NOT going to be rescheduled, so - * we want to make sure they're removed as soon as possible. - * @param execute The work to schedule with the scheduler provided - */ - const schedule = (execute: () => void) => { - let syncUnsub = false; - const actionSubs = scheduler.schedule(() => { - if (actionSubs) { - // The action fired asynchronously, so we have a subscription - // we can unsubscribe before continuing. Unsubscription will - // remove the Action/Subscription from the parent (subscriber). - actionSubs.unsubscribe(); - } else { - // The action fired synchronously, so we don't have a - // subscription we can unsubscribe just yet. Flag that - // we want to unsubscribe when we do get it. - syncUnsub = true; - } - // Execute the work required. - execute(); - }, delay); - - if (syncUnsub) { - // The action above fired synchronously, so we can tear it down. - actionSubs.unsubscribe(); - } else { - // The action hasn't fired yet. It's asynchronous. So we should - // add it to our subscriber, which is the parent Subscription, - // so it is unsubscribed if our consumer unsubscribes. - subscriber.add(actionSubs); - } - }; - source.subscribe( new OperatorSubscriber( subscriber, - (value) => schedule(() => subscriber.next(value)), - () => schedule(() => subscriber.complete()), - (err) => schedule(() => subscriber.error(err)) + (value) => executeSchedule(subscriber, scheduler, () => subscriber.next(value), delay), + () => executeSchedule(subscriber, scheduler, () => subscriber.complete(), delay), + (err) => executeSchedule(subscriber, scheduler, () => subscriber.error(err), delay) ) ); }); diff --git a/src/internal/operators/onErrorResumeNext.ts b/src/internal/operators/onErrorResumeNext.ts index 28afb41627..9fdfcb1348 100644 --- a/src/internal/operators/onErrorResumeNext.ts +++ b/src/internal/operators/onErrorResumeNext.ts @@ -1,7 +1,7 @@ import { Observable } from '../Observable'; import { ObservableInputTuple, OperatorFunction } from '../types'; import { operate } from '../util/lift'; -import { innerFrom } from '../observable/from'; +import { innerFrom } from '../observable/innerFrom'; import { argsOrArgArray } from '../util/argsOrArgArray'; import { OperatorSubscriber } from './OperatorSubscriber'; import { noop } from '../util/noop'; diff --git a/src/internal/operators/retry.ts b/src/internal/operators/retry.ts index cdbe08bbec..f94f952df2 100644 --- a/src/internal/operators/retry.ts +++ b/src/internal/operators/retry.ts @@ -4,7 +4,7 @@ import { Subscription } from '../Subscription'; import { OperatorSubscriber } from './OperatorSubscriber'; import { identity } from '../util/identity'; import { timer } from '../observable/timer'; -import { innerFrom } from '../observable/from'; +import { innerFrom } from '../observable/innerFrom'; export interface RetryConfig { /** diff --git a/src/internal/operators/skipUntil.ts b/src/internal/operators/skipUntil.ts index f4aa09c6b7..b17bdb96b8 100644 --- a/src/internal/operators/skipUntil.ts +++ b/src/internal/operators/skipUntil.ts @@ -2,7 +2,7 @@ import { Observable } from '../Observable'; import { MonoTypeOperatorFunction } from '../types'; import { operate } from '../util/lift'; import { OperatorSubscriber } from './OperatorSubscriber'; -import { innerFrom } from '../observable/from'; +import { innerFrom } from '../observable/innerFrom'; import { noop } from '../util/noop'; /** diff --git a/src/internal/operators/switchMap.ts b/src/internal/operators/switchMap.ts index befc548272..3c8cd1fa3f 100644 --- a/src/internal/operators/switchMap.ts +++ b/src/internal/operators/switchMap.ts @@ -1,6 +1,6 @@ import { Subscriber } from '../Subscriber'; import { ObservableInput, OperatorFunction, ObservedValueOf } from '../types'; -import { innerFrom } from '../observable/from'; +import { innerFrom } from '../observable/innerFrom'; import { operate } from '../util/lift'; import { OperatorSubscriber } from './OperatorSubscriber'; diff --git a/src/internal/operators/takeUntil.ts b/src/internal/operators/takeUntil.ts index a8579af70a..3c594a0420 100644 --- a/src/internal/operators/takeUntil.ts +++ b/src/internal/operators/takeUntil.ts @@ -1,7 +1,7 @@ import { MonoTypeOperatorFunction, ObservableInput } from '../types'; import { operate } from '../util/lift'; import { OperatorSubscriber } from './OperatorSubscriber'; -import { innerFrom } from '../observable/from'; +import { innerFrom } from '../observable/innerFrom'; import { noop } from '../util/noop'; /** diff --git a/src/internal/operators/throttle.ts b/src/internal/operators/throttle.ts index b5dec74bea..0cf54071d1 100644 --- a/src/internal/operators/throttle.ts +++ b/src/internal/operators/throttle.ts @@ -3,7 +3,7 @@ import { Subscription } from '../Subscription'; import { MonoTypeOperatorFunction, ObservableInput } from '../types'; import { operate } from '../util/lift'; import { OperatorSubscriber } from './OperatorSubscriber'; -import { innerFrom } from '../observable/from'; +import { innerFrom } from '../observable/innerFrom'; export interface ThrottleConfig { leading?: boolean; diff --git a/src/internal/operators/timeout.ts b/src/internal/operators/timeout.ts index 8e33cde9b2..d3d8fdcdf4 100644 --- a/src/internal/operators/timeout.ts +++ b/src/internal/operators/timeout.ts @@ -4,10 +4,10 @@ import { isValidDate } from '../util/isDate'; import { Subscription } from '../Subscription'; import { operate } from '../util/lift'; import { Observable } from '../Observable'; -import { innerFrom } from '../observable/from'; +import { innerFrom } from '../observable/innerFrom'; import { createErrorClass } from '../util/createErrorClass'; -import { caughtSchedule } from '../util/caughtSchedule'; import { OperatorSubscriber } from './OperatorSubscriber'; +import { executeSchedule } from '../util/executeSchedule'; export interface TimeoutConfig = ObservableInput, M = unknown> { /** @@ -342,18 +342,22 @@ export function timeout, M>( // tell how many values we have seen so far. let seen = 0; const startTimer = (delay: number) => { - timerSubscription = caughtSchedule( + timerSubscription = executeSchedule( subscriber, scheduler, () => { - originalSourceSubscription.unsubscribe(); - innerFrom( - _with!({ - meta, - lastValue, - seen, - }) - ).subscribe(subscriber); + try { + originalSourceSubscription.unsubscribe(); + innerFrom( + _with!({ + meta, + lastValue, + seen, + }) + ).subscribe(subscriber); + } catch (err) { + subscriber.error(err); + } }, delay ); diff --git a/src/internal/operators/windowTime.ts b/src/internal/operators/windowTime.ts index d5c8354b12..4301e616ad 100644 --- a/src/internal/operators/windowTime.ts +++ b/src/internal/operators/windowTime.ts @@ -7,6 +7,7 @@ import { operate } from '../util/lift'; import { OperatorSubscriber } from './OperatorSubscriber'; import { arrRemove } from '../util/arrRemove'; import { popScheduler } from '../util/args'; +import { executeSchedule } from '../util/executeSchedule'; export function windowTime(windowTimeSpan: number, scheduler?: SchedulerLike): OperatorFunction>; export function windowTime( @@ -136,21 +137,18 @@ export function windowTime(windowTimeSpan: number, ...otherArgs: any[]): Oper }; windowRecords.push(record); subscriber.next(window.asObservable()); - subs.add(scheduler.schedule(() => closeWindow(record), windowTimeSpan)); + executeSchedule(subs, scheduler, () => closeWindow(record), windowTimeSpan); } }; - windowCreationInterval !== null && windowCreationInterval >= 0 - ? // The user passed both a windowTimeSpan (required), and a creation interval - // That means we need to start new window on the interval, and those windows need - // to wait the required time span before completing. - subscriber.add( - scheduler.schedule(function () { - startWindow(); - !this.closed && subscriber.add(this.schedule(null, windowCreationInterval)); - }, windowCreationInterval) - ) - : (restartOnClose = true); + if (windowCreationInterval !== null && windowCreationInterval >= 0) { + // The user passed both a windowTimeSpan (required), and a creation interval + // That means we need to start new window on the interval, and those windows need + // to wait the required time span before completing. + executeSchedule(subscriber, scheduler, startWindow, windowCreationInterval, true); + } else { + restartOnClose = true; + } startWindow(); diff --git a/src/internal/operators/windowToggle.ts b/src/internal/operators/windowToggle.ts index 4e6eaf38d0..ce1c3958b8 100644 --- a/src/internal/operators/windowToggle.ts +++ b/src/internal/operators/windowToggle.ts @@ -3,7 +3,7 @@ import { Subject } from '../Subject'; import { Subscription } from '../Subscription'; import { ObservableInput, OperatorFunction } from '../types'; import { operate } from '../util/lift'; -import { innerFrom } from '../observable/from'; +import { innerFrom } from '../observable/innerFrom'; import { OperatorSubscriber } from './OperatorSubscriber'; import { noop } from '../util/noop'; import { arrRemove } from '../util/arrRemove'; diff --git a/src/internal/operators/windowWhen.ts b/src/internal/operators/windowWhen.ts index fac8d342be..c8ae1556de 100644 --- a/src/internal/operators/windowWhen.ts +++ b/src/internal/operators/windowWhen.ts @@ -4,7 +4,7 @@ import { Subject } from '../Subject'; import { ObservableInput, OperatorFunction } from '../types'; import { operate } from '../util/lift'; import { OperatorSubscriber } from './OperatorSubscriber'; -import { innerFrom } from '../observable/from'; +import { innerFrom } from '../observable/innerFrom'; /** * Branch out the source Observable values as a nested Observable using a diff --git a/src/internal/operators/withLatestFrom.ts b/src/internal/operators/withLatestFrom.ts index 9b2dc769e4..9e4bc97a17 100644 --- a/src/internal/operators/withLatestFrom.ts +++ b/src/internal/operators/withLatestFrom.ts @@ -1,7 +1,7 @@ import { OperatorFunction, ObservableInputTuple } from '../types'; import { operate } from '../util/lift'; import { OperatorSubscriber } from './OperatorSubscriber'; -import { innerFrom } from '../observable/from'; +import { innerFrom } from '../observable/innerFrom'; import { identity } from '../util/identity'; import { noop } from '../util/noop'; import { popResultSelector } from '../util/args'; diff --git a/src/internal/scheduled/scheduleAsyncIterable.ts b/src/internal/scheduled/scheduleAsyncIterable.ts index ffadfbf293..daa034666e 100644 --- a/src/internal/scheduled/scheduleAsyncIterable.ts +++ b/src/internal/scheduled/scheduleAsyncIterable.ts @@ -1,28 +1,31 @@ import { SchedulerLike } from '../types'; import { Observable } from '../Observable'; -import { Subscription } from '../Subscription'; +import { executeSchedule } from '../util/executeSchedule'; export function scheduleAsyncIterable(input: AsyncIterable, scheduler: SchedulerLike) { if (!input) { throw new Error('Iterable cannot be null'); } - return new Observable(subscriber => { - const sub = new Subscription(); - sub.add( - scheduler.schedule(() => { - const iterator = input[Symbol.asyncIterator](); - sub.add(scheduler.schedule(function () { - iterator.next().then(result => { + return new Observable((subscriber) => { + executeSchedule(subscriber, scheduler, () => { + const iterator = input[Symbol.asyncIterator](); + executeSchedule( + subscriber, + scheduler, + () => { + iterator.next().then((result) => { if (result.done) { + // This will remove the subscriptions from + // the parent subscription. subscriber.complete(); } else { subscriber.next(result.value); - this.schedule(); } }); - })); - }) - ); - return sub; + }, + 0, + true + ); + }); }); } diff --git a/src/internal/scheduled/scheduleIterable.ts b/src/internal/scheduled/scheduleIterable.ts index 71c3ff17d3..4b1e1d3fdc 100644 --- a/src/internal/scheduled/scheduleIterable.ts +++ b/src/internal/scheduled/scheduleIterable.ts @@ -2,7 +2,7 @@ import { Observable } from '../Observable'; import { SchedulerLike } from '../types'; import { iterator as Symbol_iterator } from '../symbol/iterator'; import { isFunction } from '../util/isFunction'; -import { caughtSchedule } from '../util/caughtSchedule'; +import { executeSchedule } from '../util/executeSchedule'; /** * Used in {@link scheduled} to create an observable from an Iterable. @@ -16,15 +16,25 @@ export function scheduleIterable(input: Iterable, scheduler: SchedulerLike // Schedule the initial creation of the iterator from // the iterable. This is so the code in the iterable is // not called until the scheduled job fires. - subscriber.add( - scheduler.schedule(() => { - // Create the iterator. - iterator = (input as any)[Symbol_iterator](); + executeSchedule(subscriber, scheduler, () => { + // Create the iterator. + iterator = (input as any)[Symbol_iterator](); + + executeSchedule( + subscriber, + scheduler, + () => { + let value: T; + let done: boolean | undefined; + try { + // Pull the value out of the iterator + ({ value, done } = iterator.next()); + } catch (err) { + // We got an error while pulling from the iterator + subscriber.error(err); + return; + } - // Schedule the first iteration and emission. - caughtSchedule(subscriber, scheduler, function () { - // Pull the value out of the iterator - const { value, done } = iterator.next(); if (done) { // If it is "done" we just complete. This mimics the // behavior of JavaScript's `for..of` consumption of @@ -34,13 +44,12 @@ export function scheduleIterable(input: Iterable, scheduler: SchedulerLike } else { // The iterable is not done, emit the value. subscriber.next(value); - // Reschedule. This will cause this function to be - // called again on the same scheduled delay. - this.schedule(); } - }); - }) - ); + }, + 0, + true + ); + }); // During teardown, if we see this iterator has a `return` method, // then we know it is a Generator, and not just an Iterator. So we call diff --git a/src/internal/scheduled/scheduleObservable.ts b/src/internal/scheduled/scheduleObservable.ts index 97d6c0d570..29ba3b5032 100644 --- a/src/internal/scheduled/scheduleObservable.ts +++ b/src/internal/scheduled/scheduleObservable.ts @@ -1,19 +1,8 @@ -import { Observable } from '../Observable'; -import { Subscription } from '../Subscription'; -import { observable as Symbol_observable } from '../symbol/observable'; -import { InteropObservable, SchedulerLike, Subscribable } from '../types'; +import { innerFrom } from '../observable/innerFrom'; +import { observeOn } from '../operators/observeOn'; +import { subscribeOn } from '../operators/subscribeOn'; +import { InteropObservable, SchedulerLike } from '../types'; export function scheduleObservable(input: InteropObservable, scheduler: SchedulerLike) { - return new Observable(subscriber => { - const sub = new Subscription(); - sub.add(scheduler.schedule(() => { - const observable: Subscribable = (input as any)[Symbol_observable](); - sub.add(observable.subscribe({ - next(value) { sub.add(scheduler.schedule(() => subscriber.next(value))); }, - error(err) { sub.add(scheduler.schedule(() => subscriber.error(err))); }, - complete() { sub.add(scheduler.schedule(() => subscriber.complete())); }, - })); - })); - return sub; - }); + return innerFrom(input).pipe(subscribeOn(scheduler), observeOn(scheduler)); } diff --git a/src/internal/scheduled/schedulePromise.ts b/src/internal/scheduled/schedulePromise.ts index 00da90278b..f1211d05c4 100644 --- a/src/internal/scheduled/schedulePromise.ts +++ b/src/internal/scheduled/schedulePromise.ts @@ -1,22 +1,8 @@ -import { Observable } from '../Observable'; +import { innerFrom } from '../observable/innerFrom'; +import { observeOn } from '../operators/observeOn'; +import { subscribeOn } from '../operators/subscribeOn'; import { SchedulerLike } from '../types'; export function schedulePromise(input: PromiseLike, scheduler: SchedulerLike) { - return new Observable((subscriber) => { - return scheduler.schedule(() => - input.then( - (value) => { - subscriber.add( - scheduler.schedule(() => { - subscriber.next(value); - subscriber.add(scheduler.schedule(() => subscriber.complete())); - }) - ); - }, - (err) => { - subscriber.add(scheduler.schedule(() => subscriber.error(err))); - } - ) - ); - }); + return innerFrom(input).pipe(subscribeOn(scheduler), observeOn(scheduler)); } diff --git a/src/internal/util/caughtSchedule.ts b/src/internal/util/caughtSchedule.ts deleted file mode 100644 index 5ff9bdcd4c..0000000000 --- a/src/internal/util/caughtSchedule.ts +++ /dev/null @@ -1,20 +0,0 @@ -import { Subscriber } from '../Subscriber'; -import { Subscription } from '../Subscription'; -import { SchedulerAction, SchedulerLike } from '../types'; - -export function caughtSchedule( - subscriber: Subscriber, - scheduler: SchedulerLike, - execute: (this: SchedulerAction) => void, - delay = 0 -): Subscription { - const subscription = scheduler.schedule(function () { - try { - execute.call(this); - } catch (err) { - subscriber.error(err); - } - }, delay); - subscriber.add(subscription); - return subscription; -} diff --git a/src/internal/util/executeSchedule.ts b/src/internal/util/executeSchedule.ts new file mode 100644 index 0000000000..1bcb9900ff --- /dev/null +++ b/src/internal/util/executeSchedule.ts @@ -0,0 +1,44 @@ +import { Subscription } from '../Subscription'; +import { SchedulerAction, SchedulerLike } from '../types'; + +export function executeSchedule( + parentSubscription: Subscription, + scheduler: SchedulerLike, + work: () => void, + delay: number, + repeat: true +): void; +export function executeSchedule( + parentSubscription: Subscription, + scheduler: SchedulerLike, + work: () => void, + delay?: number, + repeat?: false +): Subscription; + +export function executeSchedule( + parentSubscription: Subscription, + scheduler: SchedulerLike, + work: () => void, + delay = 0, + repeat = false +): Subscription | void { + const scheduleSubscription = scheduler.schedule(function (this: SchedulerAction) { + work(); + if (repeat) { + parentSubscription.add(this.schedule(null, delay)); + } else { + this.unsubscribe(); + } + }, delay); + + parentSubscription.add(scheduleSubscription); + + if (!repeat) { + // Because user-land scheduler implementations are unlikely to properly reuse + // Actions for repeat scheduling, we can't trust that the returned subscription + // will control repeat subscription scenarios. So we're trying to avoid using them + // incorrectly within this library. + return scheduleSubscription; + } +}