Skip to content

Commit

Permalink
refactor: Remove unnecessary addition of subscription to subscriber (#…
Browse files Browse the repository at this point in the history
…6754)

Removes the subscriber.add call because an RxJS native OperatorSubscriber will automatically do that upon subscribing.

Also adds an additional test to sanity check that the previous finalization occurs before the next subscription.
  • Loading branch information
benlesh committed Jan 12, 2022
1 parent 769d180 commit 443ac7e
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 3 deletions.
54 changes: 53 additions & 1 deletion spec/operators/onErrorResumeNext-spec.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/** @prettier */
import { expect } from 'chai';
import { TestScheduler } from 'rxjs/testing';
import { onErrorResumeNext, take, finalize } from 'rxjs/operators';
import { onErrorResumeNext, take, finalize, tap } from 'rxjs/operators';
import { concat, throwError, of, Observable } from 'rxjs';
import { asInteropObservable } from '../helpers/interop-helper';
import { observableMatcher } from '../helpers/observableMatcher';
Expand Down Expand Up @@ -261,4 +261,56 @@ describe('onErrorResumeNext', () => {

expect(results).to.deep.equal([1, 'finalize 1', 2, 'finalize 2', 3, 'finalize 3', 4, 'finalize 4', 'complete']);
});

it('should not subscribe to the next source until after the previous is finalized.', () => {
const results: any[] = [];

of(1)
.pipe(
tap({
subscribe: () => results.push('subscribe 1'),
finalize: () => results.push('finalize 1'),
}),
onErrorResumeNext(
of(2).pipe(
tap({
subscribe: () => results.push('subscribe 2'),
finalize: () => results.push('finalize 2'),
})
),
of(3).pipe(
tap({
subscribe: () => results.push('subscribe 3'),
finalize: () => results.push('finalize 3'),
})
),
of(4).pipe(
tap({
subscribe: () => results.push('subscribe 4'),
finalize: () => results.push('finalize 4'),
})
)
)
)
.subscribe({
next: (value) => results.push(value),
complete: () => results.push('complete'),
});

expect(results).to.deep.equal([
'subscribe 1',
1,
'finalize 1',
'subscribe 2',
2,
'finalize 2',
'subscribe 3',
3,
'finalize 3',
'subscribe 4',
4,
'finalize 4',
'complete',
]);
});
});
4 changes: 2 additions & 2 deletions src/internal/operators/onErrorResumeNext.ts
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ export function onErrorResumeNext<T, A extends readonly unknown[]>(
// result to be `A[number][]` - completely dropping the ObservableInput part
// of the type. This makes no sense whatsoever. As a workaround, the type is
// asserted explicitly.
const nextSources = (argsOrArgArray(sources) as unknown) as ObservableInputTuple<A>;
const nextSources = argsOrArgArray(sources) as unknown as ObservableInputTuple<A>;

return operate((source, subscriber) => {
const remaining = [source, ...nextSources];
Expand All @@ -112,7 +112,7 @@ export function onErrorResumeNext<T, A extends readonly unknown[]>(
// would result in situation were we could not stop a synchronous firehose
// with something like `take(3)`.
const innerSub = new OperatorSubscriber(subscriber, undefined, noop, noop);
subscriber.add(nextSource.subscribe(innerSub));
nextSource.subscribe(innerSub);
innerSub.add(subscribeNext);
} else {
subscriber.complete();
Expand Down

0 comments on commit 443ac7e

Please sign in to comment.