Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(repeatWhen): notifier supports ObservableInput #7103

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
72 changes: 67 additions & 5 deletions spec-dtslint/operators/repeatWhen-spec.ts
@@ -1,22 +1,84 @@
import { of } from 'rxjs';
import { repeatWhen } from 'rxjs/operators';
import { asInteropObservable } from '../../spec/helpers/interop-helper';

it('should infer correctly', () => {
const o = of(1, 2, 3).pipe(repeatWhen(errors => errors)); // $ExpectType Observable<number>
of(1, 2, 3).pipe(repeatWhen(errors => errors)); // $ExpectType Observable<number>
});

it('should infer correctly when the error observable has a different type', () => {
const o = of(1, 2, 3).pipe(repeatWhen(repeatWhen(errors => of('a', 'b', 'c')))); // $ExpectType Observable<number>
of(1, 2, 3).pipe(repeatWhen(errors => asInteropObservable(of('a', 'b', 'c')))); // $ExpectType Observable<number>
});

it('should enforce types', () => {
const o = of(1, 2, 3).pipe(repeatWhen()); // $ExpectError
of(1, 2, 3).pipe(repeatWhen()); // $ExpectError
});

it('should accept interop observable notifier', () => {
of(1, 2, 3).pipe(repeatWhen(() => asInteropObservable(of(true)))); // $ExpectType Observable<number>
});

it('should accept promise notifier', () => {
of(1, 2, 3).pipe(repeatWhen(() => Promise.resolve(true))); // $ExpectType Observable<number>
});

it('should async iterable notifier', () => {
const asyncRange = {
from: 1,
to: 2,
[Symbol.asyncIterator]() {
return {
current: this.from,
last: this.to,
async next() {
await Promise.resolve();
const done = (this.current > this.last);
return {
done,
value: done ? this.current++ : undefined
};
}
};
}
};
of(1, 2, 3).pipe(repeatWhen(() => asyncRange)); // $ExpectType Observable<number>
});

it('should accept iterable notifier', () => {
const syncRange = {
from: 1,
to: 2,
[Symbol.iterator]() {
return {
current: this.from,
last: this.to,
next() {
const done = (this.current > this.last);
return {
done,
value: done ? this.current++ : undefined
};
}
};
}
};
of(1, 2, 3).pipe(repeatWhen(() => syncRange)); // $ExpectType Observable<number>
});

it('should accept readable stream notifier', () => {
const readableStream = new ReadableStream<string>({
pull(controller) {
controller.enqueue('x');
controller.close();
},
});
of(1, 2, 3).pipe(repeatWhen(() => readableStream)); // $ExpectType Observable<number>
});

it('should enforce types of the notifier', () => {
const o = of(1, 2, 3).pipe(repeatWhen(() => 8)); // $ExpectError
of(1, 2, 3).pipe(repeatWhen(() => 8)); // $ExpectError
});

it('should be deprecated', () => {
const o = of(1, 2, 3).pipe(repeatWhen(() => of(true))); // $ExpectDeprecation
of(1, 2, 3).pipe(repeatWhen(() => of(true))); // $ExpectDeprecation
});
2 changes: 1 addition & 1 deletion src/index.ts
Expand Up @@ -164,7 +164,7 @@ export { publishLast } from './internal/operators/publishLast';
export { publishReplay } from './internal/operators/publishReplay';
export { raceWith } from './internal/operators/raceWith';
export { reduce } from './internal/operators/reduce';
export { repeat } from './internal/operators/repeat';
export { repeat, RepeatConfig } from './internal/operators/repeat';
export { repeatWhen } from './internal/operators/repeatWhen';
export { retry, RetryConfig } from './internal/operators/retry';
export { retryWhen } from './internal/operators/retryWhen';
Expand Down
14 changes: 8 additions & 6 deletions src/internal/operators/repeatWhen.ts
@@ -1,8 +1,9 @@
import { Observable } from '../Observable';
import { innerFrom } from '../observable/innerFrom';
import { Subject } from '../Subject';
import { Subscription } from '../Subscription';

import { MonoTypeOperatorFunction } from '../types';
import { MonoTypeOperatorFunction, ObservableInput } from '../types';
import { operate } from '../util/lift';
import { createOperatorSubscriber } from './OperatorSubscriber';

Expand Down Expand Up @@ -33,13 +34,14 @@ import { createOperatorSubscriber } from './OperatorSubscriber';
* @see {@link retry}
* @see {@link retryWhen}
*
* @param {function(notifications: Observable): Observable} notifier - Receives an Observable of notifications with
* @param notifier Function that receives an Observable of notifications with
* which a user can `complete` or `error`, aborting the repetition.
* @return A function that returns an Observable that mirrors the source
* @return A function that returns an `ObservableInput` that mirrors the source
* Observable with the exception of a `complete`.
* @deprecated Will be removed in v9 or v10. Use {@link repeat}'s `delay` option instead.
* @deprecated Will be removed in v9 or v10. Use {@link repeat}'s {@link RepeatConfig#delay delay} option instead.
* Instead of `repeatWhen(() => notify$)`, use: `repeat({ delay: () => notify$ })`.
*/
export function repeatWhen<T>(notifier: (notifications: Observable<void>) => Observable<any>): MonoTypeOperatorFunction<T> {
export function repeatWhen<T>(notifier: (notifications: Observable<void>) => ObservableInput<any>): MonoTypeOperatorFunction<T> {
return operate((source, subscriber) => {
let innerSub: Subscription | null;
let syncResub = false;
Expand All @@ -61,7 +63,7 @@ export function repeatWhen<T>(notifier: (notifications: Observable<void>) => Obs

// If the call to `notifier` throws, it will be caught by the OperatorSubscriber
// In the main subscription -- in `subscribeForRepeatWhen`.
notifier(completions$).subscribe(
innerFrom(notifier(completions$)).subscribe(
createOperatorSubscriber(
subscriber,
() => {
Expand Down
2 changes: 1 addition & 1 deletion src/operators/index.ts
Expand Up @@ -68,7 +68,7 @@ export { publishReplay } from '../internal/operators/publishReplay';
export { race } from '../internal/operators/race';
export { raceWith } from '../internal/operators/raceWith';
export { reduce } from '../internal/operators/reduce';
export { repeat } from '../internal/operators/repeat';
export { repeat, RepeatConfig } from '../internal/operators/repeat';
export { repeatWhen } from '../internal/operators/repeatWhen';
export { retry, RetryConfig } from '../internal/operators/retry';
export { retryWhen } from '../internal/operators/retryWhen';
Expand Down