Skip to content

Commit

Permalink
fix(pubsub): disable deprecated BufferedByteLimit when using MaxOutst…
Browse files Browse the repository at this point in the history
…andingBytes (#6009)

* fix(pubsub): set buffered byte limit to maxint when using maxoutstandingmessages

* fix(pubsub): send final response for flow controller test
  • Loading branch information
hongalex committed May 6, 2022
1 parent 54bcb16 commit dbfdf76
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 6 deletions.
10 changes: 8 additions & 2 deletions pubsub/topic.go
Expand Up @@ -19,6 +19,7 @@ import (
"errors"
"fmt"
"log"
"math"
"runtime"
"strings"
"sync"
Expand Down Expand Up @@ -552,6 +553,7 @@ func (t *Topic) Publish(ctx context.Context, msg *Message) *PublishResult {
}
err := t.scheduler.Add(msg.OrderingKey, &bundledMessage{msg, r, msgSize}, msgSize)
if err != nil {
fmt.Printf("got err: %v\n", err)
t.scheduler.Pause(msg.OrderingKey)
ipubsub.SetPublishResult(r, "", err)
}
Expand Down Expand Up @@ -632,8 +634,12 @@ func (t *Topic) initBundler() {
if t.PublishSettings.FlowControlSettings.MaxOutstandingBytes > 0 {
b := t.PublishSettings.FlowControlSettings.MaxOutstandingBytes
fcs.MaxOutstandingBytes = b
// If MaxOutstandingBytes is set, override BufferedByteLimit.
t.PublishSettings.BufferedByteLimit = b

// If MaxOutstandingBytes is set, disable BufferedByteLimit by setting it to maxint.
// This is because there's no way to set "unlimited" for BufferedByteLimit,
// and simply setting it to MaxOutstandingBytes occasionally leads to issues where
// BufferedByteLimit is reached even though there are resources available.
t.PublishSettings.BufferedByteLimit = math.MaxInt64
}
if t.PublishSettings.FlowControlSettings.MaxOutstandingMessages > 0 {
fcs.MaxOutstandingMessages = t.PublishSettings.FlowControlSettings.MaxOutstandingMessages
Expand Down
9 changes: 5 additions & 4 deletions pubsub/topic_test.go
Expand Up @@ -554,10 +554,9 @@ func TestPublishFlowControl_Block(t *testing.T) {
publishSingleMessage(ctx, topic, "AA")
publishSingleMessage(ctx, topic, "AA")

// Sendinga third message blocks because the messages are outstanding
var publish3Completed, response3Sent sync.WaitGroup
// Sending a third message blocks because the messages are outstanding.
var publish3Completed sync.WaitGroup
publish3Completed.Add(1)
response3Sent.Add(1)
go func() {
publishSingleMessage(ctx, topic, "AAAAAA")
publish3Completed.Done()
Expand All @@ -569,6 +568,8 @@ func TestPublishFlowControl_Block(t *testing.T) {
sendResponse2.Done()
}()

// Sending a fourth message blocks because although only one message has been sent,
// the third message claimed the tokens for outstanding bytes.
var publish4Completed sync.WaitGroup
publish4Completed.Add(1)

Expand All @@ -580,7 +581,7 @@ func TestPublishFlowControl_Block(t *testing.T) {

publish3Completed.Wait()
addSingleResponse(srv, "3")
response3Sent.Done()
addSingleResponse(srv, "4")

publish4Completed.Wait()
}
Expand Down

0 comments on commit dbfdf76

Please sign in to comment.