Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(retryWhen): notifier supports ObservableInput #7105

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
66 changes: 64 additions & 2 deletions spec-dtslint/operators/retryWhen-spec.ts
@@ -1,5 +1,6 @@
import { of } from 'rxjs';
import { retryWhen } from 'rxjs/operators';
import { asInteropObservable } from '../../spec/helpers/interop-helper';

it('should infer correctly', () => {
const o = of(1, 2, 3).pipe(retryWhen(errors => errors)); // $ExpectType Observable<number>
Expand All @@ -13,10 +14,71 @@ it('should enforce types', () => {
const o = of(1, 2, 3).pipe(retryWhen()); // $ExpectError
});

it('should accept interop observable notifier', () => {
of(1, 2, 3).pipe(retryWhen(() => asInteropObservable(of(true)))); // $ExpectType Observable<number>
});

it('should accept promise notifier', () => {
of(1, 2, 3).pipe(retryWhen(() => Promise.resolve(true))); // $ExpectType Observable<number>
});

it('should async iterable notifier', () => {
const asyncRange = {
from: 1,
to: 2,
[Symbol.asyncIterator]() {
return {
current: this.from,
last: this.to,
async next() {
await Promise.resolve();
const done = (this.current > this.last);
return {
done,
value: done ? this.current++ : undefined
};
}
};
}
};
of(1, 2, 3).pipe(retryWhen(() => asyncRange)); // $ExpectType Observable<number>
});

it('should accept iterable notifier', () => {
const syncRange = {
from: 1,
to: 2,
[Symbol.iterator]() {
return {
current: this.from,
last: this.to,
next() {
const done = (this.current > this.last);
return {
done,
value: done ? this.current++ : undefined
};
}
};
}
};
of(1, 2, 3).pipe(retryWhen(() => syncRange)); // $ExpectType Observable<number>
});

it('should accept readable stream notifier', () => {
const readableStream = new ReadableStream<string>({
pull(controller) {
controller.enqueue('x');
controller.close();
},
});
of(1, 2, 3).pipe(retryWhen(() => readableStream)); // $ExpectType Observable<number>
});

it('should enforce types of the notifier', () => {
const o = of(1, 2, 3).pipe(retryWhen(() => 8)); // $ExpectError
of(1, 2, 3).pipe(retryWhen(() => 8)); // $ExpectError
});

it('should be deprecated', () => {
const o = of(1, 2, 3).pipe(retryWhen(() => of(true))); // $ExpectDeprecation
of(1, 2, 3).pipe(retryWhen(() => of(true))); // $ExpectDeprecation
});
15 changes: 9 additions & 6 deletions src/internal/operators/retryWhen.ts
@@ -1,14 +1,15 @@
import { Observable } from '../Observable';
import { innerFrom } from '../observable/innerFrom';
import { Subject } from '../Subject';
import { Subscription } from '../Subscription';

import { MonoTypeOperatorFunction } from '../types';
import { MonoTypeOperatorFunction, ObservableInput } from '../types';
import { operate } from '../util/lift';
import { createOperatorSubscriber } from './OperatorSubscriber';

/**
* Returns an Observable that mirrors the source Observable with the exception of an `error`. If the source Observable
* calls `error`, this method will emit the Throwable that caused the error to the Observable returned from `notifier`.
* calls `error`, this method will emit the Throwable that caused the error to the `ObservableInput` returned from `notifier`.
* If that Observable calls `complete` or `error` then this method will call `complete` or `error` on the child
* subscription. Otherwise this method will resubscribe to the source Observable.
*
Expand Down Expand Up @@ -55,13 +56,15 @@ import { createOperatorSubscriber } from './OperatorSubscriber';
*
* @see {@link retry}
*
* @param {function(errors: Observable): Observable} notifier - Receives an Observable of notifications with which a
* @param notifier Function that receives an Observable of notifications with which a
* user can `complete` or `error`, aborting the retry.
* @return A function that returns an Observable that mirrors the source
* @return A function that returns an `ObservableInput` that mirrors the source
* Observable with the exception of an `error`.
* @deprecated Will be removed in v9 or v10, use {@link retry}'s `delay` option instead.
* Will be removed in v9 or v10. Use {@link retry}'s {@link RetryConfig#delay delay} option instead.
* Instead of `retryWhen(() => notify$)`, use: `retry({ delay: () => notify$ })`.
*/
export function retryWhen<T>(notifier: (errors: Observable<any>) => Observable<any>): MonoTypeOperatorFunction<T> {
export function retryWhen<T>(notifier: (errors: Observable<any>) => ObservableInput<any>): MonoTypeOperatorFunction<T> {
return operate((source, subscriber) => {
let innerSub: Subscription | null;
let syncResub = false;
Expand All @@ -72,7 +75,7 @@ export function retryWhen<T>(notifier: (errors: Observable<any>) => Observable<a
createOperatorSubscriber(subscriber, undefined, undefined, (err) => {
if (!errors$) {
errors$ = new Subject();
notifier(errors$).subscribe(
innerFrom(notifier(errors$)).subscribe(
createOperatorSubscriber(subscriber, () =>
// If we have an innerSub, this was an asynchronous call, kick off the retry.
// Otherwise, if we don't have an innerSub yet, that's because the inner subscription
Expand Down