From 1d968314ba49d9b41440eea40ea05caedd381e64 Mon Sep 17 00:00:00 2001 From: Ben Lesh Date: Wed, 7 Apr 2021 12:54:52 -0500 Subject: [PATCH] refactor(OperatorSubscriber): Move error handler arg (#6198) * refactor(OperatorSubscriber): Move error handler arg Moves the error handler arg, because in most cases we were passing `undefined` and then a complete handler * chore: remove pointless comment --- .../observable/ConnectableObservable.ts | 8 ++++---- src/internal/observable/combineLatest.ts | 1 - src/internal/observable/dom/fetch.ts | 5 ++--- src/internal/observable/forkJoin.ts | 1 - src/internal/observable/zip.ts | 2 -- src/internal/operators/OperatorSubscriber.ts | 2 +- src/internal/operators/audit.ts | 3 +-- src/internal/operators/buffer.ts | 4 ---- src/internal/operators/bufferCount.ts | 3 ++- src/internal/operators/bufferTime.ts | 3 ++- src/internal/operators/bufferToggle.ts | 6 +----- src/internal/operators/bufferWhen.ts | 6 +++--- src/internal/operators/catchError.ts | 2 +- src/internal/operators/debounce.ts | 5 +++-- src/internal/operators/debounceTime.ts | 3 ++- src/internal/operators/defaultIfEmpty.ts | 1 - src/internal/operators/distinct.ts | 2 +- src/internal/operators/every.ts | 1 - src/internal/operators/exhaustAll.ts | 3 +-- src/internal/operators/exhaustMap.ts | 3 +-- src/internal/operators/find.ts | 1 - src/internal/operators/groupBy.ts | 8 ++++---- src/internal/operators/isEmpty.ts | 1 - src/internal/operators/materialize.ts | 8 ++++---- src/internal/operators/mergeInternals.ts | 20 +++++++------------ src/internal/operators/observeOn.ts | 4 ++-- src/internal/operators/repeat.ts | 2 +- src/internal/operators/repeatWhen.ts | 3 +-- src/internal/operators/retry.ts | 2 ++ src/internal/operators/retryWhen.ts | 2 +- src/internal/operators/sample.ts | 2 +- src/internal/operators/scanInternals.ts | 1 - src/internal/operators/sequenceEqual.ts | 1 - src/internal/operators/single.ts | 1 - src/internal/operators/skipUntil.ts | 1 - src/internal/operators/switchMap.ts | 2 -- src/internal/operators/takeLast.ts | 3 ++- src/internal/operators/takeUntil.ts | 2 +- src/internal/operators/tap.ts | 8 ++++---- src/internal/operators/throttle.ts | 5 +---- src/internal/operators/throwIfEmpty.ts | 1 - src/internal/operators/window.ts | 8 ++++---- src/internal/operators/windowCount.ts | 12 +++++------ src/internal/operators/windowTime.ts | 6 +++--- src/internal/operators/windowToggle.ts | 5 ++--- src/internal/operators/windowWhen.ts | 4 ++-- src/internal/operators/withLatestFrom.ts | 1 - 47 files changed, 73 insertions(+), 105 deletions(-) diff --git a/src/internal/observable/ConnectableObservable.ts b/src/internal/observable/ConnectableObservable.ts index 2932ea5b2d..fa85142bf8 100644 --- a/src/internal/observable/ConnectableObservable.ts +++ b/src/internal/observable/ConnectableObservable.ts @@ -66,14 +66,14 @@ export class ConnectableObservable extends Observable { new OperatorSubscriber( subject as any, undefined, - (err) => { - this._teardown(); - subject.error(err); - }, () => { this._teardown(); subject.complete(); }, + (err) => { + this._teardown(); + subject.error(err); + }, () => this._teardown() ) ) diff --git a/src/internal/observable/combineLatest.ts b/src/internal/observable/combineLatest.ts index d88c37b829..2065de3a81 100644 --- a/src/internal/observable/combineLatest.ts +++ b/src/internal/observable/combineLatest.ts @@ -251,7 +251,6 @@ export function combineLatestInit( subscriber.next(valueTransform(values.slice())); } }, - undefined, () => { if (!--active) { // We only complete the result if we have no more active diff --git a/src/internal/observable/dom/fetch.ts b/src/internal/observable/dom/fetch.ts index 8500a2690c..5a32723b8f 100644 --- a/src/internal/observable/dom/fetch.ts +++ b/src/internal/observable/dom/fetch.ts @@ -154,13 +154,12 @@ export function fromFetch( subscriber, // Values are passed through to the subscriber undefined, - // Error handling - handleError, // The projected response is complete. () => { abortable = false; subscriber.complete(); - } + }, + handleError ) ); } else { diff --git a/src/internal/observable/forkJoin.ts b/src/internal/observable/forkJoin.ts index dbe9f18ffc..a982921dd1 100644 --- a/src/internal/observable/forkJoin.ts +++ b/src/internal/observable/forkJoin.ts @@ -152,7 +152,6 @@ export function forkJoin(...args: any[]): Observable { } values[sourceIndex] = value; }, - undefined, () => { if (!--remainingCompletions || !hasValue) { if (!remainingEmissions) { diff --git a/src/internal/observable/zip.ts b/src/internal/observable/zip.ts index 640fcc0a77..b776690cbe 100644 --- a/src/internal/observable/zip.ts +++ b/src/internal/observable/zip.ts @@ -95,8 +95,6 @@ export function zip(...args: unknown[]): Observable { } } }, - // Any error is passed through the result. - undefined, () => { // This source completed. Mark it as complete so we can check it later // if we have to. diff --git a/src/internal/operators/OperatorSubscriber.ts b/src/internal/operators/OperatorSubscriber.ts index 740eaeb6fb..b910393e7c 100644 --- a/src/internal/operators/OperatorSubscriber.ts +++ b/src/internal/operators/OperatorSubscriber.ts @@ -20,8 +20,8 @@ export class OperatorSubscriber extends Subscriber { constructor( destination: Subscriber, onNext?: (value: T) => void, - onError?: (err: any) => void, onComplete?: () => void, + onError?: (err: any) => void, private onFinalize?: () => void ) { // It's important - for performance reasons - that all of this class's diff --git a/src/internal/operators/audit.ts b/src/internal/operators/audit.ts index eaa7da4e61..cf08523520 100644 --- a/src/internal/operators/audit.ts +++ b/src/internal/operators/audit.ts @@ -81,11 +81,10 @@ export function audit(durationSelector: (value: T) => ObservableInput): lastValue = value; if (!durationSubscriber) { innerFrom(durationSelector(value)).subscribe( - (durationSubscriber = new OperatorSubscriber(subscriber, endDuration, undefined, cleanupDuration)) + (durationSubscriber = new OperatorSubscriber(subscriber, endDuration, cleanupDuration)) ); } }, - undefined, () => { isComplete = true; (!hasValue || !durationSubscriber || durationSubscriber.closed) && subscriber.complete(); diff --git a/src/internal/operators/buffer.ts b/src/internal/operators/buffer.ts index 47c75b5f45..e7631aaecb 100644 --- a/src/internal/operators/buffer.ts +++ b/src/internal/operators/buffer.ts @@ -52,8 +52,6 @@ export function buffer(closingNotifier: Observable): OperatorFunction currentBuffer.push(value), - // Pass all errors to the consumer. - undefined, () => { subscriber.next(currentBuffer); subscriber.complete(); @@ -71,8 +69,6 @@ export function buffer(closingNotifier: Observable): OperatorFunction(bufferSize: number, startBufferEvery: number | nu } } }, - undefined, () => { // When the source completes, emit all of our // active buffers. @@ -111,6 +110,8 @@ export function bufferCount(bufferSize: number, startBufferEvery: number | nu } subscriber.complete(); }, + // Pass all errors through to consumer. + undefined, () => { // Clean up our memory when we teardown buffers = null!; diff --git a/src/internal/operators/bufferTime.ts b/src/internal/operators/bufferTime.ts index 042f701bf9..f7af350fc4 100644 --- a/src/internal/operators/bufferTime.ts +++ b/src/internal/operators/bufferTime.ts @@ -151,7 +151,6 @@ export function bufferTime(bufferTimeSpan: number, ...otherArgs: any[]): Oper maxBufferSize <= buffer.length && emit(record); } }, - undefined, () => { // The source completed, emit all of the active // buffers we have before we complete. @@ -162,6 +161,8 @@ export function bufferTime(bufferTimeSpan: number, ...otherArgs: any[]): Oper subscriber.complete(); subscriber.unsubscribe(); }, + // Pass all errors through to consumer. + undefined, // Clean up () => (bufferRecords = null) ); diff --git a/src/internal/operators/bufferToggle.ts b/src/internal/operators/bufferToggle.ts index 8ea0fd6409..a78c154157 100644 --- a/src/internal/operators/bufferToggle.ts +++ b/src/internal/operators/bufferToggle.ts @@ -75,11 +75,8 @@ export function bufferToggle( }; // The line below will add the subscription to the parent subscriber *and* the closing subscription. - closingSubscription.add( - innerFrom(closingSelector(openValue)).subscribe(new OperatorSubscriber(subscriber, emitBuffer, undefined, noop)) - ); + closingSubscription.add(innerFrom(closingSelector(openValue)).subscribe(new OperatorSubscriber(subscriber, emitBuffer, noop))); }, - undefined, noop ) ); @@ -93,7 +90,6 @@ export function bufferToggle( buffer.push(value); } }, - undefined, () => { // Source complete. Emit all pending buffers. while (buffers.length > 0) { diff --git a/src/internal/operators/bufferWhen.ts b/src/internal/operators/bufferWhen.ts index 17e707efb0..79df6cc9c1 100644 --- a/src/internal/operators/bufferWhen.ts +++ b/src/internal/operators/bufferWhen.ts @@ -68,7 +68,7 @@ export function bufferWhen(closingSelector: () => ObservableInput): Oper b && subscriber.next(b); // Get a new closing notifier and subscribe to it. - innerFrom(closingSelector()).subscribe((closingSubscriber = new OperatorSubscriber(subscriber, openBuffer, undefined, noop))); + innerFrom(closingSelector()).subscribe((closingSubscriber = new OperatorSubscriber(subscriber, openBuffer, noop))); }; // Start the first buffer. @@ -80,14 +80,14 @@ export function bufferWhen(closingSelector: () => ObservableInput): Oper subscriber, // Add every new value to the current buffer. (value) => buffer?.push(value), - // All errors are passed through to the consumer. - undefined, // When we complete, emit the buffer if we have one, // then complete the result. () => { buffer && subscriber.next(buffer); subscriber.complete(); }, + // Pass all errors through to consumer. + undefined, // Release memory on teardown () => (buffer = closingSubscriber = null!) ) diff --git a/src/internal/operators/catchError.ts b/src/internal/operators/catchError.ts index fe120d31fd..1b9da6f904 100644 --- a/src/internal/operators/catchError.ts +++ b/src/internal/operators/catchError.ts @@ -112,7 +112,7 @@ export function catchError>( let handledResult: Observable>; innerSub = source.subscribe( - new OperatorSubscriber(subscriber, undefined, (err) => { + new OperatorSubscriber(subscriber, undefined, undefined, (err) => { handledResult = innerFrom(selector(err, catchError(selector)(source))); if (innerSub) { innerSub.unsubscribe(); diff --git a/src/internal/operators/debounce.ts b/src/internal/operators/debounce.ts index 87b4f15e6b..e7df6ed3bb 100644 --- a/src/internal/operators/debounce.ts +++ b/src/internal/operators/debounce.ts @@ -96,17 +96,18 @@ export function debounce(durationSelector: (value: T) => ObservableInput lastValue = value; // Capture our duration subscriber, so we can unsubscribe it when we're notified // and we're going to emit the value. - durationSubscriber = new OperatorSubscriber(subscriber, emit, undefined, noop); + durationSubscriber = new OperatorSubscriber(subscriber, emit, noop); // Subscribe to the duration. innerFrom(durationSelector(value)).subscribe(durationSubscriber); }, - undefined, () => { // Source completed. // Emit any pending debounced values then complete emit(); subscriber.complete(); }, + // Pass all errors through to consumer + undefined, () => { // Teardown. lastValue = durationSubscriber = null; diff --git a/src/internal/operators/debounceTime.ts b/src/internal/operators/debounceTime.ts index e72af673e7..78cd632c63 100644 --- a/src/internal/operators/debounceTime.ts +++ b/src/internal/operators/debounceTime.ts @@ -104,13 +104,14 @@ export function debounceTime(dueTime: number, scheduler: SchedulerLike = asyn activeTask = scheduler.schedule(emitWhenIdle, dueTime); } }, - undefined, () => { // Source completed. // Emit any pending debounced values then complete emit(); subscriber.complete(); }, + // Pass all errors through to consumer. + undefined, () => { // Teardown. lastValue = activeTask = null; diff --git a/src/internal/operators/defaultIfEmpty.ts b/src/internal/operators/defaultIfEmpty.ts index 24ba90c23f..b9af8f1779 100644 --- a/src/internal/operators/defaultIfEmpty.ts +++ b/src/internal/operators/defaultIfEmpty.ts @@ -46,7 +46,6 @@ export function defaultIfEmpty(defaultValue: R): OperatorFunction { if (!hasValue) { subscriber.next(defaultValue!); diff --git a/src/internal/operators/distinct.ts b/src/internal/operators/distinct.ts index 8faa00701a..b713204fdc 100644 --- a/src/internal/operators/distinct.ts +++ b/src/internal/operators/distinct.ts @@ -84,6 +84,6 @@ export function distinct(keySelector?: (value: T) => K, flushes?: Observab }) ); - flushes?.subscribe(new OperatorSubscriber(subscriber, () => distinctKeys.clear(), undefined, noop)); + flushes?.subscribe(new OperatorSubscriber(subscriber, () => distinctKeys.clear(), noop)); }); } diff --git a/src/internal/operators/every.ts b/src/internal/operators/every.ts index 09e8568f1c..0d91df0e9a 100644 --- a/src/internal/operators/every.ts +++ b/src/internal/operators/every.ts @@ -53,7 +53,6 @@ export function every( subscriber.complete(); } }, - undefined, () => { subscriber.next(true); subscriber.complete(); diff --git a/src/internal/operators/exhaustAll.ts b/src/internal/operators/exhaustAll.ts index 54d9927a37..4548408559 100644 --- a/src/internal/operators/exhaustAll.ts +++ b/src/internal/operators/exhaustAll.ts @@ -57,14 +57,13 @@ export function exhaustAll>(): OperatorFunction { if (!innerSub) { innerSub = innerFrom(inner).subscribe( - new OperatorSubscriber(subscriber, undefined, undefined, () => { + new OperatorSubscriber(subscriber, undefined, () => { innerSub = null; isComplete && subscriber.complete(); }) ); } }, - undefined, () => { isComplete = true; !innerSub && subscriber.complete(); diff --git a/src/internal/operators/exhaustMap.ts b/src/internal/operators/exhaustMap.ts index 08efd367de..fe2532b284 100644 --- a/src/internal/operators/exhaustMap.ts +++ b/src/internal/operators/exhaustMap.ts @@ -83,14 +83,13 @@ export function exhaustMap>( subscriber, (outerValue) => { if (!innerSub) { - innerSub = new OperatorSubscriber(subscriber, undefined, undefined, () => { + innerSub = new OperatorSubscriber(subscriber, undefined, () => { innerSub = null; isComplete && subscriber.complete(); }); innerFrom(project(outerValue, index++)).subscribe(innerSub); } }, - undefined, () => { isComplete = true; !innerSub && subscriber.complete(); diff --git a/src/internal/operators/find.ts b/src/internal/operators/find.ts index f97736fe80..c09494574a 100644 --- a/src/internal/operators/find.ts +++ b/src/internal/operators/find.ts @@ -79,7 +79,6 @@ export function createFind( subscriber.complete(); } }, - undefined, () => { subscriber.next(findIndex ? -1 : undefined); subscriber.complete(); diff --git a/src/internal/operators/groupBy.ts b/src/internal/operators/groupBy.ts index bbe9d19296..cc77f3d03c 100644 --- a/src/internal/operators/groupBy.ts +++ b/src/internal/operators/groupBy.ts @@ -175,11 +175,11 @@ export function groupBy( group!.complete(); durationSubscriber?.unsubscribe(); }, + // Completions are also sent to the group, but just the group. + undefined, // Errors on the duration subscriber are sent to the group // but only the group. They are not sent to the main subscription. undefined, - // Completions are also sent to the group, but just the group. - undefined, // Teardown: Remove this group from our map. () => groups.delete(key) ); @@ -195,10 +195,10 @@ export function groupBy( handleError(err); } }, - // Error from the source. - handleError, // Source completes. () => notify((consumer) => consumer.complete()), + // Error from the source. + handleError, // Free up memory. // When the source subscription is _finally_ torn down, release the subjects and keys // in our groups Map, they may be quite large and we don't want to keep them around if we diff --git a/src/internal/operators/isEmpty.ts b/src/internal/operators/isEmpty.ts index 73ab44622a..4c7996b906 100644 --- a/src/internal/operators/isEmpty.ts +++ b/src/internal/operators/isEmpty.ts @@ -74,7 +74,6 @@ export function isEmpty(): OperatorFunction { subscriber.next(false); subscriber.complete(); }, - undefined, () => { subscriber.next(true); subscriber.complete(); diff --git a/src/internal/operators/materialize.ts b/src/internal/operators/materialize.ts index f9be446323..c713e367d8 100644 --- a/src/internal/operators/materialize.ts +++ b/src/internal/operators/materialize.ts @@ -65,13 +65,13 @@ export function materialize(): OperatorFunction & Observab (value) => { subscriber.next(Notification.createNext(value)); }, - (err) => { - subscriber.next(Notification.createError(err)); - subscriber.complete(); - }, () => { subscriber.next(Notification.createComplete()); subscriber.complete(); + }, + (err) => { + subscriber.next(Notification.createError(err)); + subscriber.complete(); } ) ); diff --git a/src/internal/operators/mergeInternals.ts b/src/internal/operators/mergeInternals.ts index b84ad40020..fc7d1a613e 100644 --- a/src/internal/operators/mergeInternals.ts +++ b/src/internal/operators/mergeInternals.ts @@ -84,13 +84,13 @@ export function mergeInternals( subscriber.next(innerValue); } }, - // Errors are passed to the destination. - undefined, () => { // Flag that we have completed, so we know to check the buffer // during finalization. innerComplete = true; }, + // Errors are passed to the destination. + undefined, () => { // During finalization, if the inner completed (it wasn't errored or // cancelled), then we want to try the next item in the buffer if @@ -129,17 +129,11 @@ export function mergeInternals( // Subscribe to our source observable. source.subscribe( - new OperatorSubscriber( - subscriber, - outerNext, - // Errors are passed through - undefined, - () => { - // Outer completed, make a note of it, and check to see if we can complete everything. - isComplete = true; - checkComplete(); - } - ) + new OperatorSubscriber(subscriber, outerNext, () => { + // Outer completed, make a note of it, and check to see if we can complete everything. + isComplete = true; + checkComplete(); + }) ); // Additional teardown (for when the destination is torn down). diff --git a/src/internal/operators/observeOn.ts b/src/internal/operators/observeOn.ts index 9ef3de8e23..70b6411b43 100644 --- a/src/internal/operators/observeOn.ts +++ b/src/internal/operators/observeOn.ts @@ -60,8 +60,8 @@ export function observeOn(scheduler: SchedulerLike, delay: number = 0): MonoT new OperatorSubscriber( subscriber, (value) => subscriber.add(scheduler.schedule(() => subscriber.next(value), delay)), - (err) => subscriber.add(scheduler.schedule(() => subscriber.error(err), delay)), - () => subscriber.add(scheduler.schedule(() => subscriber.complete(), delay)) + () => subscriber.add(scheduler.schedule(() => subscriber.complete(), delay)), + (err) => subscriber.add(scheduler.schedule(() => subscriber.error(err), delay)) ) ); }); diff --git a/src/internal/operators/repeat.ts b/src/internal/operators/repeat.ts index 1b1da315da..a365c6a521 100644 --- a/src/internal/operators/repeat.ts +++ b/src/internal/operators/repeat.ts @@ -67,7 +67,7 @@ export function repeat(count = Infinity): MonoTypeOperatorFunction { const subscribeForRepeat = () => { let syncUnsub = false; innerSub = source.subscribe( - new OperatorSubscriber(subscriber, undefined, undefined, () => { + new OperatorSubscriber(subscriber, undefined, () => { if (++soFar < count) { if (innerSub) { innerSub.unsubscribe(); diff --git a/src/internal/operators/repeatWhen.ts b/src/internal/operators/repeatWhen.ts index 05b122c3dd..7253ade1cd 100644 --- a/src/internal/operators/repeatWhen.ts +++ b/src/internal/operators/repeatWhen.ts @@ -71,7 +71,6 @@ export function repeatWhen(notifier: (notifications: Observable) => Obs syncResub = true; } }, - undefined, () => { isNotifierComplete = true; checkComplete(); @@ -86,7 +85,7 @@ export function repeatWhen(notifier: (notifications: Observable) => Obs isMainComplete = false; innerSub = source.subscribe( - new OperatorSubscriber(subscriber, undefined, undefined, () => { + new OperatorSubscriber(subscriber, undefined, () => { isMainComplete = true; // Check to see if we are complete, and complete if so. // If we are not complete. Get the subject. This calls the `notifier` function. diff --git a/src/internal/operators/retry.ts b/src/internal/operators/retry.ts index 1e892d8ba8..3a9dbe7ce4 100644 --- a/src/internal/operators/retry.ts +++ b/src/internal/operators/retry.ts @@ -84,6 +84,8 @@ export function retry(configOrCount: number | RetryConfig = Infinity): MonoTy } subscriber.next(value); }, + // Completions are passed through to consumer. + undefined, (err) => { if (soFar++ < count) { if (innerSub) { diff --git a/src/internal/operators/retryWhen.ts b/src/internal/operators/retryWhen.ts index 347d0e36f0..5d5f15c547 100644 --- a/src/internal/operators/retryWhen.ts +++ b/src/internal/operators/retryWhen.ts @@ -66,7 +66,7 @@ export function retryWhen(notifier: (errors: Observable) => Observable { innerSub = source.subscribe( - new OperatorSubscriber(subscriber, undefined, (err) => { + new OperatorSubscriber(subscriber, undefined, undefined, (err) => { if (!errors$) { errors$ = new Subject(); notifier(errors$).subscribe( diff --git a/src/internal/operators/sample.ts b/src/internal/operators/sample.ts index cee3ca9619..7ce877f917 100644 --- a/src/internal/operators/sample.ts +++ b/src/internal/operators/sample.ts @@ -60,6 +60,6 @@ export function sample(notifier: Observable): MonoTypeOperatorFunction( // Maybe send it to the consumer. emitOnNext && subscriber.next(state); }, - undefined, // If an onComplete was given, call it, otherwise // just pass through the complete notification to the consumer. emitBeforeComplete && diff --git a/src/internal/operators/sequenceEqual.ts b/src/internal/operators/sequenceEqual.ts index 86fc1affd6..740bc539f7 100644 --- a/src/internal/operators/sequenceEqual.ts +++ b/src/internal/operators/sequenceEqual.ts @@ -101,7 +101,6 @@ export function sequenceEqual( !comparator(a, buffer.shift()!) && emit(false); } }, - undefined, () => { // Or observable completed selfState.complete = true; diff --git a/src/internal/operators/single.ts b/src/internal/operators/single.ts index 1ecdc4c391..e4151a6aa1 100644 --- a/src/internal/operators/single.ts +++ b/src/internal/operators/single.ts @@ -107,7 +107,6 @@ export function single(predicate?: (value: T, index: number, source: Observab singleValue = value; } }, - undefined, () => { if (hasValue) { subscriber.next(singleValue); diff --git a/src/internal/operators/skipUntil.ts b/src/internal/operators/skipUntil.ts index eedbbc3e47..f4aa09c6b7 100644 --- a/src/internal/operators/skipUntil.ts +++ b/src/internal/operators/skipUntil.ts @@ -53,7 +53,6 @@ export function skipUntil(notifier: Observable): MonoTypeOperatorFunctio skipSubscriber?.unsubscribe(); taking = true; }, - undefined, noop ); diff --git a/src/internal/operators/switchMap.ts b/src/internal/operators/switchMap.ts index 8c4a988018..993916079e 100644 --- a/src/internal/operators/switchMap.ts +++ b/src/internal/operators/switchMap.ts @@ -110,7 +110,6 @@ export function switchMap>( // handling the deprecate result selector here. This is because with this architecture // it ends up being smaller than using the map operator. (innerValue) => subscriber.next(resultSelector ? resultSelector(value, innerValue, outerIndex, innerIndex++) : innerValue), - undefined, () => { // The inner has completed. Null out the inner subcriber to // free up memory and to signal that we have no inner subscription @@ -121,7 +120,6 @@ export function switchMap>( )) ); }, - undefined, () => { isComplete = true; checkComplete(); diff --git a/src/internal/operators/takeLast.ts b/src/internal/operators/takeLast.ts index b4ba0a97b9..3e3697ce60 100644 --- a/src/internal/operators/takeLast.ts +++ b/src/internal/operators/takeLast.ts @@ -62,7 +62,6 @@ export function takeLast(count: number): MonoTypeOperatorFunction { // want to take, we remove the oldest value from the buffer. count < buffer.length && buffer.shift(); }, - undefined, () => { // The source completed, we now know what are last values // are, emit them in the order they were received. @@ -71,6 +70,8 @@ export function takeLast(count: number): MonoTypeOperatorFunction { } subscriber.complete(); }, + // Errors are passed through to the consumer + undefined, () => { // During teardown release the values in our buffer. buffer = null!; diff --git a/src/internal/operators/takeUntil.ts b/src/internal/operators/takeUntil.ts index 9835ee1309..a8579af70a 100644 --- a/src/internal/operators/takeUntil.ts +++ b/src/internal/operators/takeUntil.ts @@ -44,7 +44,7 @@ import { noop } from '../util/noop'; */ export function takeUntil(notifier: ObservableInput): MonoTypeOperatorFunction { return operate((source, subscriber) => { - innerFrom(notifier).subscribe(new OperatorSubscriber(subscriber, () => subscriber.complete(), undefined, noop)); + innerFrom(notifier).subscribe(new OperatorSubscriber(subscriber, () => subscriber.complete(), noop)); !subscriber.closed && source.subscribe(subscriber); }); } diff --git a/src/internal/operators/tap.ts b/src/internal/operators/tap.ts index d168a38a9d..d1056b958b 100644 --- a/src/internal/operators/tap.ts +++ b/src/internal/operators/tap.ts @@ -126,13 +126,13 @@ export function tap( tapObserver.next?.(value); subscriber.next(value); }, - (err) => { - tapObserver.error?.(err); - subscriber.error(err); - }, () => { tapObserver.complete?.(); subscriber.complete(); + }, + (err) => { + tapObserver.error?.(err); + subscriber.error(err); } ) ); diff --git a/src/internal/operators/throttle.ts b/src/internal/operators/throttle.ts index 99cfa5f965..b5dec74bea 100644 --- a/src/internal/operators/throttle.ts +++ b/src/internal/operators/throttle.ts @@ -84,9 +84,7 @@ export function throttle( }; const startThrottle = (value: T) => - (throttled = innerFrom(durationSelector(value)).subscribe( - new OperatorSubscriber(subscriber, endThrottling, undefined, cleanupThrottling) - )); + (throttled = innerFrom(durationSelector(value)).subscribe(new OperatorSubscriber(subscriber, endThrottling, cleanupThrottling))); const send = () => { if (hasValue) { @@ -115,7 +113,6 @@ export function throttle( sendValue = value; !(throttled && !throttled.closed) && (leading ? send() : startThrottle(value)); }, - undefined, () => { isComplete = true; !(trailing && hasValue && throttled && !throttled.closed) && subscriber.complete(); diff --git a/src/internal/operators/throwIfEmpty.ts b/src/internal/operators/throwIfEmpty.ts index cf53be1795..b7ab366598 100644 --- a/src/internal/operators/throwIfEmpty.ts +++ b/src/internal/operators/throwIfEmpty.ts @@ -45,7 +45,6 @@ export function throwIfEmpty(errorFactory: () => any = defaultErrorFactory): hasValue = true; subscriber.next(value); }, - undefined, () => (hasValue ? subscriber.complete() : subscriber.error(errorFactory())) ) ); diff --git a/src/internal/operators/window.ts b/src/internal/operators/window.ts index 58c1b5f0f6..3be79a09cf 100644 --- a/src/internal/operators/window.ts +++ b/src/internal/operators/window.ts @@ -62,11 +62,11 @@ export function window(windowBoundaries: Observable): OperatorFunction windowSubject?.next(value), - errorHandler, () => { windowSubject.complete(); subscriber.complete(); - } + }, + errorHandler ) ); @@ -78,8 +78,8 @@ export function window(windowBoundaries: Observable): OperatorFunction(windowSize: number, startWindowEvery: number = 0) subscriber.next(window.asObservable()); } }, - (err) => { - while (windows.length > 0) { - windows.shift()!.error(err); - } - subscriber.error(err); - }, () => { while (windows.length > 0) { windows.shift()!.complete(); } subscriber.complete(); }, + (err) => { + while (windows.length > 0) { + windows.shift()!.error(err); + } + subscriber.error(err); + }, () => { starts = null!; windows = null!; diff --git a/src/internal/operators/windowTime.ts b/src/internal/operators/windowTime.ts index f37fbb6171..d5c8354b12 100644 --- a/src/internal/operators/windowTime.ts +++ b/src/internal/operators/windowTime.ts @@ -183,10 +183,10 @@ export function windowTime(windowTimeSpan: number, ...otherArgs: any[]): Oper maxWindowSize <= ++record.seen && closeWindow(record); }); }, - // Notify the windows and the downstream subscriber of the error and clean up. - (err) => terminate((consumer) => consumer.error(err)), // Complete the windows and the downstream subscriber and clean up. - () => terminate((consumer) => consumer.complete()) + () => terminate((consumer) => consumer.complete()), + // Notify the windows and the downstream subscriber of the error and clean up. + (err) => terminate((consumer) => consumer.error(err)) ) ); diff --git a/src/internal/operators/windowToggle.ts b/src/internal/operators/windowToggle.ts index 2c92c1208b..4e6eaf38d0 100644 --- a/src/internal/operators/windowToggle.ts +++ b/src/internal/operators/windowToggle.ts @@ -91,9 +91,8 @@ export function windowToggle( subscriber.next(window.asObservable()); - closingSubscription.add(closingNotifier.subscribe(new OperatorSubscriber(subscriber, closeWindow, handleError, noop))); + closingSubscription.add(closingNotifier.subscribe(new OperatorSubscriber(subscriber, closeWindow, noop, handleError))); }, - undefined, noop ) ); @@ -110,7 +109,6 @@ export function windowToggle( window.next(value); } }, - handleError, () => { // Complete all of our windows before we complete. while (0 < windows.length) { @@ -118,6 +116,7 @@ export function windowToggle( } subscriber.complete(); }, + handleError, () => { // Add this teardown so that all window subjects are // disposed of. This way, if a user tries to subscribe diff --git a/src/internal/operators/windowWhen.ts b/src/internal/operators/windowWhen.ts index e3aaf1738a..fac8d342be 100644 --- a/src/internal/operators/windowWhen.ts +++ b/src/internal/operators/windowWhen.ts @@ -94,7 +94,7 @@ export function windowWhen(closingSelector: () => ObservableInput): Oper // to capture the subscriber (aka Subscription) // so we can clean it up when we close the window // and open a new one. - closingNotifier.subscribe((closingSubscriber = new OperatorSubscriber(subscriber, openWindow, handleError, openWindow))); + closingNotifier.subscribe((closingSubscriber = new OperatorSubscriber(subscriber, openWindow, openWindow, handleError))); }; // Start the first window. @@ -105,12 +105,12 @@ export function windowWhen(closingSelector: () => ObservableInput): Oper new OperatorSubscriber( subscriber, (value) => window!.next(value), - handleError, () => { // The source completed, close the window and complete. window!.complete(); subscriber.complete(); }, + handleError, () => { // Be sure to clean up our closing subscription // when this tears down. diff --git a/src/internal/operators/withLatestFrom.ts b/src/internal/operators/withLatestFrom.ts index 8d503e1b35..9b2dc769e4 100644 --- a/src/internal/operators/withLatestFrom.ts +++ b/src/internal/operators/withLatestFrom.ts @@ -88,7 +88,6 @@ export function withLatestFrom(...inputs: any[]): OperatorFunction