Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(window): windowBoundaries should support ObservableInput #7088

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
7 changes: 6 additions & 1 deletion spec-dtslint/operators/window-spec.ts
Expand Up @@ -6,5 +6,10 @@ it('should infer correctly', () => {
});

it('should enforce types', () => {
of(1).pipe(window('')); // $ExpectError
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since String implements Iterable interface and since Iterable is declared in ObservableInput, window now accepts string as a valid input type. I didn't want to move this test to 'should infer correctly' section so I don't confuse anyone reading these tests.

of(1).pipe(window()); // $ExpectError
of(1).pipe(window(6)); // $ExpectError
});

it('should support Promises', () => {
of(1, 2, 3).pipe(window(Promise.resolve('foo'))); // $ExpectType Observable<Observable<number>>
});
44 changes: 42 additions & 2 deletions spec/operators/window-spec.ts
@@ -1,7 +1,8 @@
import { window, mergeMap } from 'rxjs/operators';
import { window, mergeMap, take } from 'rxjs/operators';
import { TestScheduler } from 'rxjs/testing';
import { EMPTY, of, Observable } from 'rxjs';
import { EMPTY, of, Observable, interval } from 'rxjs';
import { observableMatcher } from '../helpers/observableMatcher';
import { expect } from 'chai';

/** @test {window} */
describe('window', () => {
Expand Down Expand Up @@ -280,4 +281,43 @@ describe('window', () => {
expectSubscriptions(closings.subscriptions).toBe(closingSubs);
});
});

it('should window when Promise resolves', (done) => {
const e1 = interval(3).pipe(take(5));
let pos = 0;
const result: number[][] = [[], []];
const expected = [
[0, 1],
[2, 3, 4],
];

e1.pipe(window(new Promise<void>((resolve) => setTimeout(() => resolve(), 8)))).subscribe({
next: (x) => {
x.subscribe({
next: (v) => result[pos].push(v),
complete: () => pos++,
});
},
error: () => done(new Error('should not be called')),
complete: () => {
expect(result).to.deep.equal(expected);
done();
},
});
});

it('should raise error when Promise rejects', (done) => {
const e1 = interval(1).pipe(take(5));
const error = new Error('err');

e1.pipe(window(Promise.reject(error))).subscribe({
error: (err) => {
expect(err).to.be.an('error');
done();
},
complete: () => {
done(new Error('should not be called'));
},
});
});
});
14 changes: 8 additions & 6 deletions src/internal/operators/window.ts
@@ -1,9 +1,10 @@
import { Observable } from '../Observable';
import { OperatorFunction } from '../types';
import { OperatorFunction, ObservableInput } from '../types';
import { Subject } from '../Subject';
import { operate } from '../util/lift';
import { createOperatorSubscriber } from './OperatorSubscriber';
import { noop } from '../util/noop';
import { innerFrom } from '../observable/innerFrom';

/**
* Branch out the source Observable values as a nested Observable whenever
Expand All @@ -17,8 +18,9 @@ import { noop } from '../util/noop';
* Returns an Observable that emits windows of items it collects from the source
* Observable. The output Observable emits connected, non-overlapping
* windows. It emits the current window and opens a new one whenever the
* Observable `windowBoundaries` emits an item. Because each window is an
* Observable, the output is a higher-order Observable.
* `windowBoundaries` emits an item. `windowBoundaries` can be any type that
* `ObservableInput` accepts. It internally gets converted to an Observable.
* Because each window is an Observable, the output is a higher-order Observable.
*
* ## Example
*
Expand All @@ -43,12 +45,12 @@ import { noop } from '../util/noop';
* @see {@link windowWhen}
* @see {@link buffer}
*
* @param {Observable<any>} windowBoundaries An Observable that completes the
* @param windowBoundaries An `ObservableInput` that completes the
* previous window and starts a new window.
* @return A function that returns an Observable of windows, which are
* Observables emitting values of the source Observable.
*/
export function window<T>(windowBoundaries: Observable<any>): OperatorFunction<T, Observable<T>> {
export function window<T>(windowBoundaries: ObservableInput<any>): OperatorFunction<T, Observable<T>> {
return operate((source, subscriber) => {
let windowSubject: Subject<T> = new Subject<T>();

Expand All @@ -73,7 +75,7 @@ export function window<T>(windowBoundaries: Observable<any>): OperatorFunction<T
);

// Subscribe to the window boundaries.
windowBoundaries.subscribe(
innerFrom(windowBoundaries).subscribe(
createOperatorSubscriber(
subscriber,
() => {
Expand Down