Skip to content

Commit

Permalink
fix: scheduling with Rx-provided schedulers will no longer leak actio…
Browse files Browse the repository at this point in the history
…n references (#6562)

* fix: scheduling with Rx-provided schedulers will no longer leak action references

Resolves #6561

* refactor: Remove circular dependencies and redundant code

* chore: update api_guardian files
  • Loading branch information
benlesh committed Aug 14, 2021
1 parent 5d0e41e commit ff5a748
Show file tree
Hide file tree
Showing 47 changed files with 318 additions and 345 deletions.
2 changes: 1 addition & 1 deletion api_guard/dist/types/index.d.ts
Expand Up @@ -251,7 +251,7 @@ export declare function forkJoin<T extends Record<string, ObservableInput<any>>>
}>;

export declare function from<O extends ObservableInput<any>>(input: O): Observable<ObservedValueOf<O>>;
export declare function from<O extends ObservableInput<any>>(input: O, scheduler: SchedulerLike): Observable<ObservedValueOf<O>>;
export declare function from<O extends ObservableInput<any>>(input: O, scheduler: SchedulerLike | undefined): Observable<ObservedValueOf<O>>;

export declare function fromEvent<T>(target: HasEventTargetAddRemove<T> | ArrayLike<HasEventTargetAddRemove<T>>, eventName: string): Observable<T>;
export declare function fromEvent<T, R>(target: HasEventTargetAddRemove<T> | ArrayLike<HasEventTargetAddRemove<T>>, eventName: string, resultSelector: (event: T) => R): Observable<R>;
Expand Down
3 changes: 2 additions & 1 deletion src/internal/observable/combineLatest.ts
Expand Up @@ -10,6 +10,7 @@ import { popResultSelector, popScheduler } from '../util/args';
import { createObject } from '../util/createObject';
import { OperatorSubscriber } from '../operators/OperatorSubscriber';
import { AnyCatcher } from '../AnyCatcher';
import { executeSchedule } from '../util/executeSchedule';

