From 91b751198a53f4d01808b5e6244d1248a339b9ab Mon Sep 17 00:00:00 2001 From: tmdiep Date: Mon, 31 May 2021 18:51:36 -0400 Subject: [PATCH] refactor(pubsublite): remove async ack processing --- pubsublite/internal/wire/subscriber.go | 7 +------ pubsublite/internal/wire/subscriber_test.go | 10 +++++----- 2 files changed, 6 insertions(+), 11 deletions(-) diff --git a/pubsublite/internal/wire/subscriber.go b/pubsublite/internal/wire/subscriber.go index 56344cae0d2..5dcf28ea8b3 100644 --- a/pubsublite/internal/wire/subscriber.go +++ b/pubsublite/internal/wire/subscriber.go @@ -289,16 +289,11 @@ func (s *subscribeStream) unsafeOnMessageResponse(response *pb.MessageResponse) } func (s *subscribeStream) onAck(ac *ackConsumer) { - // Don't block the user's goroutine with potentially expensive ack processing. - go s.onAckAsync(ac.MsgBytes) -} - -func (s *subscribeStream) onAckAsync(msgBytes int64) { s.mu.Lock() defer s.mu.Unlock() if s.status == serviceActive { - s.unsafeAllowFlow(flowControlTokens{Bytes: msgBytes, Messages: 1}) + s.unsafeAllowFlow(flowControlTokens{Bytes: ac.MsgBytes, Messages: 1}) } } diff --git a/pubsublite/internal/wire/subscriber_test.go b/pubsublite/internal/wire/subscriber_test.go index b29aa0c3233..34e780c904f 100644 --- a/pubsublite/internal/wire/subscriber_test.go +++ b/pubsublite/internal/wire/subscriber_test.go @@ -293,8 +293,8 @@ func TestSubscribeStreamFlowControlBatching(t *testing.T) { } sub.Receiver.ValidateMsg(msg1) sub.Receiver.ValidateMsg(msg2) - sub.sub.onAckAsync(msg1.SizeBytes) - sub.sub.onAckAsync(msg2.SizeBytes) + sub.sub.onAck(&ackConsumer{MsgBytes: msg1.SizeBytes}) + sub.sub.onAck(&ackConsumer{MsgBytes: msg2.SizeBytes}) sub.sub.sendBatchFlowControl() if gotErr := sub.FinalError(); !test.ErrorEqual(gotErr, serverErr) { t.Errorf("Final err: (%v), want: (%v)", gotErr, serverErr) @@ -327,8 +327,8 @@ func TestSubscribeStreamExpediteFlowControl(t *testing.T) { } sub.Receiver.ValidateMsg(msg1) sub.Receiver.ValidateMsg(msg2) - sub.sub.onAckAsync(msg1.SizeBytes) - sub.sub.onAckAsync(msg2.SizeBytes) + sub.sub.onAck(&ackConsumer{MsgBytes: msg1.SizeBytes}) + sub.sub.onAck(&ackConsumer{MsgBytes: msg2.SizeBytes}) // Note: the ack for msg2 automatically triggers sending the flow control. if gotErr := sub.FinalError(); !test.ErrorEqual(gotErr, serverErr) { t.Errorf("Final err: (%v), want: (%v)", gotErr, serverErr) @@ -373,7 +373,7 @@ func TestSubscribeStreamDisableBatchFlowControl(t *testing.T) { barrier.ReleaseAfter(func() { // While the stream is not connected, the pending flow control request // should not be released and sent to the stream. - sub.sub.onAckAsync(msg.SizeBytes) + sub.sub.onAck(&ackConsumer{MsgBytes: msg.SizeBytes}) if sub.PendingFlowControlRequest() == nil { t.Errorf("Pending flow control request should not be cleared") }