Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: scheduling with Rx-provided schedulers will no longer leak action references #6562

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion api_guard/dist/types/index.d.ts
Expand Up @@ -251,7 +251,7 @@ export declare function forkJoin<T extends Record<string, ObservableInput<any>>>
}>;

export declare function from<O extends ObservableInput<any>>(input: O): Observable<ObservedValueOf<O>>;
export declare function from<O extends ObservableInput<any>>(input: O, scheduler: SchedulerLike): Observable<ObservedValueOf<O>>;
export declare function from<O extends ObservableInput<any>>(input: O, scheduler: SchedulerLike | undefined): Observable<ObservedValueOf<O>>;

export declare function fromEvent<T>(target: HasEventTargetAddRemove<T> | ArrayLike<HasEventTargetAddRemove<T>>, eventName: string): Observable<T>;
export declare function fromEvent<T, R>(target: HasEventTargetAddRemove<T> | ArrayLike<HasEventTargetAddRemove<T>>, eventName: string, resultSelector: (event: T) => R): Observable<R>;
Expand Down
3 changes: 2 additions & 1 deletion src/internal/observable/combineLatest.ts
Expand Up @@ -10,6 +10,7 @@ import { popResultSelector, popScheduler } from '../util/args';
import { createObject } from '../util/createObject';
import { OperatorSubscriber } from '../operators/OperatorSubscriber';
import { AnyCatcher } from '../AnyCatcher';
import { executeSchedule } from '../util/executeSchedule';

