Skip to content

Commit

Permalink
feat(onErrorResumeNextWith): renamed onErrorResumeNext and exported…
Browse files Browse the repository at this point in the history
… from top level

The `onErrorResumeNext` operator is deprecated due to a rename. The rename was done so we can move all operator exports to the top level `rxjs` export site. We were not currently exporting that operator.

This commit also refactors to logic to be based on the creation function, which is more widely used
  • Loading branch information
benlesh committed Jan 11, 2022
1 parent 17380f1 commit eb50553
Show file tree
Hide file tree
Showing 6 changed files with 42 additions and 43 deletions.
3 changes: 3 additions & 0 deletions api_guard/dist/types/index.d.ts
Expand Up @@ -500,6 +500,9 @@ export declare function of<A extends readonly unknown[]>(...values: A): Observab
export declare function onErrorResumeNext<A extends readonly unknown[]>(sources: [...ObservableInputTuple<A>]): Observable<A[number]>;
export declare function onErrorResumeNext<A extends readonly unknown[]>(...sources: [...ObservableInputTuple<A>]): Observable<A[number]>;

export declare function onErrorResumeNextWith<T, A extends readonly unknown[]>(sources: [...ObservableInputTuple<A>]): OperatorFunction<T, T | A[number]>;
export declare function onErrorResumeNextWith<T, A extends readonly unknown[]>(...sources: [...ObservableInputTuple<A>]): OperatorFunction<T, T | A[number]>;

