Skip to content

Commit

Permalink
feat(delayWhen): delayWhen's delayDurationSelector should support…
Browse files Browse the repository at this point in the history
… `ObservableInput`
  • Loading branch information
jakovljevic-mladen committed Aug 29, 2022
1 parent 4afbc16 commit 0560d15
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 17 deletions.
4 changes: 2 additions & 2 deletions api_guard/dist/types/index.d.ts
Expand Up @@ -155,8 +155,8 @@ export declare function defer<R extends ObservableInput<any>>(observableFactory:

export declare function delay<T>(due: number | Date, scheduler?: SchedulerLike): MonoTypeOperatorFunction<T>;

export declare function delayWhen<T>(delayDurationSelector: (value: T, index: number) => Observable<any>, subscriptionDelay: Observable<any>): MonoTypeOperatorFunction<T>;
export declare function delayWhen<T>(delayDurationSelector: (value: T, index: number) => Observable<any>): MonoTypeOperatorFunction<T>;
export declare function delayWhen<T>(delayDurationSelector: (value: T, index: number) => ObservableInput<any>, subscriptionDelay: Observable<any>): MonoTypeOperatorFunction<T>;
export declare function delayWhen<T>(delayDurationSelector: (value: T, index: number) => ObservableInput<any>): MonoTypeOperatorFunction<T>;

export declare function dematerialize<N extends ObservableNotification<any>>(): OperatorFunction<N, ValueFromNotification<N>>;

Expand Down
4 changes: 2 additions & 2 deletions api_guard/dist/types/operators/index.d.ts
Expand Up @@ -67,8 +67,8 @@ export declare function defaultIfEmpty<T, R>(defaultValue: R): OperatorFunction<

export declare function delay<T>(due: number | Date, scheduler?: SchedulerLike): MonoTypeOperatorFunction<T>;

export declare function delayWhen<T>(delayDurationSelector: (value: T, index: number) => Observable<any>, subscriptionDelay: Observable<any>): MonoTypeOperatorFunction<T>;
export declare function delayWhen<T>(delayDurationSelector: (value: T, index: number) => Observable<any>): MonoTypeOperatorFunction<T>;
export declare function delayWhen<T>(delayDurationSelector: (value: T, index: number) => ObservableInput<any>, subscriptionDelay: Observable<any>): MonoTypeOperatorFunction<T>;
export declare function delayWhen<T>(delayDurationSelector: (value: T, index: number) => ObservableInput<any>): MonoTypeOperatorFunction<T>;

export declare function dematerialize<N extends ObservableNotification<any>>(): OperatorFunction<N, ValueFromNotification<N>>;

Expand Down
29 changes: 16 additions & 13 deletions src/internal/operators/delayWhen.ts
@@ -1,17 +1,18 @@
import { Observable } from '../Observable';
import { MonoTypeOperatorFunction } from '../types';
import { MonoTypeOperatorFunction, ObservableInput } from '../types';
import { concat } from '../observable/concat';
import { take } from './take';
import { ignoreElements } from './ignoreElements';
import { mapTo } from './mapTo';
import { mergeMap } from './mergeMap';
import { innerFrom } from '../observable/innerFrom';

/** @deprecated The `subscriptionDelay` parameter will be removed in v8. */
export function delayWhen<T>(
delayDurationSelector: (value: T, index: number) => Observable<any>,
delayDurationSelector: (value: T, index: number) => ObservableInput<any>,
subscriptionDelay: Observable<any>
): MonoTypeOperatorFunction<T>;
export function delayWhen<T>(delayDurationSelector: (value: T, index: number) => Observable<any>): MonoTypeOperatorFunction<T>;
export function delayWhen<T>(delayDurationSelector: (value: T, index: number) => ObservableInput<any>): MonoTypeOperatorFunction<T>;

/**
* Delays the emission of items from the source Observable by a given time span
Expand All @@ -26,8 +27,9 @@ export function delayWhen<T>(delayDurationSelector: (value: T, index: number) =>
* a time span determined by another Observable. When the source emits a value,
* the `delayDurationSelector` function is called with the value emitted from
* the source Observable as the first argument to the `delayDurationSelector`.
* The `delayDurationSelector` function should return an Observable, called
* the "duration" Observable.
* The `delayDurationSelector` function should return an {@link ObservableInput},
* that is internally converted to an Observable that is called the "duration"
* Observable.
*
* The source value is emitted on the output Observable only when the "duration"
* Observable emits ({@link guide/glossary-and-semantics#next next}s) any value.
Expand Down Expand Up @@ -76,18 +78,19 @@ export function delayWhen<T>(delayDurationSelector: (value: T, index: number) =>
* @see {@link audit}
* @see {@link auditTime}
*
* @param {function(value: T, index: number): Observable} delayDurationSelector A function that
* returns an Observable for each value emitted by the source Observable, which
* is then used to delay the emission of that item on the output Observable
* until the Observable returned from this function emits a value.
* @param {Observable} subscriptionDelay An Observable that triggers the
* subscription to the source Observable once it emits any value.
* @param delayDurationSelector A function that returns an `ObservableInput` for
* each `value` emitted by the source Observable, which is then used to delay the
* emission of that `value` on the output Observable until the `ObservableInput`
* returned from this function emits a next value. When called, beside `value`,
* this function receives a zero-based `index` of the emission order.
* @param subscriptionDelay An Observable that triggers the subscription to the
* source Observable once it emits any value.
* @return A function that returns an Observable that delays the emissions of
* the source Observable by an amount of time specified by the Observable
* returned by `delayDurationSelector`.
*/
export function delayWhen<T>(
delayDurationSelector: (value: T, index: number) => Observable<any>,
delayDurationSelector: (value: T, index: number) => ObservableInput<any>,
subscriptionDelay?: Observable<any>
): MonoTypeOperatorFunction<T> {
if (subscriptionDelay) {
Expand All @@ -96,5 +99,5 @@ export function delayWhen<T>(
concat(subscriptionDelay.pipe(take(1), ignoreElements()), source.pipe(delayWhen(delayDurationSelector)));
}

return mergeMap((value, index) => delayDurationSelector(value, index).pipe(take(1), mapTo(value)));
return mergeMap((value, index) => innerFrom(delayDurationSelector(value, index)).pipe(take(1), mapTo(value)));
}

0 comments on commit 0560d15

Please sign in to comment.