Skip to content

Commit

Permalink
fix: finalize behaves well with useDeprecatedSynchronousErrorHandling (
Browse files Browse the repository at this point in the history
…#6251)

* fix: finalize behaves well with useDeprecatedSynchronousErrorHandling

Adds tests and ensures a few more scenarios that were hit in Google because they use the deprecated synchronous error handling.

fixes #6250

* refactor: Move deprecated junk to its own method

Just for readability. The deprecated stuff is a hot mess, and this shows what we get to delete in version 8 more cleanly.

* refactor: Add more comments

* test: Add more tests around gross mode and finalize
  • Loading branch information
benlesh committed Apr 23, 2021
1 parent 23bc7fd commit e4bed2a
Show file tree
Hide file tree
Showing 3 changed files with 192 additions and 33 deletions.
134 changes: 132 additions & 2 deletions spec/Observable-spec.ts
Expand Up @@ -2,7 +2,7 @@ import { expect } from 'chai';
import * as sinon from 'sinon';
import { Observer, TeardownLogic } from '../src/internal/types';
import { Observable, config, Subscription, noop, Subscriber, Operator, NEVER, Subject, of, throwError, empty } from 'rxjs';
import { map, multicast, refCount, filter, count, tap, combineLatest, concat, merge, race, zip, catchError, concatMap, switchMap, publish, publishLast, publishBehavior, share} from 'rxjs/operators';
import { map, multicast, refCount, filter, count, tap, combineLatest, concat, merge, race, zip, catchError, concatMap, switchMap, publish, publishLast, publishBehavior, share, finalize} from 'rxjs/operators';
import { TestScheduler } from 'rxjs/testing';
import { observableMatcher } from './helpers/observableMatcher';

Expand Down Expand Up @@ -633,7 +633,137 @@ describe('Observable', () => {
.pipe(switchMap(() => throwError(new Error('Avast! Thar be a new error!'))))
.subscribe(console.log);
}).to.throw('Avast! Thar be a new error!');
})
});

it('should teardown even with a synchronous error', () => {
let called = false;
const badObservable = new Observable((subscriber) => {
subscriber.add(() => {
called = true;
});

subscriber.error(new Error('bad'));
});

try {
badObservable.subscribe();
} catch (err) {
// do nothing
}
expect(called).to.be.true;
});

it('should teardown even with a synchronous thrown error', () => {
let called = false;
const badObservable = new Observable((subscriber) => {
subscriber.add(() => {
called = true;
});

throw new Error('bad');
});

try {
badObservable.subscribe();
} catch (err) {
// do nothing
}
expect(called).to.be.true;
});


it('should handle empty string sync errors', () => {
const badObservable = new Observable(() => {
throw '';
});

let caught = false;
try {
badObservable.subscribe();
} catch (err) {
caught = true;
expect(err).to.equal('');
}
expect(caught).to.be.true;
});

it('should execute finalize even with a sync error', () => {
let called = false;
const badObservable = new Observable((subscriber) => {
subscriber.error(new Error('bad'));
}).pipe(
finalize(() => {
called = true;
})
);

try {
badObservable.subscribe();
} catch (err) {
// do nothing
}
expect(called).to.be.true;
});

it('should execute finalize even with a sync thrown error', () => {
let called = false;
const badObservable = new Observable(() => {
throw new Error('bad');
}).pipe(
finalize(() => {
called = true;
})
);

try {
badObservable.subscribe();
} catch (err) {
// do nothing
}
expect(called).to.be.true;
});

it('should execute finalize in order even with a sync error', () => {
const results: any[] = [];
const badObservable = new Observable((subscriber) => {
subscriber.error(new Error('bad'));
}).pipe(
finalize(() => {
results.push(1);
}),
finalize(() => {
results.push(2)
})
);

try {
badObservable.subscribe();
} catch (err) {
// do nothing
}
expect(results).to.deep.equal([1, 2]);
});

it('should execute finalize in order even with a sync thrown error', () => {
const results: any[] = [];
const badObservable = new Observable(() => {
throw new Error('bad');
}).pipe(
finalize(() => {
results.push(1);
}),
finalize(() => {
results.push(2)
})
);

try {
badObservable.subscribe();
} catch (err) {
// do nothing
}
expect(results).to.deep.equal([1, 2]);
});

