Skip to content

Commit

Permalink
Allow for nil min time, reset batch timeout atomic
Browse files Browse the repository at this point in the history
  • Loading branch information
tonyhb committed Feb 23, 2024
1 parent 7d91fcc commit bd8cc4e
Showing 1 changed file with 3 additions and 2 deletions.
5 changes: 3 additions & 2 deletions pubsub/batcher/batcher.go
Expand Up @@ -212,7 +212,7 @@ func (b *Batcher) AddNoWait(item interface{}) <-chan error {
b.handleBatch(batch)
}

if batch == nil && len(b.pending) > 0 {
if batch == nil && len(b.pending) > 0 && b.opts.BatchTimeout > 0 {
// If the batch size timeout is zero, this is one of the first items to
// be added to the batch under the minimum batch size. Record when this
// happens so that .nextBatch() can grab the batch on timeout.
Expand All @@ -225,6 +225,7 @@ func (b *Batcher) AddNoWait(item interface{}) <-chan error {
if atomic.CompareAndSwapInt32(&b.batchTimeoutRunning, 0, 1) {
go func() {
<-time.After(b.opts.BatchTimeout)
b.batchTimeoutRunning = 0
batch = b.nextBatch()
if batch != nil {
b.handleBatch(batch)
Expand Down Expand Up @@ -256,7 +257,7 @@ func (b *Batcher) handleBatch(batch []waiter) {
// b.mu must be held.
func (b *Batcher) nextBatch() []waiter {
// if there's a min batch size, only skip if we haven't yet waited for the batch timeout
if len(b.pending) < b.opts.MinBatchSize && time.Since(b.batchSizeTimeout) < b.opts.BatchTimeout {
if len(b.pending) < b.opts.MinBatchSize && (b.opts.BatchTimeout == 0 || time.Since(b.batchSizeTimeout) < b.opts.BatchTimeout) {
return nil
}

Expand Down

0 comments on commit bd8cc4e

Please sign in to comment.