diff --git a/spec/Observable-spec.ts b/spec/Observable-spec.ts index 1e6bb3972e..9279850d8c 100644 --- a/spec/Observable-spec.ts +++ b/spec/Observable-spec.ts @@ -2,7 +2,7 @@ import { expect } from 'chai'; import * as sinon from 'sinon'; import { Observer, TeardownLogic } from '../src/internal/types'; import { Observable, config, Subscription, noop, Subscriber, Operator, NEVER, Subject, of, throwError, empty } from 'rxjs'; -import { map, multicast, refCount, filter, count, tap, combineLatest, concat, merge, race, zip, catchError, concatMap, switchMap, publish, publishLast, publishBehavior, share} from 'rxjs/operators'; +import { map, multicast, refCount, filter, count, tap, combineLatest, concat, merge, race, zip, catchError, concatMap, switchMap, publish, publishLast, publishBehavior, share, finalize} from 'rxjs/operators'; import { TestScheduler } from 'rxjs/testing'; import { observableMatcher } from './helpers/observableMatcher'; @@ -633,7 +633,137 @@ describe('Observable', () => { .pipe(switchMap(() => throwError(new Error('Avast! Thar be a new error!')))) .subscribe(console.log); }).to.throw('Avast! Thar be a new error!'); - }) + }); + + it('should teardown even with a synchronous error', () => { + let called = false; + const badObservable = new Observable((subscriber) => { + subscriber.add(() => { + called = true; + }); + + subscriber.error(new Error('bad')); + }); + + try { + badObservable.subscribe(); + } catch (err) { + // do nothing + } + expect(called).to.be.true; + }); + + it('should teardown even with a synchronous thrown error', () => { + let called = false; + const badObservable = new Observable((subscriber) => { + subscriber.add(() => { + called = true; + }); + + throw new Error('bad'); + }); + + try { + badObservable.subscribe(); + } catch (err) { + // do nothing + } + expect(called).to.be.true; + }); + + + it('should handle empty string sync errors', () => { + const badObservable = new Observable(() => { + throw ''; + }); + + let caught = false; + try { + badObservable.subscribe(); + } catch (err) { + caught = true; + expect(err).to.equal(''); + } + expect(caught).to.be.true; + }); + + it('should execute finalize even with a sync error', () => { + let called = false; + const badObservable = new Observable((subscriber) => { + subscriber.error(new Error('bad')); + }).pipe( + finalize(() => { + called = true; + }) + ); + + try { + badObservable.subscribe(); + } catch (err) { + // do nothing + } + expect(called).to.be.true; + }); + + it('should execute finalize even with a sync thrown error', () => { + let called = false; + const badObservable = new Observable(() => { + throw new Error('bad'); + }).pipe( + finalize(() => { + called = true; + }) + ); + + try { + badObservable.subscribe(); + } catch (err) { + // do nothing + } + expect(called).to.be.true; + }); + + it('should execute finalize in order even with a sync error', () => { + const results: any[] = []; + const badObservable = new Observable((subscriber) => { + subscriber.error(new Error('bad')); + }).pipe( + finalize(() => { + results.push(1); + }), + finalize(() => { + results.push(2) + }) + ); + + try { + badObservable.subscribe(); + } catch (err) { + // do nothing + } + expect(results).to.deep.equal([1, 2]); + }); + + it('should execute finalize in order even with a sync thrown error', () => { + const results: any[] = []; + const badObservable = new Observable(() => { + throw new Error('bad'); + }).pipe( + finalize(() => { + results.push(1); + }), + finalize(() => { + results.push(2) + }) + ); + + try { + badObservable.subscribe(); + } catch (err) { + // do nothing + } + expect(results).to.deep.equal([1, 2]); + }); afterEach(() => { config.useDeprecatedSynchronousErrorHandling = false; diff --git a/src/internal/Observable.ts b/src/internal/Observable.ts index 1e5d027274..67b1c6a669 100644 --- a/src/internal/Observable.ts +++ b/src/internal/Observable.ts @@ -215,43 +215,67 @@ export class Observable implements Subscribable { ): Subscription { const subscriber = isSubscriber(observerOrNext) ? observerOrNext : new SafeSubscriber(observerOrNext, error, complete); - // If we have an operator, it's the result of a lift, and we let the lift - // mechanism do the subscription for us in the operator call. Otherwise, - // if we have a source, it's a trusted observable we own, and we can call - // the `_subscribe` without wrapping it in a try/catch. If we are supposed to - // use the deprecated sync error handling, then we don't need the try/catch either - // otherwise, it may be from a user-made observable instance, and we want to - // wrap it in a try/catch so we can handle errors appropriately. - const { operator, source } = this; - - let dest: any = subscriber; if (config.useDeprecatedSynchronousErrorHandling) { - dest._syncErrorHack_isSubscribing = true; + this._deprecatedSyncErrorSubscribe(subscriber); + } else { + const { operator, source } = this; + subscriber.add( + operator + ? // We're dealing with a subscription in the + // operator chain to one of our lifted operators. + operator.call(subscriber, source) + : source + ? // If `source` has a value, but `operator` does not, something that + // had intimate knowledge of our API, like our `Subject`, must have + // set it. We're going to just call `_subscribe` directly. + this._subscribe(subscriber) + : // In all other cases, we're likely wrapping a user-provided initializer + // function, so we need to catch errors and handle them appropriately. + this._trySubscribe(subscriber) + ); } + return subscriber; + } - subscriber.add( - operator - ? operator.call(subscriber, source) - : source || config.useDeprecatedSynchronousErrorHandling - ? this._subscribe(subscriber) - : this._trySubscribe(subscriber) - ); + /** + * REMOVE THIS ENTIRE METHOD IN VERSION 8. + */ + private _deprecatedSyncErrorSubscribe(subscriber: Subscriber) { + let dest: any = subscriber; + dest._syncErrorHack_isSubscribing = true; + const { operator } = this; + if (operator) { + // We don't need to try/catch on operators, as they + // are doing their own try/catching, and will + // properly decorate the subscriber with `__syncError`. + subscriber.add(operator.call(subscriber, this.source)); + } else { + try { + this._subscribe(subscriber); + } catch (err) { + dest.__syncError = err; + } + } - if (config.useDeprecatedSynchronousErrorHandling) { - dest._syncErrorHack_isSubscribing = false; - // In the case of the deprecated sync error handling, - // we need to crawl forward through our subscriber chain and - // look to see if there's any synchronously thrown errors. - // Does this suck for perf? Yes. So stop using the deprecated sync - // error handling already. We're removing this in v8. - while (dest) { - if (dest.__syncError) { + // In the case of the deprecated sync error handling, + // we need to crawl forward through our subscriber chain and + // look to see if there's any synchronously thrown errors. + // Does this suck for perf? Yes. So stop using the deprecated sync + // error handling already. We're removing this in v8. + while (dest) { + // Technically, someone could throw something falsy, like 0, or "", + // so we need to check to see if anything was thrown, and we know + // that by the mere existence of `__syncError`. + if ('__syncError' in dest) { + try { throw dest.__syncError; + } finally { + subscriber.unsubscribe(); } - dest = dest.destination; } + dest = dest.destination; } - return subscriber; + dest._syncErrorHack_isSubscribing = false; } /** @internal */ diff --git a/src/internal/operators/finalize.ts b/src/internal/operators/finalize.ts index 4602f79045..436fb45f43 100644 --- a/src/internal/operators/finalize.ts +++ b/src/internal/operators/finalize.ts @@ -58,7 +58,12 @@ import { operate } from '../util/lift'; */ export function finalize(callback: () => void): MonoTypeOperatorFunction { return operate((source, subscriber) => { - source.subscribe(subscriber); - subscriber.add(callback); + // TODO: This try/finally was only added for `useDeprecatedSynchronousErrorHandling`. + // REMOVE THIS WHEN THAT HOT GARBAGE IS REMOVED IN V8. + try { + source.subscribe(subscriber); + } finally { + subscriber.add(callback); + } }); }