From 794f8064cf8fe754e9dfebeee0ffef0ac1562252 Mon Sep 17 00:00:00 2001 From: Jeremy Date: Thu, 15 Dec 2022 18:02:47 -0500 Subject: [PATCH] feat(retryWhen): `notifier` now supports any `ObservableInput` (#7105) * feat(retryWhen): retryWhen's notifier supports ObservableInput * test(retryWhen): added tests for ObservableInput notifier * chore(retryWhen): updated docs grammar * Delete index.d.ts * Delete index.d.ts Co-authored-by: Ben Lesh --- spec-dtslint/operators/retryWhen-spec.ts | 66 +++++++++++++++++++++++- src/internal/operators/retryWhen.ts | 15 +++--- 2 files changed, 73 insertions(+), 8 deletions(-) diff --git a/spec-dtslint/operators/retryWhen-spec.ts b/spec-dtslint/operators/retryWhen-spec.ts index 14cef47877..2528ded366 100644 --- a/spec-dtslint/operators/retryWhen-spec.ts +++ b/spec-dtslint/operators/retryWhen-spec.ts @@ -1,5 +1,6 @@ import { of } from 'rxjs'; import { retryWhen } from 'rxjs/operators'; +import { asInteropObservable } from '../../spec/helpers/interop-helper'; it('should infer correctly', () => { const o = of(1, 2, 3).pipe(retryWhen(errors => errors)); // $ExpectType Observable @@ -13,10 +14,71 @@ it('should enforce types', () => { const o = of(1, 2, 3).pipe(retryWhen()); // $ExpectError }); +it('should accept interop observable notifier', () => { + of(1, 2, 3).pipe(retryWhen(() => asInteropObservable(of(true)))); // $ExpectType Observable +}); + +it('should accept promise notifier', () => { + of(1, 2, 3).pipe(retryWhen(() => Promise.resolve(true))); // $ExpectType Observable +}); + +it('should async iterable notifier', () => { + const asyncRange = { + from: 1, + to: 2, + [Symbol.asyncIterator]() { + return { + current: this.from, + last: this.to, + async next() { + await Promise.resolve(); + const done = (this.current > this.last); + return { + done, + value: done ? this.current++ : undefined + }; + } + }; + } + }; + of(1, 2, 3).pipe(retryWhen(() => asyncRange)); // $ExpectType Observable +}); + +it('should accept iterable notifier', () => { + const syncRange = { + from: 1, + to: 2, + [Symbol.iterator]() { + return { + current: this.from, + last: this.to, + next() { + const done = (this.current > this.last); + return { + done, + value: done ? this.current++ : undefined + }; + } + }; + } + }; + of(1, 2, 3).pipe(retryWhen(() => syncRange)); // $ExpectType Observable +}); + +it('should accept readable stream notifier', () => { + const readableStream = new ReadableStream({ + pull(controller) { + controller.enqueue('x'); + controller.close(); + }, + }); + of(1, 2, 3).pipe(retryWhen(() => readableStream)); // $ExpectType Observable +}); + it('should enforce types of the notifier', () => { - const o = of(1, 2, 3).pipe(retryWhen(() => 8)); // $ExpectError + of(1, 2, 3).pipe(retryWhen(() => 8)); // $ExpectError }); it('should be deprecated', () => { - const o = of(1, 2, 3).pipe(retryWhen(() => of(true))); // $ExpectDeprecation + of(1, 2, 3).pipe(retryWhen(() => of(true))); // $ExpectDeprecation }); \ No newline at end of file diff --git a/src/internal/operators/retryWhen.ts b/src/internal/operators/retryWhen.ts index 31d6a6ceb7..8090e5f619 100644 --- a/src/internal/operators/retryWhen.ts +++ b/src/internal/operators/retryWhen.ts @@ -1,14 +1,15 @@ import { Observable } from '../Observable'; +import { innerFrom } from '../observable/innerFrom'; import { Subject } from '../Subject'; import { Subscription } from '../Subscription'; -import { MonoTypeOperatorFunction } from '../types'; +import { MonoTypeOperatorFunction, ObservableInput } from '../types'; import { operate } from '../util/lift'; import { createOperatorSubscriber } from './OperatorSubscriber'; /** * Returns an Observable that mirrors the source Observable with the exception of an `error`. If the source Observable - * calls `error`, this method will emit the Throwable that caused the error to the Observable returned from `notifier`. + * calls `error`, this method will emit the Throwable that caused the error to the `ObservableInput` returned from `notifier`. * If that Observable calls `complete` or `error` then this method will call `complete` or `error` on the child * subscription. Otherwise this method will resubscribe to the source Observable. * @@ -55,13 +56,15 @@ import { createOperatorSubscriber } from './OperatorSubscriber'; * * @see {@link retry} * - * @param {function(errors: Observable): Observable} notifier - Receives an Observable of notifications with which a + * @param notifier Function that receives an Observable of notifications with which a * user can `complete` or `error`, aborting the retry. - * @return A function that returns an Observable that mirrors the source + * @return A function that returns an `ObservableInput` that mirrors the source * Observable with the exception of an `error`. * @deprecated Will be removed in v9 or v10, use {@link retry}'s `delay` option instead. + * Will be removed in v9 or v10. Use {@link retry}'s {@link RetryConfig#delay delay} option instead. + * Instead of `retryWhen(() => notify$)`, use: `retry({ delay: () => notify$ })`. */ -export function retryWhen(notifier: (errors: Observable) => Observable): MonoTypeOperatorFunction { +export function retryWhen(notifier: (errors: Observable) => ObservableInput): MonoTypeOperatorFunction { return operate((source, subscriber) => { let innerSub: Subscription | null; let syncResub = false; @@ -72,7 +75,7 @@ export function retryWhen(notifier: (errors: Observable) => Observable { if (!errors$) { errors$ = new Subject(); - notifier(errors$).subscribe( + innerFrom(notifier(errors$)).subscribe( createOperatorSubscriber(subscriber, () => // If we have an innerSub, this was an asynchronous call, kick off the retry. // Otherwise, if we don't have an innerSub yet, that's because the inner subscription