diff --git a/pubsublite/internal/wire/subscriber.go b/pubsublite/internal/wire/subscriber.go index cad68b4d2ae..7dcb4f67f08 100644 --- a/pubsublite/internal/wire/subscriber.go +++ b/pubsublite/internal/wire/subscriber.go @@ -307,16 +307,11 @@ func (s *subscribeStream) unsafeOnMessageResponse(response *pb.MessageResponse) } func (s *subscribeStream) onAck(ac *ackConsumer) { - // Don't block the user's goroutine with potentially expensive ack processing. - go s.onAckAsync(ac.MsgBytes) -} - -func (s *subscribeStream) onAckAsync(msgBytes int64) { s.mu.Lock() defer s.mu.Unlock() if s.status == serviceActive { - s.unsafeAllowFlow(flowControlTokens{Bytes: msgBytes, Messages: 1}) + s.unsafeAllowFlow(flowControlTokens{Bytes: ac.MsgBytes, Messages: 1}) } } diff --git a/pubsublite/internal/wire/subscriber_test.go b/pubsublite/internal/wire/subscriber_test.go index a5b417c8b1d..495e7edcc2a 100644 --- a/pubsublite/internal/wire/subscriber_test.go +++ b/pubsublite/internal/wire/subscriber_test.go @@ -339,8 +339,8 @@ func TestSubscribeStreamFlowControlBatching(t *testing.T) { } sub.Receiver.ValidateMsg(msg1) sub.Receiver.ValidateMsg(msg2) - sub.sub.onAckAsync(msg1.SizeBytes) - sub.sub.onAckAsync(msg2.SizeBytes) + sub.sub.onAck(&ackConsumer{MsgBytes: msg1.SizeBytes}) + sub.sub.onAck(&ackConsumer{MsgBytes: msg2.SizeBytes}) sub.sub.sendBatchFlowControl() if gotErr := sub.FinalError(); !test.ErrorEqual(gotErr, serverErr) { t.Errorf("Final err: (%v), want: (%v)", gotErr, serverErr) @@ -373,8 +373,8 @@ func TestSubscribeStreamExpediteFlowControl(t *testing.T) { } sub.Receiver.ValidateMsg(msg1) sub.Receiver.ValidateMsg(msg2) - sub.sub.onAckAsync(msg1.SizeBytes) - sub.sub.onAckAsync(msg2.SizeBytes) + sub.sub.onAck(&ackConsumer{MsgBytes: msg1.SizeBytes}) + sub.sub.onAck(&ackConsumer{MsgBytes: msg2.SizeBytes}) // Note: the ack for msg2 automatically triggers sending the flow control. if gotErr := sub.FinalError(); !test.ErrorEqual(gotErr, serverErr) { t.Errorf("Final err: (%v), want: (%v)", gotErr, serverErr) @@ -419,7 +419,7 @@ func TestSubscribeStreamDisableBatchFlowControl(t *testing.T) { barrier.ReleaseAfter(func() { // While the stream is not connected, the pending flow control request // should not be released and sent to the stream. - sub.sub.onAckAsync(msg.SizeBytes) + sub.sub.onAck(&ackConsumer{MsgBytes: msg.SizeBytes}) if sub.PendingFlowControlRequest() == nil { t.Errorf("Pending flow control request should not be cleared") }