// combineLatest(any)
// We put this first because we need to catch cases where the user has supplied
Expand Down Expand Up @@ -293,7 +294,7 @@ export function combineLatestInit(
*/
function maybeSchedule(scheduler: SchedulerLike | undefined, execute: () => void, subscription: Subscription) {
if (scheduler) {
subscription.add(scheduler.schedule(execute));
executeSchedule(subscription, scheduler, execute);
} else {
execute();
}
Expand Down
8 changes: 5 additions & 3 deletions src/internal/observable/concat.ts
@@ -1,11 +1,13 @@
import { Observable } from '../Observable';
import { ObservableInputTuple, SchedulerLike } from '../types';
import { concatAll } from '../operators/concatAll';
import { internalFromArray } from './fromArray';
import { popScheduler } from '../util/args';
import { from } from './from';

export function concat<T extends readonly unknown[]>(...inputs: [...ObservableInputTuple<T>]): Observable<T[number]>;
export function concat<T extends readonly unknown[]>(...inputsAndScheduler: [...ObservableInputTuple<T>, SchedulerLike]): Observable<T[number]>;
export function concat<T extends readonly unknown[]>(
...inputsAndScheduler: [...ObservableInputTuple<T>, SchedulerLike]
): Observable<T[number]>;

/**
* Creates an output Observable which sequentially emits all values from the first given
Expand Down Expand Up @@ -113,5 +115,5 @@ export function concat<T extends readonly unknown[]>(...inputsAndScheduler: [...
* Observable subscription on.
*/
export function concat(...args: any[]): Observable<unknown> {
return concatAll()(internalFromArray(args, popScheduler(args)));
return concatAll()(from(args, popScheduler(args)));
}
2 changes: 1 addition & 1 deletion src/internal/observable/defer.ts
@@ -1,6 +1,6 @@
import { Observable } from '../Observable';
import { ObservedValueOf, ObservableInput } from '../types';
import { innerFrom } from './from';
import { innerFrom } from './innerFrom';

/**
* Creates an Observable that, on subscribe, calls an Observable factory to
Expand Down
2 changes: 1 addition & 1 deletion src/internal/observable/dom/fetch.ts
@@ -1,6 +1,6 @@
import { OperatorSubscriber } from '../../operators/OperatorSubscriber';
import { Observable } from '../../Observable';
import { innerFrom } from '../../observable/from';
import { innerFrom } from '../../observable/innerFrom';
import { ObservableInput } from '../../types';

export function fromFetch<T>(
Expand Down
2 changes: 1 addition & 1 deletion src/internal/observable/forkJoin.ts
@@ -1,7 +1,7 @@
import { Observable } from '../Observable';
import { ObservedValueOf, ObservableInputTuple, ObservableInput } from '../types';
import { argsArgArrayOrObject } from '../util/argsArgArrayOrObject';
import { innerFrom } from './from';
import { innerFrom } from './innerFrom';
import { popResultSelector } from '../util/args';
import { OperatorSubscriber } from '../operators/OperatorSubscriber';
import { mapOneOrManyArgs } from '../util/mapOneOrManyArgs';
Expand Down
138 changes: 3 additions & 135 deletions src/internal/observable/from.ts
@@ -1,22 +1,11 @@
import { isArrayLike } from '../util/isArrayLike';
import { isPromise } from '../util/isPromise';
import { observable as Symbol_observable } from '../symbol/observable';
import { Subscriber } from '../Subscriber';

import { Observable } from '../Observable';
import { ObservableInput, SchedulerLike, ObservedValueOf, ReadableStreamLike } from '../types';
import { ObservableInput, SchedulerLike, ObservedValueOf } from '../types';
import { scheduled } from '../scheduled/scheduled';
import { isFunction } from '../util/isFunction';
import { reportUnhandledError } from '../util/reportUnhandledError';
import { isInteropObservable } from '../util/isInteropObservable';
import { isAsyncIterable } from '../util/isAsyncIterable';
import { createInvalidObservableTypeError } from '../util/throwUnobservableError';
import { isIterable } from '../util/isIterable';
import { isReadableStreamLike, readableStreamLikeToAsyncGenerator } from '../util/isReadableStreamLike';
import { innerFrom } from './innerFrom';

export function from<O extends ObservableInput<any>>(input: O): Observable<ObservedValueOf<O>>;
/** @deprecated The `scheduler` parameter will be removed in v8. Use `scheduled`. Details: https://rxjs.dev/deprecations/scheduler-argument */
export function from<O extends ObservableInput<any>>(input: O, scheduler: SchedulerLike): Observable<ObservedValueOf<O>>;
export function from<O extends ObservableInput<any>>(input: O, scheduler: SchedulerLike | undefined): Observable<ObservedValueOf<O>>;

/**
* Creates an Observable from an Array, an array-like object, a Promise, an iterable object, or an Observable-like object.
Expand Down Expand Up @@ -118,124 +107,3 @@ export function from<O extends ObservableInput<any>>(input: O, scheduler: Schedu
export function from<T>(input: ObservableInput<T>, scheduler?: SchedulerLike): Observable<T> {
return scheduler ? scheduled(input, scheduler) : innerFrom(input);
}

// TODO: Use this throughout the library, rather than the `from` above, to avoid
// the unnecessary scheduling check and reduce bundled sizes of operators that use `from`.
// TODO: Eventually, this just becomes `from`, as we don't have the deprecated scheduled path anymore.
export function innerFrom<T>(input: ObservableInput<T>): Observable<T> {
if (input instanceof Observable) {
return input;
}
if (input != null) {
if (isInteropObservable(input)) {
return fromInteropObservable(input);
}
if (isArrayLike(input)) {
return fromArrayLike(input);
}
if (isPromise(input)) {
return fromPromise(input);
}
if (isAsyncIterable(input)) {
return fromAsyncIterable(input);
}
if (isIterable(input)) {
return fromIterable(input);
}
if (isReadableStreamLike(input)) {
return fromReadableStreamLike(input);
}
}

throw createInvalidObservableTypeError(input);
}

/**
* Creates an RxJS Observable from an object that implements `Symbol.observable`.
* @param obj An object that properly implements `Symbol.observable`.
*/
function fromInteropObservable<T>(obj: any) {
return new Observable((subscriber: Subscriber<T>) => {
const obs = obj[Symbol_observable]();
if (isFunction(obs.subscribe)) {
return obs.subscribe(subscriber);
}
// Should be caught by observable subscribe function error handling.
throw new TypeError('Provided object does not correctly implement Symbol.observable');
});
}

/**
* Synchronously emits the values of an array like and completes.
* This is exported because there are creation functions and operators that need to
* make direct use of the same logic, and there's no reason to make them run through
* `from` conditionals because we *know* they're dealing with an array.
* @param array The array to emit values from
*/
export function fromArrayLike<T>(array: ArrayLike<T>) {
return new Observable((subscriber: Subscriber<T>) => {
// Loop over the array and emit each value. Note two things here:
// 1. We're making sure that the subscriber is not closed on each loop.
// This is so we don't continue looping over a very large array after
// something like a `take`, `takeWhile`, or other synchronous unsubscription
// has already unsubscribed.
// 2. In this form, reentrant code can alter that array we're looping over.
// This is a known issue, but considered an edge case. The alternative would
// be to copy the array before executing the loop, but this has
// performance implications.
for (let i = 0; i < array.length && !subscriber.closed; i++) {
subscriber.next(array[i]);
}
subscriber.complete();
});
}

function fromPromise<T>(promise: PromiseLike<T>) {
return new Observable((subscriber: Subscriber<T>) => {
promise
.then(
(value) => {
if (!subscriber.closed) {
subscriber.next(value);
subscriber.complete();
}
},
(err: any) => subscriber.error(err)
)
.then(null, reportUnhandledError);
});
}

function fromIterable<T>(iterable: Iterable<T>) {
return new Observable((subscriber: Subscriber<T>) => {
for (const value of iterable) {
subscriber.next(value);
if (subscriber.closed) {
return;
}
}
subscriber.complete();
});
}

function fromAsyncIterable<T>(asyncIterable: AsyncIterable<T>) {
return new Observable((subscriber: Subscriber<T>) => {
process(asyncIterable, subscriber).catch((err) => subscriber.error(err));
});
}

function fromReadableStreamLike<T>(readableStream: ReadableStreamLike<T>) {
return fromAsyncIterable(readableStreamLikeToAsyncGenerator(readableStream));
}

async function process<T>(asyncIterable: AsyncIterable<T>, subscriber: Subscriber<T>) {
for await (const value of asyncIterable) {
subscriber.next(value);
// A side-effect may have closed our subscriber,
// check before the next iteration.
if (subscriber.closed) {
return;
}
}
subscriber.complete();
}
7 changes: 0 additions & 7 deletions src/internal/observable/fromArray.ts

This file was deleted.

4 changes: 2 additions & 2 deletions src/internal/observable/fromEvent.ts
@@ -1,9 +1,9 @@
import { innerFrom } from '../observable/innerFrom';
import { Observable } from '../Observable';
import { mergeMap } from '../operators/mergeMap';
import { isArrayLike } from '../util/isArrayLike';
import { isFunction } from '../util/isFunction';
import { mapOneOrManyArgs } from '../util/mapOneOrManyArgs';
import { internalFromArray } from './fromArray';

// These constants are used to create handler registry functions using array mapping below.
const nodeEventEmitterMethods = ['addListener', 'removeListener'] as const;
Expand Down Expand Up @@ -267,7 +267,7 @@ export function fromEvent<T>(
if (!add) {
if (isArrayLike(target)) {
return mergeMap((subTarget: any) => fromEvent(subTarget, eventName, options as EventListenerOptions))(
internalFromArray(target)
innerFrom(target)
) as Observable<T>;
}
}
Expand Down
131 changes: 131 additions & 0 deletions src/internal/observable/innerFrom.ts
@@ -0,0 +1,131 @@
import { isArrayLike } from '../util/isArrayLike';
import { isPromise } from '../util/isPromise';
import { Observable } from '../Observable';
import { ObservableInput, ReadableStreamLike } from '../types';
import { isInteropObservable } from '../util/isInteropObservable';
import { isAsyncIterable } from '../util/isAsyncIterable';
import { createInvalidObservableTypeError } from '../util/throwUnobservableError';
import { isIterable } from '../util/isIterable';
import { isReadableStreamLike, readableStreamLikeToAsyncGenerator } from '../util/isReadableStreamLike';
import { Subscriber } from '../Subscriber';
import { isFunction } from '../util/isFunction';
import { reportUnhandledError } from '../util/reportUnhandledError';
import { observable as Symbol_observable } from '../symbol/observable';

export function innerFrom<T>(input: ObservableInput<T>): Observable<T> {
if (input instanceof Observable) {
return input;
}
if (input != null) {
if (isInteropObservable(input)) {
return fromInteropObservable(input);
}
if (isArrayLike(input)) {
return fromArrayLike(input);
}
if (isPromise(input)) {
return fromPromise(input);
}
if (isAsyncIterable(input)) {
return fromAsyncIterable(input);
}
if (isIterable(input)) {
return fromIterable(input);
}
if (isReadableStreamLike(input)) {
return fromReadableStreamLike(input);
}
}

throw createInvalidObservableTypeError(input);
}

/**
* Creates an RxJS Observable from an object that implements `Symbol.observable`.
* @param obj An object that properly implements `Symbol.observable`.
*/
export function fromInteropObservable<T>(obj: any) {
return new Observable((subscriber: Subscriber<T>) => {
const obs = obj[Symbol_observable]();
if (isFunction(obs.subscribe)) {
return obs.subscribe(subscriber);
}
// Should be caught by observable subscribe function error handling.
throw new TypeError('Provided object does not correctly implement Symbol.observable');
});
}

/**
* Synchronously emits the values of an array like and completes.
* This is exported because there are creation functions and operators that need to
* make direct use of the same logic, and there's no reason to make them run through
* `from` conditionals because we *know* they're dealing with an array.
* @param array The array to emit values from
*/
export function fromArrayLike<T>(array: ArrayLike<T>) {
return new Observable((subscriber: Subscriber<T>) => {
// Loop over the array and emit each value. Note two things here:
// 1. We're making sure that the subscriber is not closed on each loop.
// This is so we don't continue looping over a very large array after
// something like a `take`, `takeWhile`, or other synchronous unsubscription
// has already unsubscribed.
// 2. In this form, reentrant code can alter that array we're looping over.
// This is a known issue, but considered an edge case. The alternative would
// be to copy the array before executing the loop, but this has
// performance implications.
for (let i = 0; i < array.length && !subscriber.closed; i++) {
subscriber.next(array[i]);
}
subscriber.complete();
});
}

export function fromPromise<T>(promise: PromiseLike<T>) {
return new Observable((subscriber: Subscriber<T>) => {
promise
.then(
(value) => {
if (!subscriber.closed) {
subscriber.next(value);
subscriber.complete();
}
},
(err: any) => subscriber.error(err)
)
.then(null, reportUnhandledError);
});
}

export function fromIterable<T>(iterable: Iterable<T>) {
return new Observable((subscriber: Subscriber<T>) => {
for (const value of iterable) {
subscriber.next(value);
if (subscriber.closed) {
return;
}
}
subscriber.complete();
});
}

export function fromAsyncIterable<T>(asyncIterable: AsyncIterable<T>) {
return new Observable((subscriber: Subscriber<T>) => {
process(asyncIterable, subscriber).catch((err) => subscriber.error(err));
});
}

export function fromReadableStreamLike<T>(readableStream: ReadableStreamLike<T>) {
return fromAsyncIterable(readableStreamLikeToAsyncGenerator(readableStream));
}

async function process<T>(asyncIterable: AsyncIterable<T>, subscriber: Subscriber<T>) {
for await (const value of asyncIterable) {
subscriber.next(value);
// A side-effect may have closed our subscriber,
// check before the next iteration.
if (subscriber.closed) {
return;
}
}
subscriber.complete();
}

0 comments on commit ff5a748

Please sign in to comment.