From ab9e170da4b6e0ef5cf6bed0e9530e973c99ad11 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mladen=20Jakovljevi=C4=87?= Date: Mon, 13 Feb 2023 09:32:57 +0100 Subject: [PATCH] fix(buffer): subscribe to the closing notifier before the source --- spec/operators/buffer-spec.ts | 6 +++--- src/internal/operators/buffer.ts | 26 +++++++++++++------------- 2 files changed, 16 insertions(+), 16 deletions(-) diff --git a/spec/operators/buffer-spec.ts b/spec/operators/buffer-spec.ts index 44a66bf006..debb7fa9d5 100644 --- a/spec/operators/buffer-spec.ts +++ b/spec/operators/buffer-spec.ts @@ -317,11 +317,11 @@ describe('Observable.prototype.buffer', () => { }); subject.next(1); - expect(results).to.deep.equal([[1]]); + expect(results).to.deep.equal([[]]); subject.next(2); - expect(results).to.deep.equal([[1], [2]]); + expect(results).to.deep.equal([[], [1]]); subject.complete(); - expect(results).to.deep.equal([[1], [2], [], 'complete']); + expect(results).to.deep.equal([[], [1], [2], 'complete']); }); it('should buffer when Promise resolves', (done) => { diff --git a/src/internal/operators/buffer.ts b/src/internal/operators/buffer.ts index 8c082ce4e7..c7c3e7b7c7 100644 --- a/src/internal/operators/buffer.ts +++ b/src/internal/operators/buffer.ts @@ -47,19 +47,7 @@ export function buffer(closingNotifier: ObservableInput): OperatorFuncti // The current buffered values. let currentBuffer: T[] = []; - // Subscribe to our source. - source.subscribe( - createOperatorSubscriber( - subscriber, - (value) => currentBuffer.push(value), - () => { - subscriber.next(currentBuffer); - subscriber.complete(); - } - ) - ); - - // Subscribe to the closing notifier. + // Subscribe to the closing notifier first. from(closingNotifier).subscribe( createOperatorSubscriber( subscriber, @@ -73,6 +61,18 @@ export function buffer(closingNotifier: ObservableInput): OperatorFuncti ) ); + // Subscribe to our source. + source.subscribe( + createOperatorSubscriber( + subscriber, + (value) => currentBuffer.push(value), + () => { + subscriber.next(currentBuffer); + subscriber.complete(); + } + ) + ); + return () => { // Ensure buffered values are released on finalization. currentBuffer = null!;