Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pubsub: Ensure batcher flushes on shutdown, even if min batch size isn't met #3386

Open
wants to merge 2 commits into
base: master
Choose a base branch
from

Conversation

tonyhb
Copy link

@tonyhb tonyhb commented Feb 27, 2024

This PR ensures that the batcher flushes on shutdown, even if the pending length is less than the min batch size specified. Sending events is preferred to dropping, even if limits are not obeyed.

Related to #3383, but doesn't necessarily replace.

…n't met

This PR ensures that the batcher flushes on shutdown, even if the
pending length is less than the min batch size specified.  Sending
events is preferred to dropping, even if limits are not obeyed.
@tonyhb tonyhb mentioned this pull request Feb 27, 2024
4 tasks
@vangent
Copy link
Contributor

vangent commented Mar 1, 2024

The tests are failing with a data race:

WARNING: DATA RACE
Write at 0x00c00044e3f0 by goroutine 978:
gocloud.dev/pubsub/batcher.(*Batcher).callHandler()
/home/runner/work/go-cloud/go-cloud/pubsub/batcher/batcher.go:287 +0x1ca
gocloud.dev/pubsub/batcher.(*Batcher).handleBatch.func1()
/home/runner/work/go-cloud/go-cloud/pubsub/batcher/batcher.go:217 +0x54

Previous read at 0x00c00044e3f0 by goroutine 979:
gocloud.dev/pubsub/batcher.(*Batcher).Shutdown()
/home/runner/work/go-cloud/go-cloud/pubsub/batcher/batcher.go:303 +0x75
gocloud.dev/pubsub.(*Subscription).Shutdown.func2()
/home/runner/work/go-cloud/go-cloud/pubsub/pubsub.go:697 +0xb2

Goroutine 978 (running) created at:
gocloud.dev/pubsub/batcher.(*Batcher).handleBatch()
/home/runner/work/go-cloud/go-cloud/pubsub/batcher/batcher.go:216 +0x11d
gocloud.dev/pubsub/batcher.(*Batcher).AddNoWait()
/home/runner/work/go-cloud/go-cloud/pubsub/batcher/batcher.go:203 +0x3b1
gocloud.dev/pubsub.(*Subscription).Receive.func3()
/home/runner/work/go-cloud/go-cloud/pubsub/pubsub.go:608 +0xcc
gocloud.dev/pubsub.(*Message).Ack()
/home/runner/work/go-cloud/go-cloud/pubsub/pubsub.go:161 +0xe1
gocloud.dev/pubsub/natspubsub.TestInteropWithDirectNATS.deferwrap4()
/home/runner/work/go-cloud/go-cloud/pubsub/natspubsub/nats_test.go:259 +0x33
runtime.deferreturn()
/opt/hostedtoolcache/go/1.22.0/x64/src/runtime/panic.go:602 +0x5d
testing.tRunner()
/opt/hostedtoolcache/go/1.22.0/x64/src/testing/testing.go:1689 +0x21e
testing.(*T).Run.gowrap1()
/opt/hostedtoolcache/go/1.22.0/x64/src/testing/testing.go:1742 +0x44

@tonyhb
Copy link
Author

tonyhb commented Mar 1, 2024

Ah, thanks. I ran without -race 🤦. Fixing!

@tonyhb
Copy link
Author

tonyhb commented Mar 1, 2024

@vangent fixed, thanks for the notification.


// On shutdown, ensure that we attempt to flush any pending items
// if there's a minimum batch size.
if atomic.LoadInt32(&b.nHandlers) < int32(b.opts.MaxHandlers) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It shouldn't be necessary to use atomic to check/modify nHandlers. There's a mutex on the struct to protect it. IIUC, the problem is that you added a read on this line outside of the lock. I think if you move the b.mu.Unlock a few lines up to below this new stanza, it will be fine.


// On shutdown, ensure that we attempt to flush any pending items
// if there's a minimum batch size.
if atomic.LoadInt32(&b.nHandlers) < int32(b.opts.MaxHandlers) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What happens if nHandlers == MaxHandlers? Won't we drop some messages then?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At this point, nextBatch in the handlers call will return the remaining items as shutdown is set to true, so everything will be processed as expected.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If nHandlers == MaxHandlers, nextBatch won't even be called...?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

2 participants