Skip to content

Commit

Permalink
fix: finalize behaves well with useDeprecatedSynchronousErrorHandling
Browse files Browse the repository at this point in the history
Adds tests and ensures a few more scenarios that were hit in Google because they use the deprecated synchronous error handling.

fixes ReactiveX#6250
  • Loading branch information
benlesh committed Apr 22, 2021
1 parent 23bc7fd commit 0f3f53a
Show file tree
Hide file tree
Showing 3 changed files with 116 additions and 25 deletions.
92 changes: 90 additions & 2 deletions spec/Observable-spec.ts
Expand Up @@ -2,7 +2,7 @@ import { expect } from 'chai';
import * as sinon from 'sinon';
import { Observer, TeardownLogic } from '../src/internal/types';
import { Observable, config, Subscription, noop, Subscriber, Operator, NEVER, Subject, of, throwError, empty } from 'rxjs';
import { map, multicast, refCount, filter, count, tap, combineLatest, concat, merge, race, zip, catchError, concatMap, switchMap, publish, publishLast, publishBehavior, share} from 'rxjs/operators';
import { map, multicast, refCount, filter, count, tap, combineLatest, concat, merge, race, zip, catchError, concatMap, switchMap, publish, publishLast, publishBehavior, share, finalize} from 'rxjs/operators';
import { TestScheduler } from 'rxjs/testing';
import { observableMatcher } from './helpers/observableMatcher';

Expand Down Expand Up @@ -633,7 +633,95 @@ describe('Observable', () => {
.pipe(switchMap(() => throwError(new Error('Avast! Thar be a new error!'))))
.subscribe(console.log);
}).to.throw('Avast! Thar be a new error!');
})
});

it('should teardown even with a synchronous error', () => {
let called = false;
const badObservable = new Observable((subscriber) => {
subscriber.add(() => {
called = true;
});

subscriber.error(new Error('bad'));
});

try {
badObservable.subscribe();
} catch (err) {
// do nothing
}
expect(called).to.be.true;
});

it('should teardown even with a synchronous thrown error', () => {
let called = false;
const badObservable = new Observable((subscriber) => {
subscriber.add(() => {
called = true;
});

throw new Error('bad');
});

try {
badObservable.subscribe();
} catch (err) {
// do nothing
}
expect(called).to.be.true;
});


it('should handle empty string sync errors', () => {
const badObservable = new Observable(() => {
throw '';
});

let caught = false;
try {
badObservable.subscribe();
} catch (err) {
caught = true;
expect(err).to.equal('');
}
expect(caught).to.be.true;
});

it('should execute finalize even with a sync error', () => {
let called = false;
const badObservable = new Observable((subscriber) => {
subscriber.error(new Error('bad'));
}).pipe(
finalize(() => {
called = true;
})
);

try {
badObservable.subscribe();
} catch (err) {
// do nothing
}
expect(called).to.be.true;
});

it('should execute finalize even with a sync thrown error', () => {
let called = false;
const badObservable = new Observable(() => {
throw new Error('bad');
}).pipe(
finalize(() => {
called = true;
})
);

try {
badObservable.subscribe();
} catch (err) {
// do nothing
}
expect(called).to.be.true;
});

afterEach(() => {
config.useDeprecatedSynchronousErrorHandling = false;
Expand Down
40 changes: 19 additions & 21 deletions src/internal/Observable.ts
Expand Up @@ -215,41 +215,39 @@ export class Observable<T> implements Subscribable<T> {
): Subscription {
const subscriber = isSubscriber(observerOrNext) ? observerOrNext : new SafeSubscriber(observerOrNext, error, complete);

// If we have an operator, it's the result of a lift, and we let the lift
// mechanism do the subscription for us in the operator call. Otherwise,
// if we have a source, it's a trusted observable we own, and we can call
// the `_subscribe` without wrapping it in a try/catch. If we are supposed to
// use the deprecated sync error handling, then we don't need the try/catch either
// otherwise, it may be from a user-made observable instance, and we want to
// wrap it in a try/catch so we can handle errors appropriately.
const { operator, source } = this;

let dest: any = subscriber;
if (config.useDeprecatedSynchronousErrorHandling) {
let dest: any = subscriber;
dest._syncErrorHack_isSubscribing = true;
}

subscriber.add(
operator
? operator.call(subscriber, source)
: source || config.useDeprecatedSynchronousErrorHandling
? this._subscribe(subscriber)
: this._trySubscribe(subscriber)
);
if (operator) {
subscriber.add(operator.call(subscriber, source));
} else {
try {
this._subscribe(subscriber);
} catch (err) {
dest.__syncError = err;
}
}

if (config.useDeprecatedSynchronousErrorHandling) {
dest._syncErrorHack_isSubscribing = false;
// In the case of the deprecated sync error handling,
// we need to crawl forward through our subscriber chain and
// look to see if there's any synchronously thrown errors.
// Does this suck for perf? Yes. So stop using the deprecated sync
// error handling already. We're removing this in v8.
while (dest) {
if (dest.__syncError) {
throw dest.__syncError;
if ('__syncError' in dest) {
try {
throw dest.__syncError;
} finally {
subscriber.unsubscribe();
}
}
dest = dest.destination;
}
dest._syncErrorHack_isSubscribing = false;
} else {
subscriber.add(operator ? operator.call(subscriber, source) : source ? this._subscribe(subscriber) : this._trySubscribe(subscriber));
}
return subscriber;
}
Expand Down
9 changes: 7 additions & 2 deletions src/internal/operators/finalize.ts
Expand Up @@ -58,7 +58,12 @@ import { operate } from '../util/lift';
*/
export function finalize<T>(callback: () => void): MonoTypeOperatorFunction<T> {
return operate((source, subscriber) => {
source.subscribe(subscriber);
subscriber.add(callback);
// TODO: This try/finally was only added for `useDeprecatedSynchronousErrorHandling`.
// REMOVE THIS WHEN THAT HOT GARBAGE IS REMOVED IN V8.
try {
source.subscribe(subscriber);
} finally {
subscriber.add(callback);
}
});
}

0 comments on commit 0f3f53a

Please sign in to comment.