From e6da6563e3853639d9f7e994f0b9486b07e9bbbf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mladen=20Jakovljevi=C4=87?= Date: Thu, 20 Oct 2022 11:36:25 +0200 Subject: [PATCH] feat(skipUntil): notifier should support ObservableInput --- spec-dtslint/operators/skipUntil-spec.ts | 6 +++- spec/operators/skipUntil-spec.ts | 36 +++++++++++++++++++++++- src/internal/operators/skipUntil.ts | 26 +++++++++-------- 3 files changed, 54 insertions(+), 14 deletions(-) diff --git a/spec-dtslint/operators/skipUntil-spec.ts b/spec-dtslint/operators/skipUntil-spec.ts index 2c550614ab..72d8f423ac 100644 --- a/spec-dtslint/operators/skipUntil-spec.ts +++ b/spec-dtslint/operators/skipUntil-spec.ts @@ -7,5 +7,9 @@ it('should infer correctly', () => { it('should enforce types', () => { const o = of('foo', 'bar', 'baz').pipe(skipUntil()); // $ExpectError - const p = of('foo', 'bar', 'baz').pipe(skipUntil('7')); // $ExpectError + const p = of('foo', 'bar', 'baz').pipe(skipUntil(7)); // $ExpectError +}); + +it('should support Promises', () => { + of(1, 2, 3).pipe(skipUntil(Promise.resolve('foo'))); // $ExpectType Observable }); diff --git a/spec/operators/skipUntil-spec.ts b/spec/operators/skipUntil-spec.ts index 9829c33a24..65d5695ecd 100644 --- a/spec/operators/skipUntil-spec.ts +++ b/spec/operators/skipUntil-spec.ts @@ -1,5 +1,5 @@ import { expect } from 'chai'; -import { concat, defer, of, Subject, Observable } from 'rxjs'; +import { concat, defer, of, Subject, Observable, interval } from 'rxjs'; import { skipUntil, mergeMap, take } from 'rxjs/operators'; import { asInteropObservable } from '../helpers/interop-helper'; import { TestScheduler } from 'rxjs/testing'; @@ -367,4 +367,38 @@ describe('skipUntil', () => { expect(sideEffects).to.deep.equal([0, 1, 2]); }); + + it('should skip until Promise resolves', (done) => { + const e1 = interval(3).pipe(take(5)); + const expected = [2, 3, 4]; + + e1.pipe(skipUntil(new Promise((resolve) => setTimeout(() => resolve(), 8)))).subscribe({ + next: (x) => { + expect(x).to.deep.equal(expected.shift()); + }, + error: () => done(new Error('should not be called')), + complete: () => { + expect(expected.length).to.equal(0); + done(); + }, + }); + }); + + it('should raise error when Promise rejects', (done) => { + const e1 = interval(1).pipe(take(5)); + const error = new Error('err'); + + e1.pipe(skipUntil(Promise.reject(error))).subscribe({ + next: () => { + done(new Error('should not be called')); + }, + error: (err: any) => { + expect(err).to.be.an('error'); + done(); + }, + complete: () => { + done(new Error('should not be called')); + }, + }); + }); }); diff --git a/src/internal/operators/skipUntil.ts b/src/internal/operators/skipUntil.ts index 38fcdb930c..e6984e5189 100644 --- a/src/internal/operators/skipUntil.ts +++ b/src/internal/operators/skipUntil.ts @@ -1,5 +1,4 @@ -import { Observable } from '../Observable'; -import { MonoTypeOperatorFunction } from '../types'; +import { MonoTypeOperatorFunction, ObservableInput } from '../types'; import { operate } from '../util/lift'; import { createOperatorSubscriber } from './OperatorSubscriber'; import { innerFrom } from '../observable/innerFrom'; @@ -8,19 +7,22 @@ import { noop } from '../util/noop'; /** * Returns an Observable that skips items emitted by the source Observable until a second Observable emits an item. * - * The `skipUntil` operator causes the observable stream to skip the emission of values until the passed in observable emits the first value. - * This can be particularly useful in combination with user interactions, responses of http requests or waiting for specific times to pass by. + * The `skipUntil` operator causes the observable stream to skip the emission of values until the passed in observable + * emits the first value. This can be particularly useful in combination with user interactions, responses of HTTP + * requests or waiting for specific times to pass by. * * ![](skipUntil.png) * - * Internally the `skipUntil` operator subscribes to the passed in observable (in the following called *notifier*) in order to recognize the emission - * of its first value. When this happens, the operator unsubscribes from the *notifier* and starts emitting the values of the *source* - * observable. It will never let the *source* observable emit any values if the *notifier* completes or throws an error without emitting - * a value before. + * Internally, the `skipUntil` operator subscribes to the passed in `notifier` `ObservableInput` (which gets converted + * to an Observable) in order to recognize the emission of its first value. When `notifier` emits next, the operator + * unsubscribes from it and starts emitting the values of the *source* observable until it completes or errors. It + * will never let the *source* observable emit any values if the `notifier` completes or throws an error without + * emitting a value before. * * ## Example * - * In the following example, all emitted values of the interval observable are skipped until the user clicks anywhere within the page + * In the following example, all emitted values of the interval observable are skipped until the user clicks anywhere + * within the page * * ```ts * import { interval, fromEvent, skipUntil } from 'rxjs'; @@ -41,13 +43,13 @@ import { noop } from '../util/noop'; * @see {@link skipWhile} * @see {@link skipLast} * - * @param {Observable} notifier - The second Observable that has to emit an item before the source Observable's elements begin to + * @param notifier An `ObservableInput` that has to emit an item before the source Observable elements begin to * be mirrored by the resulting Observable. * @return A function that returns an Observable that skips items from the - * source Observable until the second Observable emits an item, then emits the + * source Observable until the `notifier` Observable emits an item, then emits the * remaining items. */ -export function skipUntil(notifier: Observable): MonoTypeOperatorFunction { +export function skipUntil(notifier: ObservableInput): MonoTypeOperatorFunction { return operate((source, subscriber) => { let taking = false;