// combineLatest(any)
// We put this first because we need to catch cases where the user has supplied
Expand Down Expand Up @@ -293,7 +294,7 @@ export function combineLatestInit(
*/
function maybeSchedule(scheduler: SchedulerLike | undefined, execute: () => void, subscription: Subscription) {
if (scheduler) {
subscription.add(scheduler.schedule(execute));
executeSchedule(subscription, scheduler, execute);
} else {
execute();
}
Expand Down
8 changes: 5 additions & 3 deletions src/internal/observable/concat.ts
@@ -1,11 +1,13 @@
import { Observable } from '../Observable';
import { ObservableInputTuple, SchedulerLike } from '../types';
import { concatAll } from '../operators/concatAll';
import { internalFromArray } from './fromArray';
import { popScheduler } from '../util/args';
import { from } from './from';

export function concat<T extends readonly unknown[]>(...inputs: [...ObservableInputTuple<T>]): Observable<T[number]>;
export function concat<T extends readonly unknown[]>(...inputsAndScheduler: [...ObservableInputTuple<T>, SchedulerLike]): Observable<T[number]>;
export function concat<T extends readonly unknown[]>(
...inputsAndScheduler: [...ObservableInputTuple<T>, SchedulerLike]
): Observable<T[number]>;

/**
* Creates an output Observable which sequentially emits all values from the first given
Expand Down Expand Up @@ -113,5 +115,5 @@ export function concat<T extends readonly unknown[]>(...inputsAndScheduler: [...
* Observable subscription on.
*/
export function concat(...args: any[]): Observable<unknown> {
return concatAll()(internalFromArray(args, popScheduler(args)));
return concatAll()(from(args, popScheduler(args)));
}
2 changes: 1 addition & 1 deletion src/internal/observable/defer.ts
@@ -1,6 +1,6 @@
import { Observable } from '../Observable';
import { ObservedValueOf, ObservableInput } from '../types';
import { innerFrom } from './from';
import { innerFrom } from './innerFrom';

/**
* Creates an Observable that, on subscribe, calls an Observable factory to
Expand Down
2 changes: 1 addition & 1 deletion src/internal/observable/dom/fetch.ts
@@ -1,6 +1,6 @@
import { OperatorSubscriber } from '../../operators/OperatorSubscriber';
import { Observable } from '../../Observable';
import { innerFrom } from '../../observable/from';
import { innerFrom } from '../../observable/innerFrom';
import { ObservableInput } from '../../types';

export function fromFetch<T>(
Expand Down
2 changes: 1 addition & 1 deletion src/internal/observable/forkJoin.ts
@@ -1,7 +1,7 @@
import { Observable } from '../Observable';
import { ObservedValueOf, ObservableInputTuple, ObservableInput } from '../types';
import { argsArgArrayOrObject } from '../util/argsArgArrayOrObject';
import { innerFrom } from './from';
import { innerFrom } from './innerFrom';
import { popResultSelector } from '../util/args';
import { OperatorSubscriber } from '../operators/OperatorSubscriber';
import { mapOneOrManyArgs } from '../util/mapOneOrManyArgs';
Expand Down
138 changes: 3 additions & 135 deletions src/internal/observable/from.ts
@@ -1,22 +1,11 @@
import { isArrayLike } from '../util/isArrayLike';
import { isPromise } from '../util/isPromise';
import { observable as Symbol_observable } from '../symbol/observable';
import { Subscriber } from '../Subscriber';

import { Observable } from '../Observable';
import { ObservableInput, SchedulerLike, ObservedValueOf, ReadableStreamLike } from '../types';
import { ObservableInput, SchedulerLike, ObservedValueOf } from '../types';
import { scheduled } from '../scheduled/scheduled';
import { isFunction } from '../util/isFunction';
import { reportUnhandledError } from '../util/reportUnhandledError';
import { isInteropObservable } from '../util/isInteropObservable';
import { isAsyncIterable } from '../util/isAsyncIterable';
import { createInvalidObservableTypeError } from '../util/throwUnobservableError';
import { isIterable } from '../util/isIterable';
import { isReadableStreamLike, readableStreamLikeToAsyncGenerator } from '../util/isReadableStreamLike';
import { innerFrom } from './innerFrom';

export function from<O extends ObservableInput<any>>(input: O): Observable<ObservedValueOf<O>>;
/** @deprecated The `scheduler` parameter will be removed in v8. Use `scheduled`. Details: https://rxjs.dev/deprecations/scheduler-argument */
export function from<O extends ObservableInput<any>>(input: O, scheduler: SchedulerLike): Observable<ObservedValueOf<O>>;
export function from<O extends ObservableInput<any>>(input: O, scheduler: SchedulerLike | undefined): Observable<ObservedValueOf<O>>;

/**
* Creates an Observable from an Array, an array-like object, a Promise, an iterable object, or an Observable-like object.
Expand Down Expand Up @@ -118,124 +107,3 @@ export function from<O extends ObservableInput<any>>(input: O, scheduler: Schedu
export function from<T>(input: ObservableInput<T>, scheduler?: SchedulerLike): Observable<T> {
return scheduler ? scheduled(input, scheduler) : innerFrom(input);
}

// TODO: Use this throughout the library, rather than the `from` above, to avoid
// the unnecessary scheduling check and reduce bundled sizes of operators that use `from`.
// TODO: Eventually, this just becomes `from`, as we don't have the deprecated scheduled path anymore.
export function innerFrom<T>(input: ObservableInput<T>): Observable<T> {
if (input instanceof Observable) {
return input;
}
if (input != null) {
if (isInteropObservable(input)) {
return fromInteropObservable(input);
}
if (isArrayLike(input)) {
return fromArrayLike(input);
}
if (isPromise(input)) {
return fromPromise(input);
}
if (isAsyncIterable(input)) {
return fromAsyncIterable(input);
}
if (isIterable(input)) {
return fromIterable(input);
}
if (isReadableStreamLike(input)) {
return fromReadableStreamLike(input);
}
}

throw createInvalidObservableTypeError(input);
}

/**
* Creates an RxJS Observable from an object that implements `Symbol.observable`.
* @param obj An object that properly implements `Symbol.observable`.
*/
function fromInteropObservable<T>(obj: any) {
return new Observable((subscriber: Subscriber<T>) => {
const obs = obj[Symbol_observable]();
if (isFunction(obs.subscribe)) {
return obs.subscribe(subscriber);
}
// Should be caught by observable subscribe function error handling.
throw new TypeError('Provided object does not correctly implement Symbol.observable');
});
}

/**
* Synchronously emits the values of an array like and completes.
* This is exported because there are creation functions and operators that need to
* make direct use of the same logic, and there's no reason to make them run through
* `from` conditionals because we *know* they're dealing with an array.
* @param array The array to emit values from
*/
export function fromArrayLike<T>(array: ArrayLike<T>) {
return new Observable((subscriber: Subscriber<T>) => {
// Loop over the array and emit each value. Note two things here:
// 1. We're making sure that the subscriber is not closed on each loop.
// This is so we don't continue looping over a very large array after
// something like a `take`, `takeWhile`, or other synchronous unsubscription
// has already unsubscribed.
// 2. In this form, reentrant code can alter that array we're looping over.
// This is a known issue, but considered an edge case. The alternative would
// be to copy the array before executing the loop, but this has
// performance implications.
for (let i = 0; i < array.length && !subscriber.closed; i++) {
subscriber.next(array[i]);
}
subscriber.complete();
});
}

function fromPromise<T>(promise: PromiseLike<T>) {
return new Observable((subscriber: Subscriber<T>) => {
promise
.then(
(value) => {
if (!subscriber.closed) {
subscriber.next(value);
subscriber.complete();
}
},
(err: any) => subscriber.error(err)
)
.then(null, reportUnhandledError);
});
}

function fromIterable<T>(iterable: Iterable<T>) {
return new Observable((subscriber: Subscriber<T>) => {
for (const value of iterable) {
subscriber.next(value);
if (subscriber.closed) {
return;
}
}
subscriber.complete();
});
}

function fromAsyncIterable<T>(asyncIterable: AsyncIterable<T>) {
return new Observable((subscriber: Subscriber<T>) => {
process(asyncIterable, subscriber).catch((err) => subscriber.error(err));
});
}

function fromReadableStreamLike<T>(readableStream: ReadableStreamLike<T>) {
return fromAsyncIterable(readableStreamLikeToAsyncGenerator(readableStream));
}

async function process<T>(asyncIterable: AsyncIterable<T>, subscriber: Subscriber<T>) {
for await (const value of asyncIterable) {
subscriber.next(value);
// A side-effect may have closed our subscriber,
// check before the next iteration.
if (subscriber.closed) {
return;
}
}
subscriber.complete();
}
7 changes: 0 additions & 7 deletions src/internal/observable/fromArray.ts

This file was deleted.

4 changes: 2 additions & 2 deletions src/internal/observable/fromEvent.ts
@@ -1,9 +1,9 @@
import { innerFrom } from '../observable/innerFrom';
import { Observable } from '../Observable';
import { mergeMap } from '../operators/mergeMap';
import { isArrayLike } from '../util/isArrayLike';
import { isFunction } from '../util/isFunction';
import { mapOneOrManyArgs } from '../util/mapOneOrManyArgs';
import { internalFromArray } from './fromArray';

// These constants are used to create handler registry functions using array mapping below.
const nodeEventEmitterMethods = ['addListener', 'removeListener'] as const;
Expand Down Expand Up @@ -267,7 +267,7 @@ export function fromEvent<T>(
if (!add) {
if (isArrayLike(target)) {
return mergeMap((subTarget: any) => fromEvent(subTarget, eventName, options as EventListenerOptions))(
internalFromArray(target)
innerFrom(target)
) as Observable<T>;
}
}
Expand Down
131 changes: 131 additions & 0 deletions src/internal/observable/innerFrom.ts
@@ -0,0 +1,131 @@
import { isArrayLike } from '../util/isArrayLike';
import { isPromise } from '../util/isPromise';
import { Observable } from '../Observable';
import { ObservableInput, ReadableStreamLike } from '../types';
import { isInteropObservable } from '../util/isInteropObservable';
import { isAsyncIterable } from '../util/isAsyncIterable';
import { createInvalidObservableTypeError } from '../util/throwUnobservableError';
import { isIterable } from '../util/isIterable';
import { isReadableStreamLike, readableStreamLikeToAsyncGenerator } from '../util/isReadableStreamLike';
import { Subscriber } from '../Subscriber';
import { isFunction } from '../util/isFunction';
import { reportUnhandledError } from '../util/reportUnhandledError';
import { observable as Symbol_observable } from '../symbol/observable';

export function innerFrom<T>(input: ObservableInput<T>): Observable<T> {
if (input instanceof Observable) {
return input;
}
if (input != null) {
if (isInteropObservable(input)) {
return fromInteropObservable(input);
}
if (isArrayLike(input)) {
return fromArrayLike(input);
}
if (isPromise(input)) {
return fromPromise(input);
}
if (isAsyncIterable(input)) {
return fromAsyncIterable(input);
}
if (isIterable(input)) {
return fromIterable(input);
}
if (isReadableStreamLike(input)) {
return fromReadableStreamLike(input);
}
}

throw createInvalidObservableTypeError(input);
}

/**
* Creates an RxJS Observable from an object that implements `Symbol.observable`.
* @param obj An object that properly implements `Symbol.observable`.
*/
export function fromInteropObservable<T>(obj: any) {
return new Observable((subscriber: Subscriber<T>) => {
const obs = obj[Symbol_observable]();
if (isFunction(obs.subscribe)) {
return obs.subscribe(subscriber);
}
// Should be caught by observable subscribe function error handling.
throw new TypeError('Provided object does not correctly implement Symbol.observable');
});
}

/**
* Synchronously emits the values of an array like and completes.
* This is exported because there are creation functions and operators that need to
* make direct use of the same logic, and there's no reason to make them run through
* `from` conditionals because we *know* they're dealing with an array.
* @param array The array to emit values from
*/
export function fromArrayLike<T>(array: ArrayLike<T>) {
return new Observable((subscriber: Subscriber<T>) => {
// Loop over the array and emit each value. Note two things here:
// 1. We're making sure that the subscriber is not closed on each loop.
// This is so we don't continue looping over a very large array after
// something like a `take`, `takeWhile`, or other synchronous unsubscription
// has already unsubscribed.
// 2. In this form, reentrant code can alter that array we're looping over.
// This is a known issue, but considered an edge case. The alternative would
// be to copy the array before executing the loop, but this has
// performance implications.
for (let i = 0; i < array.length && !subscriber.closed; i++) {
subscriber.next(array[i]);
}
subscriber.complete();
});
}

export function fromPromise<T>(promise: PromiseLike<T>) {
return new Observable((subscriber: Subscriber<T>) => {
promise
.then(
(value) => {
if (!subscriber.closed) {
subscriber.next(value);
subscriber.complete();
}
},
(err: any) => subscriber.error(err)
)
.then(null, reportUnhandledError);
});
}

export function fromIterable<T>(iterable: Iterable<T>) {
return new Observable((subscriber: Subscriber<T>) => {
for (const value of iterable) {
subscriber.next(value);
if (subscriber.closed) {
return;
}
}
subscriber.complete();
});
}

export function fromAsyncIterable<T>(asyncIterable: AsyncIterable<T>) {
return new Observable((subscriber: Subscriber<T>) => {
process(asyncIterable, subscriber).catch((err) => subscriber.error(err));
});
}

export function fromReadableStreamLike<T>(readableStream: ReadableStreamLike<T>) {
return fromAsyncIterable(readableStreamLikeToAsyncGenerator(readableStream));
}

async function process<T>(asyncIterable: AsyncIterable<T>, subscriber: Subscriber<T>) {
for await (const value of asyncIterable) {
subscriber.next(value);
// A side-effect may have closed our subscriber,
// check before the next iteration.
if (subscriber.closed) {
return;
}
}
subscriber.complete();
}