Skip to content

Commit

Permalink
fix(buffer): subscribe to the closing notifier before the source
Browse files Browse the repository at this point in the history
  • Loading branch information
jakovljevic-mladen committed Feb 13, 2023
1 parent 7d3c4ec commit 1953d9a
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 16 deletions.
6 changes: 3 additions & 3 deletions spec/operators/buffer-spec.ts
Expand Up @@ -317,11 +317,11 @@ describe('Observable.prototype.buffer', () => {
});

subject.next(1);
expect(results).to.deep.equal([[1]]);
expect(results).to.deep.equal([[]]);
subject.next(2);
expect(results).to.deep.equal([[1], [2]]);
expect(results).to.deep.equal([[], [1]]);
subject.complete();
expect(results).to.deep.equal([[1], [2], [], 'complete']);
expect(results).to.deep.equal([[], [1], [2], 'complete']);
});

it('should buffer when Promise resolves', (done) => {
Expand Down
26 changes: 13 additions & 13 deletions src/internal/operators/buffer.ts
Expand Up @@ -47,19 +47,7 @@ export function buffer<T>(closingNotifier: ObservableInput<any>): OperatorFuncti
// The current buffered values.
let currentBuffer: T[] = [];

// Subscribe to our source.
source.subscribe(
createOperatorSubscriber(
subscriber,
(value) => currentBuffer.push(value),
() => {
subscriber.next(currentBuffer);
subscriber.complete();
}
)
);

// Subscribe to the closing notifier.
// Subscribe to the closing notifier first.
innerFrom(closingNotifier).subscribe(
createOperatorSubscriber(
subscriber,
Expand All @@ -73,6 +61,18 @@ export function buffer<T>(closingNotifier: ObservableInput<any>): OperatorFuncti
)
);

// Subscribe to our source.
source.subscribe(
createOperatorSubscriber(
subscriber,
(value) => currentBuffer.push(value),
() => {
subscriber.next(currentBuffer);
subscriber.complete();
}
)
);

return () => {
// Ensure buffered values are released on finalization.
currentBuffer = null!;
Expand Down

0 comments on commit 1953d9a

Please sign in to comment.