From b18c2eb2bc8dc1a717c927f998028316eec83937 Mon Sep 17 00:00:00 2001 From: Jeremy Date: Thu, 15 Dec 2022 18:02:23 -0500 Subject: [PATCH] feat(sample): `notifier` now supports any `ObservableInput` (#7104) * feat(sample): add ObservableInput support in notifier * test(sample): added tests for sample's ObservableInput notifier * chore(sample): cleanup unused declaration * Delete index.d.ts * Delete index.d.ts Co-authored-by: Ben Lesh --- spec-dtslint/operators/sample-spec.ts | 72 +++++++++++++++++++++++++-- src/internal/operators/sample.ts | 14 +++--- 2 files changed, 76 insertions(+), 10 deletions(-) diff --git a/spec-dtslint/operators/sample-spec.ts b/spec-dtslint/operators/sample-spec.ts index 48b011f0b9..647e00a88e 100644 --- a/spec-dtslint/operators/sample-spec.ts +++ b/spec-dtslint/operators/sample-spec.ts @@ -1,11 +1,77 @@ import { of } from 'rxjs'; import { sample } from 'rxjs/operators'; +import { asInteropObservable } from '../../spec/helpers/interop-helper'; it('should enforce parameter', () => { - const a = of(1, 2, 3).pipe(sample()); // $ExpectError + of(1, 2, 3).pipe(sample()); // $ExpectError }); it('should accept observable as notifier parameter', () => { - const a = of(1, 2, 3).pipe(sample(of(4))); // $ExpectType Observable - const b = of(1, 2, 3).pipe(sample(of('a'))); // $ExpectType Observable + of(1, 2, 3).pipe(sample(of(4))); // $ExpectType Observable + of(1, 2, 3).pipe(sample(of('a'))); // $ExpectType Observable +}); + +it('should accept interop observable notifier', () => { + of(1, 2, 3).pipe(sample(asInteropObservable(of(true)))); // $ExpectType Observable +}); + +it('should accept promise notifier', () => { + of(1, 2, 3).pipe(sample(Promise.resolve(true))); // $ExpectType Observable +}); + +it('should async iterable notifier', () => { + const asyncRange = { + from: 1, + to: 2, + [Symbol.asyncIterator]() { + return { + current: this.from, + last: this.to, + async next() { + await Promise.resolve(); + const done = (this.current > this.last); + return { + done, + value: done ? this.current++ : undefined + }; + } + }; + } + }; + of(1, 2, 3).pipe(sample(asyncRange)); // $ExpectType Observable +}); + +it('should accept iterable notifier', () => { + const syncRange = { + from: 1, + to: 2, + [Symbol.iterator]() { + return { + current: this.from, + last: this.to, + next() { + const done = (this.current > this.last); + return { + done, + value: done ? this.current++ : undefined + }; + } + }; + } + }; + of(1, 2, 3).pipe(sample(syncRange)); // $ExpectType Observable +}); + +it('should accept readable stream notifier', () => { + const readableStream = new ReadableStream({ + pull(controller) { + controller.enqueue('x'); + controller.close(); + }, + }); + of(1, 2, 3).pipe(sample(readableStream)); // $ExpectType Observable +}); + +it('should enforce types of the notifier', () => { + of(1, 2, 3).pipe(sample(8)); // $ExpectError }); diff --git a/src/internal/operators/sample.ts b/src/internal/operators/sample.ts index 96854056ca..9008af2f07 100644 --- a/src/internal/operators/sample.ts +++ b/src/internal/operators/sample.ts @@ -1,5 +1,5 @@ -import { Observable } from '../Observable'; -import { MonoTypeOperatorFunction } from '../types'; +import { innerFrom } from '../observable/innerFrom'; +import { MonoTypeOperatorFunction, ObservableInput } from '../types'; import { operate } from '../util/lift'; import { noop } from '../util/noop'; import { createOperatorSubscriber } from './OperatorSubscriber'; @@ -9,11 +9,11 @@ import { createOperatorSubscriber } from './OperatorSubscriber'; * another Observable, the `notifier`, emits. * * It's like {@link sampleTime}, but samples whenever - * the `notifier` Observable emits something. + * the `notifier` `ObservableInput` emits something. * * ![](sample.png) * - * Whenever the `notifier` Observable emits a value, `sample` + * Whenever the `notifier` `ObservableInput` emits a value, `sample` * looks at the source Observable and emits whichever value it has most recently * emitted since the previous sampling, unless the source has not emitted * anything since the previous sampling. The `notifier` is subscribed to as soon @@ -38,13 +38,13 @@ import { createOperatorSubscriber } from './OperatorSubscriber'; * @see {@link sampleTime} * @see {@link throttle} * - * @param notifier The Observable to use for sampling the + * @param notifier The `ObservableInput` to use for sampling the * source Observable. * @return A function that returns an Observable that emits the results of * sampling the values emitted by the source Observable whenever the notifier * Observable emits value or completes. */ -export function sample(notifier: Observable): MonoTypeOperatorFunction { +export function sample(notifier: ObservableInput): MonoTypeOperatorFunction { return operate((source, subscriber) => { let hasValue = false; let lastValue: T | null = null; @@ -54,7 +54,7 @@ export function sample(notifier: Observable): MonoTypeOperatorFunction {