export interface Operator<T, R> {
call(subscriber: Subscriber<R>, source: any): TeardownLogic;
}
Expand Down
3 changes: 1 addition & 2 deletions api_guard/dist/types/operators/index.d.ts
Expand Up @@ -191,8 +191,7 @@ export declare function multicast<T, O extends ObservableInput<any>>(subjectFact

export declare function observeOn<T>(scheduler: SchedulerLike, delay?: number): MonoTypeOperatorFunction<T>;

export declare function onErrorResumeNext<T, A extends readonly unknown[]>(sources: [...ObservableInputTuple<A>]): OperatorFunction<T, T | A[number]>;
export declare function onErrorResumeNext<T, A extends readonly unknown[]>(...sources: [...ObservableInputTuple<A>]): OperatorFunction<T, T | A[number]>;
export declare const onErrorResumeNext: typeof onErrorResumeNextWith;

export declare function pairwise<T>(): OperatorFunction<T, [T, T]>;

Expand Down
1 change: 1 addition & 0 deletions src/index.ts
Expand Up @@ -155,6 +155,7 @@ export { mergeWith } from './internal/operators/mergeWith';
export { min } from './internal/operators/min';
export { multicast } from './internal/operators/multicast';
export { observeOn } from './internal/operators/observeOn';
export { onErrorResumeNextWith } from './internal/operators/onErrorResumeNextWith';
export { pairwise } from './internal/operators/pairwise';
export { pluck } from './internal/operators/pluck';
export { publish } from './internal/operators/publish';
Expand Down
28 changes: 25 additions & 3 deletions src/internal/observable/onErrorResumeNext.ts
@@ -1,8 +1,9 @@
import { Observable } from '../Observable';
import { ObservableInputTuple } from '../types';
import { EMPTY } from './empty';
import { onErrorResumeNext as onErrorResumeNextWith } from '../operators/onErrorResumeNext';
import { argsOrArgArray } from '../util/argsOrArgArray';
import { OperatorSubscriber } from '../operators/OperatorSubscriber';
import { noop } from '../util/noop';
import { innerFrom } from './innerFrom';

/* tslint:disable:max-line-length */
export function onErrorResumeNext<A extends readonly unknown[]>(sources: [...ObservableInputTuple<A>]): Observable<A[number]>;
Expand Down Expand Up @@ -75,5 +76,26 @@ export function onErrorResumeNext<A extends readonly unknown[]>(...sources: [...
export function onErrorResumeNext<A extends readonly unknown[]>(
...sources: [[...ObservableInputTuple<A>]] | [...ObservableInputTuple<A>]
): Observable<A[number]> {
return onErrorResumeNextWith(argsOrArgArray(sources))(EMPTY);
const nextSources: ObservableInputTuple<A> = argsOrArgArray(sources) as any;

return new Observable((subscriber) => {
let sourceIndex = 0;
const subscribeNext = () => {
if (sourceIndex < nextSources.length) {
let nextSource: Observable<A[number]>;
try {
nextSource = innerFrom(nextSources[sourceIndex++]);
} catch (err) {
subscribeNext();
return;
}
const innerSubscriber = new OperatorSubscriber(subscriber, undefined, noop, noop);
nextSource.subscribe(innerSubscriber);
innerSubscriber.add(subscribeNext);
} else {
subscriber.complete();
}
};
subscribeNext();
});
}
@@ -1,15 +1,11 @@
import { Observable } from '../Observable';
import { ObservableInputTuple, OperatorFunction } from '../types';
import { operate } from '../util/lift';
import { innerFrom } from '../observable/innerFrom';
import { argsOrArgArray } from '../util/argsOrArgArray';
import { OperatorSubscriber } from './OperatorSubscriber';
import { noop } from '../util/noop';
import { onErrorResumeNext as oERNCreate } from '../observable/onErrorResumeNext';

export function onErrorResumeNext<T, A extends readonly unknown[]>(
export function onErrorResumeNextWith<T, A extends readonly unknown[]>(
sources: [...ObservableInputTuple<A>]
): OperatorFunction<T, T | A[number]>;
export function onErrorResumeNext<T, A extends readonly unknown[]>(
export function onErrorResumeNextWith<T, A extends readonly unknown[]>(
...sources: [...ObservableInputTuple<A>]
): OperatorFunction<T, T | A[number]>;

Expand Down Expand Up @@ -85,41 +81,19 @@ export function onErrorResumeNext<T, A extends readonly unknown[]>(
* Observable, but - if it errors - subscribes to the next passed Observable
* and so on, until it completes or runs out of Observables.
*/
export function onErrorResumeNext<T, A extends readonly unknown[]>(
export function onErrorResumeNextWith<T, A extends readonly unknown[]>(
...sources: [[...ObservableInputTuple<A>]] | [...ObservableInputTuple<A>]
): OperatorFunction<T, T | A[number]> {
// For some reason, TS 4.1 RC gets the inference wrong here and infers the
// result to be `A[number][]` - completely dropping the ObservableInput part
// of the type. This makes no sense whatsoever. As a workaround, the type is
// asserted explicitly.
const nextSources = (argsOrArgArray(sources) as unknown) as ObservableInputTuple<A>;
const nextSources = argsOrArgArray(sources) as unknown as ObservableInputTuple<A>;

return operate((source, subscriber) => {
const remaining = [source, ...nextSources];
const subscribeNext = () => {
if (!subscriber.closed) {
if (remaining.length > 0) {
let nextSource: Observable<A[number]>;
try {
nextSource = innerFrom<T | A[number]>(remaining.shift()!);
} catch (err) {
subscribeNext();
return;
}

// Here we have to use one of our Subscribers, or it does not wire up
// The `closed` property of upstream Subscribers synchronously, that
// would result in situation were we could not stop a synchronous firehose
// with something like `take(3)`.
const innerSub = new OperatorSubscriber(subscriber, undefined, noop, noop);
subscriber.add(nextSource.subscribe(innerSub));
innerSub.add(subscribeNext);
} else {
subscriber.complete();
}
}
};

subscribeNext();
});
return (source) => oERNCreate(source, ...nextSources);
}

/**
* @deprecated Renamed. Use {@link onErrorResumeNextWith} instead. Will be removed in v8.
*/
export const onErrorResumeNext = onErrorResumeNextWith;
2 changes: 1 addition & 1 deletion src/operators/index.ts
Expand Up @@ -57,7 +57,7 @@ export { mergeWith } from '../internal/operators/mergeWith';
export { min } from '../internal/operators/min';
export { multicast } from '../internal/operators/multicast';
export { observeOn } from '../internal/operators/observeOn';
export { onErrorResumeNext } from '../internal/operators/onErrorResumeNext';
export { onErrorResumeNext } from '../internal/operators/onErrorResumeNextWith';
export { pairwise } from '../internal/operators/pairwise';
export { partition } from '../internal/operators/partition';
export { pluck } from '../internal/operators/pluck';
Expand Down

0 comments on commit eb50553

Please sign in to comment.