Skip to content

Commit

Permalink
refactor(OperatorSubscriber): Move error handler arg (#6198)
Browse files Browse the repository at this point in the history
* refactor(OperatorSubscriber): Move error handler arg

Moves the error handler arg, because in most cases we were passing `undefined` and then a complete handler

* chore: remove pointless comment
  • Loading branch information
benlesh committed Apr 7, 2021
1 parent 8cf5370 commit 1d96831
Show file tree
Hide file tree
Showing 47 changed files with 73 additions and 105 deletions.
8 changes: 4 additions & 4 deletions src/internal/observable/ConnectableObservable.ts
Expand Up @@ -66,14 +66,14 @@ export class ConnectableObservable<T> extends Observable<T> {
new OperatorSubscriber(
subject as any,
undefined,
(err) => {
this._teardown();
subject.error(err);
},
() => {
this._teardown();
subject.complete();
},
(err) => {
this._teardown();
subject.error(err);
},
() => this._teardown()
)
)
Expand Down
1 change: 0 additions & 1 deletion src/internal/observable/combineLatest.ts
Expand Up @@ -251,7 +251,6 @@ export function combineLatestInit(
subscriber.next(valueTransform(values.slice()));
}
},
undefined,
() => {
if (!--active) {
// We only complete the result if we have no more active
Expand Down
5 changes: 2 additions & 3 deletions src/internal/observable/dom/fetch.ts
Expand Up @@ -154,13 +154,12 @@ export function fromFetch<T>(
subscriber,
// Values are passed through to the subscriber
undefined,
// Error handling
handleError,
// The projected response is complete.
() => {
abortable = false;
subscriber.complete();
}
},
handleError
)
);
} else {
Expand Down
1 change: 0 additions & 1 deletion src/internal/observable/forkJoin.ts
Expand Up @@ -152,7 +152,6 @@ export function forkJoin(...args: any[]): Observable<any> {
}
values[sourceIndex] = value;
},
undefined,
() => {
if (!--remainingCompletions || !hasValue) {
if (!remainingEmissions) {
Expand Down
2 changes: 0 additions & 2 deletions src/internal/observable/zip.ts
Expand Up @@ -95,8 +95,6 @@ export function zip(...args: unknown[]): Observable<unknown> {
}
}
},
// Any error is passed through the result.
undefined,
() => {
// This source completed. Mark it as complete so we can check it later
// if we have to.
Expand Down
2 changes: 1 addition & 1 deletion src/internal/operators/OperatorSubscriber.ts
Expand Up @@ -20,8 +20,8 @@ export class OperatorSubscriber<T> extends Subscriber<T> {
constructor(
destination: Subscriber<any>,
onNext?: (value: T) => void,
onError?: (err: any) => void,
onComplete?: () => void,
onError?: (err: any) => void,
private onFinalize?: () => void
) {
// It's important - for performance reasons - that all of this class's
Expand Down
3 changes: 1 addition & 2 deletions src/internal/operators/audit.ts
Expand Up @@ -81,11 +81,10 @@ export function audit<T>(durationSelector: (value: T) => ObservableInput<any>):
lastValue = value;
if (!durationSubscriber) {
innerFrom(durationSelector(value)).subscribe(
(durationSubscriber = new OperatorSubscriber(subscriber, endDuration, undefined, cleanupDuration))
(durationSubscriber = new OperatorSubscriber(subscriber, endDuration, cleanupDuration))
);
}
},
undefined,
() => {
isComplete = true;
(!hasValue || !durationSubscriber || durationSubscriber.closed) && subscriber.complete();
Expand Down
4 changes: 0 additions & 4 deletions src/internal/operators/buffer.ts
Expand Up @@ -52,8 +52,6 @@ export function buffer<T>(closingNotifier: Observable<any>): OperatorFunction<T,
new OperatorSubscriber(
subscriber,
(value) => currentBuffer.push(value),
// Pass all errors to the consumer.
undefined,
() => {
subscriber.next(currentBuffer);
subscriber.complete();
Expand All @@ -71,8 +69,6 @@ export function buffer<T>(closingNotifier: Observable<any>): OperatorFunction<T,
currentBuffer = [];
subscriber.next(b);
},
// Pass all errors to the consumer.
undefined,
noop
)
);
Expand Down
3 changes: 2 additions & 1 deletion src/internal/operators/bufferCount.ts
Expand Up @@ -102,7 +102,6 @@ export function bufferCount<T>(bufferSize: number, startBufferEvery: number | nu
}
}
},
undefined,
() => {
// When the source completes, emit all of our
// active buffers.
Expand All @@ -111,6 +110,8 @@ export function bufferCount<T>(bufferSize: number, startBufferEvery: number | nu
}
subscriber.complete();
},
// Pass all errors through to consumer.
undefined,
() => {
// Clean up our memory when we teardown
buffers = null!;
Expand Down
3 changes: 2 additions & 1 deletion src/internal/operators/bufferTime.ts
Expand Up @@ -151,7 +151,6 @@ export function bufferTime<T>(bufferTimeSpan: number, ...otherArgs: any[]): Oper
maxBufferSize <= buffer.length && emit(record);
}
},
undefined,
() => {
// The source completed, emit all of the active
// buffers we have before we complete.
Expand All @@ -162,6 +161,8 @@ export function bufferTime<T>(bufferTimeSpan: number, ...otherArgs: any[]): Oper
subscriber.complete();
subscriber.unsubscribe();
},
// Pass all errors through to consumer.
undefined,
// Clean up
() => (bufferRecords = null)
);
Expand Down
6 changes: 1 addition & 5 deletions src/internal/operators/bufferToggle.ts
Expand Up @@ -75,11 +75,8 @@ export function bufferToggle<T, O>(
};

