From 5206fc03843986afadb442b0165a6d8eac467f44 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mladen=20Jakovljevi=C4=87?= Date: Thu, 29 Sep 2022 12:14:29 +0200 Subject: [PATCH 1/2] feat(buffer): closingNotifier should support ObservableInput --- spec-dtslint/operators/buffer-spec.ts | 5 ++++ spec/operators/buffer-spec.ts | 39 ++++++++++++++++++++++++++- src/internal/operators/buffer.ts | 13 ++++----- 3 files changed, 50 insertions(+), 7 deletions(-) diff --git a/spec-dtslint/operators/buffer-spec.ts b/spec-dtslint/operators/buffer-spec.ts index 2d439a4834..2acb64d146 100644 --- a/spec-dtslint/operators/buffer-spec.ts +++ b/spec-dtslint/operators/buffer-spec.ts @@ -9,3 +9,8 @@ it('should enforce types', () => { const o = of(1, 2, 3).pipe(buffer()); // $ExpectError const p = of(1, 2, 3).pipe(buffer(6)); // $ExpectError }); + +it('should support Promises', () => { + const o = of(1, 2, 3).pipe(buffer(Promise.resolve('foo'))); // $ExpectType Observable + const p = of(1, 2, 3).pipe(buffer(async () => {})); // $ExpectType Observable +}); diff --git a/spec/operators/buffer-spec.ts b/spec/operators/buffer-spec.ts index 9e4c94ca21..44a66bf006 100644 --- a/spec/operators/buffer-spec.ts +++ b/spec/operators/buffer-spec.ts @@ -1,5 +1,5 @@ import { buffer, mergeMap, take, window, toArray } from 'rxjs/operators'; -import { EMPTY, NEVER, throwError, of, Subject } from 'rxjs'; +import { EMPTY, NEVER, throwError, of, Subject, interval } from 'rxjs'; import { TestScheduler } from 'rxjs/testing'; import { observableMatcher } from '../helpers/observableMatcher'; import { expect } from 'chai'; @@ -324,6 +324,43 @@ describe('Observable.prototype.buffer', () => { expect(results).to.deep.equal([[1], [2], [], 'complete']); }); + it('should buffer when Promise resolves', (done) => { + const e1 = interval(3).pipe(take(5)); + const expected = [ + [0, 1], + [2, 3, 4], + ]; + + e1.pipe(buffer(new Promise((resolve) => setTimeout(() => resolve(), 8)))).subscribe({ + next: (x) => { + expect(x).to.deep.equal(expected.shift()); + }, + error: () => done(new Error('should not be called')), + complete: () => { + expect(expected.length).to.equal(0); + done(); + }, + }); + }); + + it('should raise error when Promise rejects', (done) => { + const e1 = interval(1).pipe(take(5)); + const error = new Error('err'); + + e1.pipe(buffer(Promise.reject(error))).subscribe({ + next: () => { + done(new Error('should not be called')); + }, + error: (err: any) => { + expect(err).to.be.an('error'); + done(); + }, + complete: () => { + done(new Error('should not be called')); + }, + }); + }); + describe('equivalence with the window operator', () => { const cases = [ { diff --git a/src/internal/operators/buffer.ts b/src/internal/operators/buffer.ts index a30b881786..2ca2fdecd1 100644 --- a/src/internal/operators/buffer.ts +++ b/src/internal/operators/buffer.ts @@ -1,8 +1,8 @@ -import { Observable } from '../Observable'; -import { OperatorFunction } from '../types'; +import { OperatorFunction, ObservableInput } from '../types'; import { operate } from '../util/lift'; import { noop } from '../util/noop'; import { createOperatorSubscriber } from './OperatorSubscriber'; +import { innerFrom } from '../observable/innerFrom'; /** * Buffers the source Observable values until `closingNotifier` emits. @@ -13,7 +13,8 @@ import { createOperatorSubscriber } from './OperatorSubscriber'; * ![](buffer.png) * * Buffers the incoming Observable values until the given `closingNotifier` - * Observable emits a value, at which point it emits the buffer on the output + * `ObservableInput` (that internally gets converted to an Observable) + * emits a value, at which point it emits the buffer on the output * Observable and starts a new buffer internally, awaiting the next time * `closingNotifier` emits. * @@ -36,12 +37,12 @@ import { createOperatorSubscriber } from './OperatorSubscriber'; * @see {@link bufferWhen} * @see {@link window} * - * @param {Observable} closingNotifier An Observable that signals the + * @param closingNotifier An `ObservableInput` that signals the * buffer to be emitted on the output Observable. * @return A function that returns an Observable of buffers, which are arrays * of values. */ -export function buffer(closingNotifier: Observable): OperatorFunction { +export function buffer(closingNotifier: ObservableInput): OperatorFunction { return operate((source, subscriber) => { // The current buffered values. let currentBuffer: T[] = []; @@ -59,7 +60,7 @@ export function buffer(closingNotifier: Observable): OperatorFunction { From 06f3eafe6d1ac4a364c8cb703d2a778aef6073b9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mladen=20Jakovljevi=C4=87?= Date: Mon, 17 Oct 2022 21:54:44 +0200 Subject: [PATCH 2/2] chore(buffer): remove bad test --- spec-dtslint/operators/buffer-spec.ts | 1 - 1 file changed, 1 deletion(-) diff --git a/spec-dtslint/operators/buffer-spec.ts b/spec-dtslint/operators/buffer-spec.ts index 2acb64d146..d4b79e68de 100644 --- a/spec-dtslint/operators/buffer-spec.ts +++ b/spec-dtslint/operators/buffer-spec.ts @@ -12,5 +12,4 @@ it('should enforce types', () => { it('should support Promises', () => { const o = of(1, 2, 3).pipe(buffer(Promise.resolve('foo'))); // $ExpectType Observable - const p = of(1, 2, 3).pipe(buffer(async () => {})); // $ExpectType Observable });