Skip to content

Commit

Permalink
fix(buffer): subscribe to the closing notifier before the source (#7185)
Browse files Browse the repository at this point in the history
BREAKING CHANGE: Weird patterns where an observable is used to notify buffers of itself will result in a first emission of `[]`. If you need to buffer an array with itself, just use `map(x => [x])`. If that doesn't work, feel free to file a use case as an issue. `skip(1)` would be the other workaround.
  • Loading branch information
jakovljevic-mladen committed Mar 6, 2023
1 parent 79314f1 commit f37be62
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 16 deletions.
6 changes: 3 additions & 3 deletions spec/operators/buffer-spec.ts
Expand Up @@ -317,11 +317,11 @@ describe('Observable.prototype.buffer', () => {
});

subject.next(1);
expect(results).to.deep.equal([[1]]);
expect(results).to.deep.equal([[]]);
subject.next(2);
expect(results).to.deep.equal([[1], [2]]);
expect(results).to.deep.equal([[], [1]]);
subject.complete();
expect(results).to.deep.equal([[1], [2], [], 'complete']);
expect(results).to.deep.equal([[], [1], [2], 'complete']);
});

it('should buffer when Promise resolves', (done) => {
Expand Down
26 changes: 13 additions & 13 deletions src/internal/operators/buffer.ts
Expand Up @@ -47,19 +47,7 @@ export function buffer<T>(closingNotifier: ObservableInput<any>): OperatorFuncti
// The current buffered values.
let currentBuffer: T[] = [];

// Subscribe to our source.
source.subscribe(
createOperatorSubscriber(
subscriber,
(value) => currentBuffer.push(value),
() => {
subscriber.next(currentBuffer);
subscriber.complete();
}
)
);

// Subscribe to the closing notifier.
// Subscribe to the closing notifier first.
from(closingNotifier).subscribe(
createOperatorSubscriber(
subscriber,
Expand All @@ -73,6 +61,18 @@ export function buffer<T>(closingNotifier: ObservableInput<any>): OperatorFuncti
)
);

// Subscribe to our source.
source.subscribe(
createOperatorSubscriber(
subscriber,
(value) => currentBuffer.push(value),
() => {
subscriber.next(currentBuffer);
subscriber.complete();
}
)
);

return () => {
// Ensure buffered values are released on finalization.
currentBuffer = null!;
Expand Down

0 comments on commit f37be62

Please sign in to comment.