Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(sample): notifier supports ObservableInput #7104

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
72 changes: 69 additions & 3 deletions spec-dtslint/operators/sample-spec.ts
@@ -1,11 +1,77 @@
import { of } from 'rxjs';
import { sample } from 'rxjs/operators';
import { asInteropObservable } from '../../spec/helpers/interop-helper';

it('should enforce parameter', () => {
const a = of(1, 2, 3).pipe(sample()); // $ExpectError
of(1, 2, 3).pipe(sample()); // $ExpectError
});

it('should accept observable as notifier parameter', () => {
const a = of(1, 2, 3).pipe(sample(of(4))); // $ExpectType Observable<number>
const b = of(1, 2, 3).pipe(sample(of('a'))); // $ExpectType Observable<number>
of(1, 2, 3).pipe(sample(of(4))); // $ExpectType Observable<number>
of(1, 2, 3).pipe(sample(of('a'))); // $ExpectType Observable<number>
});

it('should accept interop observable notifier', () => {
of(1, 2, 3).pipe(sample(asInteropObservable(of(true)))); // $ExpectType Observable<number>
});

it('should accept promise notifier', () => {
of(1, 2, 3).pipe(sample(Promise.resolve(true))); // $ExpectType Observable<number>
});

it('should async iterable notifier', () => {
const asyncRange = {
from: 1,
to: 2,
[Symbol.asyncIterator]() {
return {
current: this.from,
last: this.to,
async next() {
await Promise.resolve();
const done = (this.current > this.last);
return {
done,
value: done ? this.current++ : undefined
};
}
};
}
};
of(1, 2, 3).pipe(sample(asyncRange)); // $ExpectType Observable<number>
});

it('should accept iterable notifier', () => {
const syncRange = {
from: 1,
to: 2,
[Symbol.iterator]() {
return {
current: this.from,
last: this.to,
next() {
const done = (this.current > this.last);
return {
done,
value: done ? this.current++ : undefined
};
}
};
}
};
of(1, 2, 3).pipe(sample(syncRange)); // $ExpectType Observable<number>
});

it('should accept readable stream notifier', () => {
const readableStream = new ReadableStream<string>({
pull(controller) {
controller.enqueue('x');
controller.close();
},
});
of(1, 2, 3).pipe(sample(readableStream)); // $ExpectType Observable<number>
});

it('should enforce types of the notifier', () => {
of(1, 2, 3).pipe(sample(8)); // $ExpectError
});
14 changes: 7 additions & 7 deletions src/internal/operators/sample.ts
@@ -1,5 +1,5 @@
import { Observable } from '../Observable';
import { MonoTypeOperatorFunction } from '../types';
import { innerFrom } from '../observable/innerFrom';
import { MonoTypeOperatorFunction, ObservableInput } from '../types';
import { operate } from '../util/lift';
import { noop } from '../util/noop';
import { createOperatorSubscriber } from './OperatorSubscriber';
Expand All @@ -9,11 +9,11 @@ import { createOperatorSubscriber } from './OperatorSubscriber';
* another Observable, the `notifier`, emits.
*
* <span class="informal">It's like {@link sampleTime}, but samples whenever
* the `notifier` Observable emits something.</span>
* the `notifier` `ObservableInput` emits something.</span>
*
* ![](sample.png)
*
* Whenever the `notifier` Observable emits a value, `sample`
* Whenever the `notifier` `ObservableInput` emits a value, `sample`
* looks at the source Observable and emits whichever value it has most recently
* emitted since the previous sampling, unless the source has not emitted
* anything since the previous sampling. The `notifier` is subscribed to as soon
Expand All @@ -38,13 +38,13 @@ import { createOperatorSubscriber } from './OperatorSubscriber';
* @see {@link sampleTime}
* @see {@link throttle}
*
* @param notifier The Observable to use for sampling the
* @param notifier The `ObservableInput` to use for sampling the
* source Observable.
* @return A function that returns an Observable that emits the results of
* sampling the values emitted by the source Observable whenever the notifier
* Observable emits value or completes.
*/
export function sample<T>(notifier: Observable<any>): MonoTypeOperatorFunction<T> {
export function sample<T>(notifier: ObservableInput<any>): MonoTypeOperatorFunction<T> {
return operate((source, subscriber) => {
let hasValue = false;
let lastValue: T | null = null;
Expand All @@ -54,7 +54,7 @@ export function sample<T>(notifier: Observable<any>): MonoTypeOperatorFunction<T
lastValue = value;
})
);
notifier.subscribe(
innerFrom(notifier).subscribe(
createOperatorSubscriber(
subscriber,
() => {
Expand Down