afterEach(() => {
config.useDeprecatedSynchronousErrorHandling = false;
Expand Down
82 changes: 53 additions & 29 deletions src/internal/Observable.ts
Expand Up @@ -215,43 +215,67 @@ export class Observable<T> implements Subscribable<T> {
): Subscription {
const subscriber = isSubscriber(observerOrNext) ? observerOrNext : new SafeSubscriber(observerOrNext, error, complete);

// If we have an operator, it's the result of a lift, and we let the lift
// mechanism do the subscription for us in the operator call. Otherwise,
// if we have a source, it's a trusted observable we own, and we can call
// the `_subscribe` without wrapping it in a try/catch. If we are supposed to
// use the deprecated sync error handling, then we don't need the try/catch either
// otherwise, it may be from a user-made observable instance, and we want to
// wrap it in a try/catch so we can handle errors appropriately.
const { operator, source } = this;

let dest: any = subscriber;
if (config.useDeprecatedSynchronousErrorHandling) {
dest._syncErrorHack_isSubscribing = true;
this._deprecatedSyncErrorSubscribe(subscriber);
} else {
const { operator, source } = this;
subscriber.add(
operator
? // We're dealing with a subscription in the
// operator chain to one of our lifted operators.
operator.call(subscriber, source)
: source
? // If `source` has a value, but `operator` does not, something that
// had intimate knowledge of our API, like our `Subject`, must have
// set it. We're going to just call `_subscribe` directly.
this._subscribe(subscriber)
: // In all other cases, we're likely wrapping a user-provided initializer
// function, so we need to catch errors and handle them appropriately.
this._trySubscribe(subscriber)
);
}
return subscriber;
}

subscriber.add(
operator
? operator.call(subscriber, source)
: source || config.useDeprecatedSynchronousErrorHandling
? this._subscribe(subscriber)
: this._trySubscribe(subscriber)
);
/**
* REMOVE THIS ENTIRE METHOD IN VERSION 8.
*/
private _deprecatedSyncErrorSubscribe(subscriber: Subscriber<unknown>) {
let dest: any = subscriber;
dest._syncErrorHack_isSubscribing = true;
const { operator } = this;
if (operator) {
// We don't need to try/catch on operators, as they
// are doing their own try/catching, and will
// properly decorate the subscriber with `__syncError`.
subscriber.add(operator.call(subscriber, this.source));
} else {
try {
this._subscribe(subscriber);
} catch (err) {
dest.__syncError = err;
}
}

if (config.useDeprecatedSynchronousErrorHandling) {
dest._syncErrorHack_isSubscribing = false;
// In the case of the deprecated sync error handling,
// we need to crawl forward through our subscriber chain and
// look to see if there's any synchronously thrown errors.
// Does this suck for perf? Yes. So stop using the deprecated sync
// error handling already. We're removing this in v8.
while (dest) {
if (dest.__syncError) {
// In the case of the deprecated sync error handling,
// we need to crawl forward through our subscriber chain and
// look to see if there's any synchronously thrown errors.
// Does this suck for perf? Yes. So stop using the deprecated sync
// error handling already. We're removing this in v8.
while (dest) {
// Technically, someone could throw something falsy, like 0, or "",
// so we need to check to see if anything was thrown, and we know
// that by the mere existence of `__syncError`.
if ('__syncError' in dest) {
try {
throw dest.__syncError;
} finally {
subscriber.unsubscribe();
}
dest = dest.destination;
}
dest = dest.destination;
}
return subscriber;
dest._syncErrorHack_isSubscribing = false;
}

/** @internal */
Expand Down
9 changes: 7 additions & 2 deletions src/internal/operators/finalize.ts
Expand Up @@ -58,7 +58,12 @@ import { operate } from '../util/lift';
*/
export function finalize<T>(callback: () => void): MonoTypeOperatorFunction<T> {
return operate((source, subscriber) => {
source.subscribe(subscriber);
subscriber.add(callback);
// TODO: This try/finally was only added for `useDeprecatedSynchronousErrorHandling`.
// REMOVE THIS WHEN THAT HOT GARBAGE IS REMOVED IN V8.
try {
source.subscribe(subscriber);
} finally {
subscriber.add(callback);
}
});
}

0 comments on commit e4bed2a

Please sign in to comment.