From f37be627b6742c9ae666e40f9035b51596971fc0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mladen=20Jakovljevi=C4=87?= Date: Mon, 6 Mar 2023 23:05:05 +0100 Subject: [PATCH] fix(buffer): subscribe to the closing notifier before the source (#7185) BREAKING CHANGE: Weird patterns where an observable is used to notify buffers of itself will result in a first emission of `[]`. If you need to buffer an array with itself, just use `map(x => [x])`. If that doesn't work, feel free to file a use case as an issue. `skip(1)` would be the other workaround. --- spec/operators/buffer-spec.ts | 6 +++--- src/internal/operators/buffer.ts | 26 +++++++++++++------------- 2 files changed, 16 insertions(+), 16 deletions(-) diff --git a/spec/operators/buffer-spec.ts b/spec/operators/buffer-spec.ts index 44a66bf006..debb7fa9d5 100644 --- a/spec/operators/buffer-spec.ts +++ b/spec/operators/buffer-spec.ts @@ -317,11 +317,11 @@ describe('Observable.prototype.buffer', () => { }); subject.next(1); - expect(results).to.deep.equal([[1]]); + expect(results).to.deep.equal([[]]); subject.next(2); - expect(results).to.deep.equal([[1], [2]]); + expect(results).to.deep.equal([[], [1]]); subject.complete(); - expect(results).to.deep.equal([[1], [2], [], 'complete']); + expect(results).to.deep.equal([[], [1], [2], 'complete']); }); it('should buffer when Promise resolves', (done) => { diff --git a/src/internal/operators/buffer.ts b/src/internal/operators/buffer.ts index 8c082ce4e7..c7c3e7b7c7 100644 --- a/src/internal/operators/buffer.ts +++ b/src/internal/operators/buffer.ts @@ -47,19 +47,7 @@ export function buffer(closingNotifier: ObservableInput): OperatorFuncti // The current buffered values. let currentBuffer: T[] = []; - // Subscribe to our source. - source.subscribe( - createOperatorSubscriber( - subscriber, - (value) => currentBuffer.push(value), - () => { - subscriber.next(currentBuffer); - subscriber.complete(); - } - ) - ); - - // Subscribe to the closing notifier. + // Subscribe to the closing notifier first. from(closingNotifier).subscribe( createOperatorSubscriber( subscriber, @@ -73,6 +61,18 @@ export function buffer(closingNotifier: ObservableInput): OperatorFuncti ) ); + // Subscribe to our source. + source.subscribe( + createOperatorSubscriber( + subscriber, + (value) => currentBuffer.push(value), + () => { + subscriber.next(currentBuffer); + subscriber.complete(); + } + ) + ); + return () => { // Ensure buffered values are released on finalization. currentBuffer = null!;