// The line below will add the subscription to the parent subscriber *and* the closing subscription.
closingSubscription.add(
innerFrom(closingSelector(openValue)).subscribe(new OperatorSubscriber(subscriber, emitBuffer, undefined, noop))
);
closingSubscription.add(innerFrom(closingSelector(openValue)).subscribe(new OperatorSubscriber(subscriber, emitBuffer, noop)));
},
undefined,
noop
)
);
Expand All @@ -93,7 +90,6 @@ export function bufferToggle<T, O>(
buffer.push(value);
}
},
undefined,
() => {
// Source complete. Emit all pending buffers.
while (buffers.length > 0) {
Expand Down
6 changes: 3 additions & 3 deletions src/internal/operators/bufferWhen.ts
Expand Up @@ -68,7 +68,7 @@ export function bufferWhen<T>(closingSelector: () => ObservableInput<any>): Oper
b && subscriber.next(b);

// Get a new closing notifier and subscribe to it.
innerFrom(closingSelector()).subscribe((closingSubscriber = new OperatorSubscriber(subscriber, openBuffer, undefined, noop)));
innerFrom(closingSelector()).subscribe((closingSubscriber = new OperatorSubscriber(subscriber, openBuffer, noop)));
};

// Start the first buffer.
Expand All @@ -80,14 +80,14 @@ export function bufferWhen<T>(closingSelector: () => ObservableInput<any>): Oper
subscriber,
// Add every new value to the current buffer.
(value) => buffer?.push(value),
// All errors are passed through to the consumer.
undefined,
// When we complete, emit the buffer if we have one,
// then complete the result.
() => {
buffer && subscriber.next(buffer);
subscriber.complete();
},
// Pass all errors through to consumer.
undefined,
// Release memory on teardown
() => (buffer = closingSubscriber = null!)
)
Expand Down
2 changes: 1 addition & 1 deletion src/internal/operators/catchError.ts
Expand Up @@ -112,7 +112,7 @@ export function catchError<T, O extends ObservableInput<any>>(
let handledResult: Observable<ObservedValueOf<O>>;

innerSub = source.subscribe(
new OperatorSubscriber(subscriber, undefined, (err) => {
new OperatorSubscriber(subscriber, undefined, undefined, (err) => {
handledResult = innerFrom(selector(err, catchError(selector)(source)));
if (innerSub) {
innerSub.unsubscribe();
Expand Down
5 changes: 3 additions & 2 deletions src/internal/operators/debounce.ts
Expand Up @@ -96,17 +96,18 @@ export function debounce<T>(durationSelector: (value: T) => ObservableInput<any>
lastValue = value;
// Capture our duration subscriber, so we can unsubscribe it when we're notified
// and we're going to emit the value.
durationSubscriber = new OperatorSubscriber(subscriber, emit, undefined, noop);
durationSubscriber = new OperatorSubscriber(subscriber, emit, noop);
// Subscribe to the duration.
innerFrom(durationSelector(value)).subscribe(durationSubscriber);
},
undefined,
() => {
// Source completed.
// Emit any pending debounced values then complete
emit();
subscriber.complete();
},
// Pass all errors through to consumer
undefined,
() => {
// Teardown.
lastValue = durationSubscriber = null;
Expand Down
3 changes: 2 additions & 1 deletion src/internal/operators/debounceTime.ts
Expand Up @@ -104,13 +104,14 @@ export function debounceTime<T>(dueTime: number, scheduler: SchedulerLike = asyn
activeTask = scheduler.schedule(emitWhenIdle, dueTime);
}
},
undefined,
() => {
// Source completed.
// Emit any pending debounced values then complete
emit();
subscriber.complete();
},
// Pass all errors through to consumer.
undefined,
() => {
// Teardown.
lastValue = activeTask = null;
Expand Down
1 change: 0 additions & 1 deletion src/internal/operators/defaultIfEmpty.ts
Expand Up @@ -46,7 +46,6 @@ export function defaultIfEmpty<T, R>(defaultValue: R): OperatorFunction<T, T | R
hasValue = true;
subscriber.next(value);
},
undefined,
() => {
if (!hasValue) {
subscriber.next(defaultValue!);
Expand Down
2 changes: 1 addition & 1 deletion src/internal/operators/distinct.ts
Expand Up @@ -84,6 +84,6 @@ export function distinct<T, K>(keySelector?: (value: T) => K, flushes?: Observab
})
);

flushes?.subscribe(new OperatorSubscriber(subscriber, () => distinctKeys.clear(), undefined, noop));
flushes?.subscribe(new OperatorSubscriber(subscriber, () => distinctKeys.clear(), noop));
});
}
1 change: 0 additions & 1 deletion src/internal/operators/every.ts
Expand Up @@ -53,7 +53,6 @@ export function every<T>(
subscriber.complete();
}
},
undefined,
() => {
subscriber.next(true);
subscriber.complete();
Expand Down
3 changes: 1 addition & 2 deletions src/internal/operators/exhaustAll.ts
Expand Up @@ -57,14 +57,13 @@ export function exhaustAll<O extends ObservableInput<any>>(): OperatorFunction<O
(inner) => {
if (!innerSub) {
innerSub = innerFrom(inner).subscribe(
new OperatorSubscriber(subscriber, undefined, undefined, () => {
new OperatorSubscriber(subscriber, undefined, () => {
innerSub = null;
isComplete && subscriber.complete();
})
);
}
},
undefined,
() => {
isComplete = true;
!innerSub && subscriber.complete();
Expand Down
3 changes: 1 addition & 2 deletions src/internal/operators/exhaustMap.ts
Expand Up @@ -83,14 +83,13 @@ export function exhaustMap<T, R, O extends ObservableInput<any>>(
subscriber,
(outerValue) => {
if (!innerSub) {
innerSub = new OperatorSubscriber(subscriber, undefined, undefined, () => {
innerSub = new OperatorSubscriber(subscriber, undefined, () => {
innerSub = null;
isComplete && subscriber.complete();
});
innerFrom(project(outerValue, index++)).subscribe(innerSub);
}
},
undefined,
() => {
isComplete = true;
!innerSub && subscriber.complete();
Expand Down
1 change: 0 additions & 1 deletion src/internal/operators/find.ts
Expand Up @@ -79,7 +79,6 @@ export function createFind<T>(
subscriber.complete();
}
},
undefined,
() => {
subscriber.next(findIndex ? -1 : undefined);
subscriber.complete();
Expand Down
8 changes: 4 additions & 4 deletions src/internal/operators/groupBy.ts
Expand Up @@ -175,11 +175,11 @@ export function groupBy<T, K, R>(
group!.complete();
durationSubscriber?.unsubscribe();
},
// Completions are also sent to the group, but just the group.
undefined,
// Errors on the duration subscriber are sent to the group
// but only the group. They are not sent to the main subscription.
undefined,
// Completions are also sent to the group, but just the group.
undefined,
// Teardown: Remove this group from our map.
() => groups.delete(key)
);
Expand All @@ -195,10 +195,10 @@ export function groupBy<T, K, R>(
handleError(err);
}
},
// Error from the source.
handleError,
// Source completes.
() => notify((consumer) => consumer.complete()),
// Error from the source.
handleError,
// Free up memory.
// When the source subscription is _finally_ torn down, release the subjects and keys
// in our groups Map, they may be quite large and we don't want to keep them around if we
Expand Down
1 change: 0 additions & 1 deletion src/internal/operators/isEmpty.ts
Expand Up @@ -74,7 +74,6 @@ export function isEmpty<T>(): OperatorFunction<T, boolean> {
subscriber.next(false);
subscriber.complete();
},
undefined,
() => {
subscriber.next(true);
subscriber.complete();
Expand Down
8 changes: 4 additions & 4 deletions src/internal/operators/materialize.ts
Expand Up @@ -65,13 +65,13 @@ export function materialize<T>(): OperatorFunction<T, Notification<T> & Observab
(value) => {
subscriber.next(Notification.createNext(value));
},
(err) => {
subscriber.next(Notification.createError(err));
subscriber.complete();
},
() => {
subscriber.next(Notification.createComplete());
subscriber.complete();
},
(err) => {
subscriber.next(Notification.createError(err));
subscriber.complete();
}
)
);
Expand Down
20 changes: 7 additions & 13 deletions src/internal/operators/mergeInternals.ts
Expand Up @@ -84,13 +84,13 @@ export function mergeInternals<T, R>(
subscriber.next(innerValue);
}
},
// Errors are passed to the destination.
undefined,
() => {
// Flag that we have completed, so we know to check the buffer
// during finalization.
innerComplete = true;
},
// Errors are passed to the destination.
undefined,
() => {
// During finalization, if the inner completed (it wasn't errored or
// cancelled), then we want to try the next item in the buffer if
Expand Down Expand Up @@ -129,17 +129,11 @@ export function mergeInternals<T, R>(

// Subscribe to our source observable.
source.subscribe(
new OperatorSubscriber(
subscriber,
outerNext,
// Errors are passed through
undefined,
() => {
// Outer completed, make a note of it, and check to see if we can complete everything.
isComplete = true;
checkComplete();
}
)
new OperatorSubscriber(subscriber, outerNext, () => {
// Outer completed, make a note of it, and check to see if we can complete everything.
isComplete = true;
checkComplete();
})
);

// Additional teardown (for when the destination is torn down).
Expand Down
4 changes: 2 additions & 2 deletions src/internal/operators/observeOn.ts
Expand Up @@ -60,8 +60,8 @@ export function observeOn<T>(scheduler: SchedulerLike, delay: number = 0): MonoT
new OperatorSubscriber(
subscriber,
(value) => subscriber.add(scheduler.schedule(() => subscriber.next(value), delay)),
(err) => subscriber.add(scheduler.schedule(() => subscriber.error(err), delay)),
() => subscriber.add(scheduler.schedule(() => subscriber.complete(), delay))
() => subscriber.add(scheduler.schedule(() => subscriber.complete(), delay)),
(err) => subscriber.add(scheduler.schedule(() => subscriber.error(err), delay))
)
);
});
Expand Down

0 comments on commit 1d96831

Please sign in to comment.