From dbfdf762c77f9cfad637c573b06f0a49e01316f3 Mon Sep 17 00:00:00 2001 From: Alex Hong <9397363+hongalex@users.noreply.github.com> Date: Fri, 6 May 2022 19:19:16 +0000 Subject: [PATCH] fix(pubsub): disable deprecated BufferedByteLimit when using MaxOutstandingBytes (#6009) * fix(pubsub): set buffered byte limit to maxint when using maxoutstandingmessages * fix(pubsub): send final response for flow controller test --- pubsub/topic.go | 10 ++++++++-- pubsub/topic_test.go | 9 +++++---- 2 files changed, 13 insertions(+), 6 deletions(-) diff --git a/pubsub/topic.go b/pubsub/topic.go index cc08e38a3a0..3c0f903473c 100644 --- a/pubsub/topic.go +++ b/pubsub/topic.go @@ -19,6 +19,7 @@ import ( "errors" "fmt" "log" + "math" "runtime" "strings" "sync" @@ -552,6 +553,7 @@ func (t *Topic) Publish(ctx context.Context, msg *Message) *PublishResult { } err := t.scheduler.Add(msg.OrderingKey, &bundledMessage{msg, r, msgSize}, msgSize) if err != nil { + fmt.Printf("got err: %v\n", err) t.scheduler.Pause(msg.OrderingKey) ipubsub.SetPublishResult(r, "", err) } @@ -632,8 +634,12 @@ func (t *Topic) initBundler() { if t.PublishSettings.FlowControlSettings.MaxOutstandingBytes > 0 { b := t.PublishSettings.FlowControlSettings.MaxOutstandingBytes fcs.MaxOutstandingBytes = b - // If MaxOutstandingBytes is set, override BufferedByteLimit. - t.PublishSettings.BufferedByteLimit = b + + // If MaxOutstandingBytes is set, disable BufferedByteLimit by setting it to maxint. + // This is because there's no way to set "unlimited" for BufferedByteLimit, + // and simply setting it to MaxOutstandingBytes occasionally leads to issues where + // BufferedByteLimit is reached even though there are resources available. + t.PublishSettings.BufferedByteLimit = math.MaxInt64 } if t.PublishSettings.FlowControlSettings.MaxOutstandingMessages > 0 { fcs.MaxOutstandingMessages = t.PublishSettings.FlowControlSettings.MaxOutstandingMessages diff --git a/pubsub/topic_test.go b/pubsub/topic_test.go index 9115b43ab8a..2c0c4553ac6 100644 --- a/pubsub/topic_test.go +++ b/pubsub/topic_test.go @@ -554,10 +554,9 @@ func TestPublishFlowControl_Block(t *testing.T) { publishSingleMessage(ctx, topic, "AA") publishSingleMessage(ctx, topic, "AA") - // Sendinga third message blocks because the messages are outstanding - var publish3Completed, response3Sent sync.WaitGroup + // Sending a third message blocks because the messages are outstanding. + var publish3Completed sync.WaitGroup publish3Completed.Add(1) - response3Sent.Add(1) go func() { publishSingleMessage(ctx, topic, "AAAAAA") publish3Completed.Done() @@ -569,6 +568,8 @@ func TestPublishFlowControl_Block(t *testing.T) { sendResponse2.Done() }() + // Sending a fourth message blocks because although only one message has been sent, + // the third message claimed the tokens for outstanding bytes. var publish4Completed sync.WaitGroup publish4Completed.Add(1) @@ -580,7 +581,7 @@ func TestPublishFlowControl_Block(t *testing.T) { publish3Completed.Wait() addSingleResponse(srv, "3") - response3Sent.Done() + addSingleResponse(srv, "4") publish4Completed.Wait() }