diff --git a/spec-dtslint/operators/buffer-spec.ts b/spec-dtslint/operators/buffer-spec.ts index 2d439a4834..d4b79e68de 100644 --- a/spec-dtslint/operators/buffer-spec.ts +++ b/spec-dtslint/operators/buffer-spec.ts @@ -9,3 +9,7 @@ it('should enforce types', () => { const o = of(1, 2, 3).pipe(buffer()); // $ExpectError const p = of(1, 2, 3).pipe(buffer(6)); // $ExpectError }); + +it('should support Promises', () => { + const o = of(1, 2, 3).pipe(buffer(Promise.resolve('foo'))); // $ExpectType Observable +}); diff --git a/spec/operators/buffer-spec.ts b/spec/operators/buffer-spec.ts index 9e4c94ca21..44a66bf006 100644 --- a/spec/operators/buffer-spec.ts +++ b/spec/operators/buffer-spec.ts @@ -1,5 +1,5 @@ import { buffer, mergeMap, take, window, toArray } from 'rxjs/operators'; -import { EMPTY, NEVER, throwError, of, Subject } from 'rxjs'; +import { EMPTY, NEVER, throwError, of, Subject, interval } from 'rxjs'; import { TestScheduler } from 'rxjs/testing'; import { observableMatcher } from '../helpers/observableMatcher'; import { expect } from 'chai'; @@ -324,6 +324,43 @@ describe('Observable.prototype.buffer', () => { expect(results).to.deep.equal([[1], [2], [], 'complete']); }); + it('should buffer when Promise resolves', (done) => { + const e1 = interval(3).pipe(take(5)); + const expected = [ + [0, 1], + [2, 3, 4], + ]; + + e1.pipe(buffer(new Promise((resolve) => setTimeout(() => resolve(), 8)))).subscribe({ + next: (x) => { + expect(x).to.deep.equal(expected.shift()); + }, + error: () => done(new Error('should not be called')), + complete: () => { + expect(expected.length).to.equal(0); + done(); + }, + }); + }); + + it('should raise error when Promise rejects', (done) => { + const e1 = interval(1).pipe(take(5)); + const error = new Error('err'); + + e1.pipe(buffer(Promise.reject(error))).subscribe({ + next: () => { + done(new Error('should not be called')); + }, + error: (err: any) => { + expect(err).to.be.an('error'); + done(); + }, + complete: () => { + done(new Error('should not be called')); + }, + }); + }); + describe('equivalence with the window operator', () => { const cases = [ { diff --git a/src/internal/operators/buffer.ts b/src/internal/operators/buffer.ts index a30b881786..2ca2fdecd1 100644 --- a/src/internal/operators/buffer.ts +++ b/src/internal/operators/buffer.ts @@ -1,8 +1,8 @@ -import { Observable } from '../Observable'; -import { OperatorFunction } from '../types'; +import { OperatorFunction, ObservableInput } from '../types'; import { operate } from '../util/lift'; import { noop } from '../util/noop'; import { createOperatorSubscriber } from './OperatorSubscriber'; +import { innerFrom } from '../observable/innerFrom'; /** * Buffers the source Observable values until `closingNotifier` emits. @@ -13,7 +13,8 @@ import { createOperatorSubscriber } from './OperatorSubscriber'; * ![](buffer.png) * * Buffers the incoming Observable values until the given `closingNotifier` - * Observable emits a value, at which point it emits the buffer on the output + * `ObservableInput` (that internally gets converted to an Observable) + * emits a value, at which point it emits the buffer on the output * Observable and starts a new buffer internally, awaiting the next time * `closingNotifier` emits. * @@ -36,12 +37,12 @@ import { createOperatorSubscriber } from './OperatorSubscriber'; * @see {@link bufferWhen} * @see {@link window} * - * @param {Observable} closingNotifier An Observable that signals the + * @param closingNotifier An `ObservableInput` that signals the * buffer to be emitted on the output Observable. * @return A function that returns an Observable of buffers, which are arrays * of values. */ -export function buffer(closingNotifier: Observable): OperatorFunction { +export function buffer(closingNotifier: ObservableInput): OperatorFunction { return operate((source, subscriber) => { // The current buffered values. let currentBuffer: T[] = []; @@ -59,7 +60,7 @@ export function buffer(closingNotifier: Observable): OperatorFunction {