Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pubsub: Ensure batcher flushes on shutdown, even if min batch size isn't met #3386

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
47 changes: 35 additions & 12 deletions pubsub/batcher/batcher.go
Expand Up @@ -23,6 +23,7 @@ import (
"errors"
"reflect"
"sync"
"sync/atomic"
)

// Split determines how to split n (representing n items) into batches based on
Expand Down Expand Up @@ -70,7 +71,7 @@ type Batcher struct {

mu sync.Mutex
pending []waiter // items waiting to be handled
nHandlers int // number of currently running handler goroutines
nHandlers int32 // number of currently running handler goroutines
shutdown bool
}

Expand Down Expand Up @@ -197,29 +198,43 @@ func (b *Batcher) AddNoWait(item interface{}) <-chan error {

// Add the item to the pending list.
b.pending = append(b.pending, waiter{item, c})
if b.nHandlers < b.opts.MaxHandlers {
if atomic.LoadInt32(&b.nHandlers) < int32(b.opts.MaxHandlers) {
// If we can start a handler, do so with the item just added and any others that are pending.
batch := b.nextBatch()
if batch != nil {
b.wg.Add(1)
go func() {
b.callHandler(batch)
b.wg.Done()
}()
b.nHandlers++
}
b.handleBatch(batch)
}
// If we can't start a handler, then one of the currently running handlers will
// take our item.
return c
}

func (b *Batcher) handleBatch(batch []waiter) {
if batch == nil || len(batch) == 0 {
return
}

b.wg.Add(1)
go func() {
b.callHandler(batch)
b.wg.Done()
}()
atomic.AddInt32(&b.nHandlers, 1)
}

// nextBatch returns the batch to process, and updates b.pending.
// It returns nil if there's no batch ready for processing.
// b.mu must be held.
func (b *Batcher) nextBatch() []waiter {
if len(b.pending) < b.opts.MinBatchSize {
return nil
// We handle minimum batch sizes depending on specific
// situations.
// XXX: If we allow max batch lifetimes, handle that here.
if b.shutdown == false {
// If we're not shutting down, respect minimums. If we're
// shutting down, though, we ignore minimums to flush the
// entire batch.
return nil
}
}

if b.opts.MaxBatchByteSize == 0 && (b.opts.MaxBatchSize == 0 || len(b.pending) <= b.opts.MaxBatchSize) {
Expand Down Expand Up @@ -270,7 +285,7 @@ func (b *Batcher) callHandler(batch []waiter) {
// always get to run.
batch = b.nextBatch()
if batch == nil {
b.nHandlers--
atomic.AddInt32(&b.nHandlers, -1)
}
b.mu.Unlock()
}
Expand All @@ -283,5 +298,13 @@ func (b *Batcher) Shutdown() {
b.mu.Lock()
b.shutdown = true
b.mu.Unlock()

// On shutdown, ensure that we attempt to flush any pending items
// if there's a minimum batch size.
if atomic.LoadInt32(&b.nHandlers) < int32(b.opts.MaxHandlers) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It shouldn't be necessary to use atomic to check/modify nHandlers. There's a mutex on the struct to protect it. IIUC, the problem is that you added a read on this line outside of the lock. I think if you move the b.mu.Unlock a few lines up to below this new stanza, it will be fine.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What happens if nHandlers == MaxHandlers? Won't we drop some messages then?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At this point, nextBatch in the handlers call will return the remaining items as shutdown is set to true, so everything will be processed as expected.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If nHandlers == MaxHandlers, nextBatch won't even be called...?

batch := b.nextBatch()
b.handleBatch(batch)
}

b.wg.Wait()
}
28 changes: 28 additions & 0 deletions pubsub/batcher/batcher_test.go
Expand Up @@ -171,6 +171,34 @@ func TestMinBatchSize(t *testing.T) {
}
}

// TestMinBatchSizeFlushesOnShutdown ensures that Shutdown() flushes batches, even if
// the pending count is less than the minimum batch size.
func TestMinBatchSizeFlushesOnShutdown(t *testing.T) {
var got [][]int

batchSize := 3

b := batcher.New(reflect.TypeOf(int(0)), &batcher.Options{MinBatchSize: batchSize}, func(items interface{}) error {
got = append(got, items.([]int))
return nil
})
for i := 0; i < (batchSize - 1); i++ {
b.AddNoWait(i)
}

// Ensure that we've received nothing
if len(got) > 0 {
t.Errorf("got batch unexpectedly: %+v", got)
}

b.Shutdown()

want := [][]int{{0, 1}}
if !cmp.Equal(got, want) {
t.Errorf("got %+v, want %+v on shutdown", got, want)
}
}

func TestSaturation(t *testing.T) {
// Verify that under high load the maximum number of handlers are running.
ctx := context.Background()
Expand Down