Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(skipUntil): notifier should support ObservableInput #7091

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 5 additions & 1 deletion spec-dtslint/operators/skipUntil-spec.ts
Expand Up @@ -7,5 +7,9 @@ it('should infer correctly', () => {

it('should enforce types', () => {
const o = of('foo', 'bar', 'baz').pipe(skipUntil()); // $ExpectError
const p = of('foo', 'bar', 'baz').pipe(skipUntil('7')); // $ExpectError
const p = of('foo', 'bar', 'baz').pipe(skipUntil(7)); // $ExpectError
});

it('should support Promises', () => {
of(1, 2, 3).pipe(skipUntil(Promise.resolve('foo'))); // $ExpectType Observable<number>
});
36 changes: 35 additions & 1 deletion spec/operators/skipUntil-spec.ts
@@ -1,5 +1,5 @@
import { expect } from 'chai';
import { concat, defer, of, Subject, Observable } from 'rxjs';
import { concat, defer, of, Subject, Observable, interval } from 'rxjs';
import { skipUntil, mergeMap, take } from 'rxjs/operators';
import { asInteropObservable } from '../helpers/interop-helper';
import { TestScheduler } from 'rxjs/testing';
Expand Down Expand Up @@ -367,4 +367,38 @@ describe('skipUntil', () => {

expect(sideEffects).to.deep.equal([0, 1, 2]);
});

it('should skip until Promise resolves', (done) => {
const e1 = interval(3).pipe(take(5));
const expected = [2, 3, 4];

e1.pipe(skipUntil(new Promise<void>((resolve) => setTimeout(() => resolve(), 8)))).subscribe({
next: (x) => {
expect(x).to.deep.equal(expected.shift());
},
error: () => done(new Error('should not be called')),
complete: () => {
expect(expected.length).to.equal(0);
done();
},
});
});

it('should raise error when Promise rejects', (done) => {
const e1 = interval(1).pipe(take(5));
const error = new Error('err');

e1.pipe(skipUntil(Promise.reject(error))).subscribe({
next: () => {
done(new Error('should not be called'));
},
error: (err: any) => {
expect(err).to.be.an('error');
done();
},
complete: () => {
done(new Error('should not be called'));
},
});
});
});
26 changes: 14 additions & 12 deletions src/internal/operators/skipUntil.ts
@@ -1,5 +1,4 @@
import { Observable } from '../Observable';
import { MonoTypeOperatorFunction } from '../types';
import { MonoTypeOperatorFunction, ObservableInput } from '../types';
import { operate } from '../util/lift';
import { createOperatorSubscriber } from './OperatorSubscriber';
import { innerFrom } from '../observable/innerFrom';
Expand All @@ -8,19 +7,22 @@ import { noop } from '../util/noop';
/**
* Returns an Observable that skips items emitted by the source Observable until a second Observable emits an item.
*
* The `skipUntil` operator causes the observable stream to skip the emission of values until the passed in observable emits the first value.
* This can be particularly useful in combination with user interactions, responses of http requests or waiting for specific times to pass by.
* The `skipUntil` operator causes the observable stream to skip the emission of values until the passed in observable
* emits the first value. This can be particularly useful in combination with user interactions, responses of HTTP
* requests or waiting for specific times to pass by.
*
* ![](skipUntil.png)
*
* Internally the `skipUntil` operator subscribes to the passed in observable (in the following called *notifier*) in order to recognize the emission
* of its first value. When this happens, the operator unsubscribes from the *notifier* and starts emitting the values of the *source*
* observable. It will never let the *source* observable emit any values if the *notifier* completes or throws an error without emitting
* a value before.
* Internally, the `skipUntil` operator subscribes to the passed in `notifier` `ObservableInput` (which gets converted
* to an Observable) in order to recognize the emission of its first value. When `notifier` emits next, the operator
* unsubscribes from it and starts emitting the values of the *source* observable until it completes or errors. It
* will never let the *source* observable emit any values if the `notifier` completes or throws an error without
* emitting a value before.
*
* ## Example
*
* In the following example, all emitted values of the interval observable are skipped until the user clicks anywhere within the page
* In the following example, all emitted values of the interval observable are skipped until the user clicks anywhere
* within the page
*
* ```ts
* import { interval, fromEvent, skipUntil } from 'rxjs';
Expand All @@ -41,13 +43,13 @@ import { noop } from '../util/noop';
* @see {@link skipWhile}
* @see {@link skipLast}
*
* @param {Observable} notifier - The second Observable that has to emit an item before the source Observable's elements begin to
* @param notifier An `ObservableInput` that has to emit an item before the source Observable elements begin to
* be mirrored by the resulting Observable.
* @return A function that returns an Observable that skips items from the
* source Observable until the second Observable emits an item, then emits the
* source Observable until the `notifier` Observable emits an item, then emits the
* remaining items.
*/
export function skipUntil<T>(notifier: Observable<any>): MonoTypeOperatorFunction<T> {
export function skipUntil<T>(notifier: ObservableInput<any>): MonoTypeOperatorFunction<T> {
return operate((source, subscriber) => {
let taking = false;

Expand Down