From 0f3f53aa78590b22997fed45c60b7c6896e13db0 Mon Sep 17 00:00:00 2001 From: Ben Lesh Date: Thu, 22 Apr 2021 15:27:58 -0500 Subject: [PATCH 1/4] fix: finalize behaves well with useDeprecatedSynchronousErrorHandling Adds tests and ensures a few more scenarios that were hit in Google because they use the deprecated synchronous error handling. fixes #6250 --- spec/Observable-spec.ts | 92 +++++++++++++++++++++++++++++- src/internal/Observable.ts | 40 ++++++------- src/internal/operators/finalize.ts | 9 ++- 3 files changed, 116 insertions(+), 25 deletions(-) diff --git a/spec/Observable-spec.ts b/spec/Observable-spec.ts index 1e6bb3972e..334b5d9237 100644 --- a/spec/Observable-spec.ts +++ b/spec/Observable-spec.ts @@ -2,7 +2,7 @@ import { expect } from 'chai'; import * as sinon from 'sinon'; import { Observer, TeardownLogic } from '../src/internal/types'; import { Observable, config, Subscription, noop, Subscriber, Operator, NEVER, Subject, of, throwError, empty } from 'rxjs'; -import { map, multicast, refCount, filter, count, tap, combineLatest, concat, merge, race, zip, catchError, concatMap, switchMap, publish, publishLast, publishBehavior, share} from 'rxjs/operators'; +import { map, multicast, refCount, filter, count, tap, combineLatest, concat, merge, race, zip, catchError, concatMap, switchMap, publish, publishLast, publishBehavior, share, finalize} from 'rxjs/operators'; import { TestScheduler } from 'rxjs/testing'; import { observableMatcher } from './helpers/observableMatcher'; @@ -633,7 +633,95 @@ describe('Observable', () => { .pipe(switchMap(() => throwError(new Error('Avast! Thar be a new error!')))) .subscribe(console.log); }).to.throw('Avast! Thar be a new error!'); - }) + }); + + it('should teardown even with a synchronous error', () => { + let called = false; + const badObservable = new Observable((subscriber) => { + subscriber.add(() => { + called = true; + }); + + subscriber.error(new Error('bad')); + }); + + try { + badObservable.subscribe(); + } catch (err) { + // do nothing + } + expect(called).to.be.true; + }); + + it('should teardown even with a synchronous thrown error', () => { + let called = false; + const badObservable = new Observable((subscriber) => { + subscriber.add(() => { + called = true; + }); + + throw new Error('bad'); + }); + + try { + badObservable.subscribe(); + } catch (err) { + // do nothing + } + expect(called).to.be.true; + }); + + + it('should handle empty string sync errors', () => { + const badObservable = new Observable(() => { + throw ''; + }); + + let caught = false; + try { + badObservable.subscribe(); + } catch (err) { + caught = true; + expect(err).to.equal(''); + } + expect(caught).to.be.true; + }); + + it('should execute finalize even with a sync error', () => { + let called = false; + const badObservable = new Observable((subscriber) => { + subscriber.error(new Error('bad')); + }).pipe( + finalize(() => { + called = true; + }) + ); + + try { + badObservable.subscribe(); + } catch (err) { + // do nothing + } + expect(called).to.be.true; + }); + + it('should execute finalize even with a sync thrown error', () => { + let called = false; + const badObservable = new Observable(() => { + throw new Error('bad'); + }).pipe( + finalize(() => { + called = true; + }) + ); + + try { + badObservable.subscribe(); + } catch (err) { + // do nothing + } + expect(called).to.be.true; + }); afterEach(() => { config.useDeprecatedSynchronousErrorHandling = false; diff --git a/src/internal/Observable.ts b/src/internal/Observable.ts index 1e5d027274..3c0b586bf8 100644 --- a/src/internal/Observable.ts +++ b/src/internal/Observable.ts @@ -215,41 +215,39 @@ export class Observable implements Subscribable { ): Subscription { const subscriber = isSubscriber(observerOrNext) ? observerOrNext : new SafeSubscriber(observerOrNext, error, complete); - // If we have an operator, it's the result of a lift, and we let the lift - // mechanism do the subscription for us in the operator call. Otherwise, - // if we have a source, it's a trusted observable we own, and we can call - // the `_subscribe` without wrapping it in a try/catch. If we are supposed to - // use the deprecated sync error handling, then we don't need the try/catch either - // otherwise, it may be from a user-made observable instance, and we want to - // wrap it in a try/catch so we can handle errors appropriately. const { operator, source } = this; - - let dest: any = subscriber; if (config.useDeprecatedSynchronousErrorHandling) { + let dest: any = subscriber; dest._syncErrorHack_isSubscribing = true; - } - subscriber.add( - operator - ? operator.call(subscriber, source) - : source || config.useDeprecatedSynchronousErrorHandling - ? this._subscribe(subscriber) - : this._trySubscribe(subscriber) - ); + if (operator) { + subscriber.add(operator.call(subscriber, source)); + } else { + try { + this._subscribe(subscriber); + } catch (err) { + dest.__syncError = err; + } + } - if (config.useDeprecatedSynchronousErrorHandling) { - dest._syncErrorHack_isSubscribing = false; // In the case of the deprecated sync error handling, // we need to crawl forward through our subscriber chain and // look to see if there's any synchronously thrown errors. // Does this suck for perf? Yes. So stop using the deprecated sync // error handling already. We're removing this in v8. while (dest) { - if (dest.__syncError) { - throw dest.__syncError; + if ('__syncError' in dest) { + try { + throw dest.__syncError; + } finally { + subscriber.unsubscribe(); + } } dest = dest.destination; } + dest._syncErrorHack_isSubscribing = false; + } else { + subscriber.add(operator ? operator.call(subscriber, source) : source ? this._subscribe(subscriber) : this._trySubscribe(subscriber)); } return subscriber; } diff --git a/src/internal/operators/finalize.ts b/src/internal/operators/finalize.ts index 4602f79045..436fb45f43 100644 --- a/src/internal/operators/finalize.ts +++ b/src/internal/operators/finalize.ts @@ -58,7 +58,12 @@ import { operate } from '../util/lift'; */ export function finalize(callback: () => void): MonoTypeOperatorFunction { return operate((source, subscriber) => { - source.subscribe(subscriber); - subscriber.add(callback); + // TODO: This try/finally was only added for `useDeprecatedSynchronousErrorHandling`. + // REMOVE THIS WHEN THAT HOT GARBAGE IS REMOVED IN V8. + try { + source.subscribe(subscriber); + } finally { + subscriber.add(callback); + } }); } From 729ea954000ce95db0c52c8bc58585d5a7c1f36e Mon Sep 17 00:00:00 2001 From: Ben Lesh Date: Thu, 22 Apr 2021 15:39:53 -0500 Subject: [PATCH 2/4] refactor: Move deprecated junk to its own method Just for readability. The deprecated stuff is a hot mess, and this shows what we get to delete in version 8 more cleanly. --- src/internal/Observable.ts | 69 ++++++++++++++++++++++---------------- 1 file changed, 41 insertions(+), 28 deletions(-) diff --git a/src/internal/Observable.ts b/src/internal/Observable.ts index 3c0b586bf8..53e47ec9f0 100644 --- a/src/internal/Observable.ts +++ b/src/internal/Observable.ts @@ -215,41 +215,54 @@ export class Observable implements Subscribable { ): Subscription { const subscriber = isSubscriber(observerOrNext) ? observerOrNext : new SafeSubscriber(observerOrNext, error, complete); - const { operator, source } = this; if (config.useDeprecatedSynchronousErrorHandling) { - let dest: any = subscriber; - dest._syncErrorHack_isSubscribing = true; + this._deprecatedSyncErrorSubscribe(subscriber); + } else { + const { operator, source } = this; + subscriber.add(operator ? operator.call(subscriber, source) : source ? this._subscribe(subscriber) : this._trySubscribe(subscriber)); + } + return subscriber; + } - if (operator) { - subscriber.add(operator.call(subscriber, source)); - } else { - try { - this._subscribe(subscriber); - } catch (err) { - dest.__syncError = err; - } + /** + * REMOVE THIS ENTIRE METHOD IN VERSION 8. + */ + private _deprecatedSyncErrorSubscribe(subscriber: Subscriber) { + let dest: any = subscriber; + dest._syncErrorHack_isSubscribing = true; + const { operator } = this; + if (operator) { + // We don't need to try/catch on operators, as they + // are doing their own try/catching, and will + // properly decorate the subscriber with `__syncError`. + subscriber.add(operator.call(subscriber, this.source)); + } else { + try { + this._subscribe(subscriber); + } catch (err) { + dest.__syncError = err; } + } - // In the case of the deprecated sync error handling, - // we need to crawl forward through our subscriber chain and - // look to see if there's any synchronously thrown errors. - // Does this suck for perf? Yes. So stop using the deprecated sync - // error handling already. We're removing this in v8. - while (dest) { - if ('__syncError' in dest) { - try { - throw dest.__syncError; - } finally { - subscriber.unsubscribe(); - } + // In the case of the deprecated sync error handling, + // we need to crawl forward through our subscriber chain and + // look to see if there's any synchronously thrown errors. + // Does this suck for perf? Yes. So stop using the deprecated sync + // error handling already. We're removing this in v8. + while (dest) { + // Technically, someone could throw something falsy, like 0, or "", + // so we need to check to see if anything was thrown, and we know + // that by the mere existence of `__syncError`. + if ('__syncError' in dest) { + try { + throw dest.__syncError; + } finally { + subscriber.unsubscribe(); } - dest = dest.destination; } - dest._syncErrorHack_isSubscribing = false; - } else { - subscriber.add(operator ? operator.call(subscriber, source) : source ? this._subscribe(subscriber) : this._trySubscribe(subscriber)); + dest = dest.destination; } - return subscriber; + dest._syncErrorHack_isSubscribing = false; } /** @internal */ From d0fca8d8b3cb5419867e4fd0edd9f13d150715e9 Mon Sep 17 00:00:00 2001 From: Ben Lesh Date: Thu, 22 Apr 2021 15:58:09 -0500 Subject: [PATCH 3/4] refactor: Add more comments --- src/internal/Observable.ts | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/src/internal/Observable.ts b/src/internal/Observable.ts index 53e47ec9f0..67b1c6a669 100644 --- a/src/internal/Observable.ts +++ b/src/internal/Observable.ts @@ -219,7 +219,20 @@ export class Observable implements Subscribable { this._deprecatedSyncErrorSubscribe(subscriber); } else { const { operator, source } = this; - subscriber.add(operator ? operator.call(subscriber, source) : source ? this._subscribe(subscriber) : this._trySubscribe(subscriber)); + subscriber.add( + operator + ? // We're dealing with a subscription in the + // operator chain to one of our lifted operators. + operator.call(subscriber, source) + : source + ? // If `source` has a value, but `operator` does not, something that + // had intimate knowledge of our API, like our `Subject`, must have + // set it. We're going to just call `_subscribe` directly. + this._subscribe(subscriber) + : // In all other cases, we're likely wrapping a user-provided initializer + // function, so we need to catch errors and handle them appropriately. + this._trySubscribe(subscriber) + ); } return subscriber; } From dcb9543d0e48f04980bebdc16f8d4ae843c0052f Mon Sep 17 00:00:00 2001 From: Ben Lesh Date: Thu, 22 Apr 2021 16:05:00 -0500 Subject: [PATCH 4/4] test: Add more tests around gross mode and finalize --- spec/Observable-spec.ts | 42 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 42 insertions(+) diff --git a/spec/Observable-spec.ts b/spec/Observable-spec.ts index 334b5d9237..9279850d8c 100644 --- a/spec/Observable-spec.ts +++ b/spec/Observable-spec.ts @@ -722,6 +722,48 @@ describe('Observable', () => { } expect(called).to.be.true; }); + + it('should execute finalize in order even with a sync error', () => { + const results: any[] = []; + const badObservable = new Observable((subscriber) => { + subscriber.error(new Error('bad')); + }).pipe( + finalize(() => { + results.push(1); + }), + finalize(() => { + results.push(2) + }) + ); + + try { + badObservable.subscribe(); + } catch (err) { + // do nothing + } + expect(results).to.deep.equal([1, 2]); + }); + + it('should execute finalize in order even with a sync thrown error', () => { + const results: any[] = []; + const badObservable = new Observable(() => { + throw new Error('bad'); + }).pipe( + finalize(() => { + results.push(1); + }), + finalize(() => { + results.push(2) + }) + ); + + try { + badObservable.subscribe(); + } catch (err) { + // do nothing + } + expect(results).to.deep.equal([1, 2]); + }); afterEach(() => { config.useDeprecatedSynchronousErrorHandling = false;