Skip to content

Commit

Permalink
refactor(pubsublite): remove async ack processing (#4189)
Browse files Browse the repository at this point in the history
Process acks synchronously.
  • Loading branch information
tmdiep committed Jun 8, 2021
1 parent 9e80ea0 commit fb2cd04
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 11 deletions.
7 changes: 1 addition & 6 deletions pubsublite/internal/wire/subscriber.go
Expand Up @@ -307,16 +307,11 @@ func (s *subscribeStream) unsafeOnMessageResponse(response *pb.MessageResponse)
}

func (s *subscribeStream) onAck(ac *ackConsumer) {
// Don't block the user's goroutine with potentially expensive ack processing.
go s.onAckAsync(ac.MsgBytes)
}

func (s *subscribeStream) onAckAsync(msgBytes int64) {
s.mu.Lock()
defer s.mu.Unlock()

if s.status == serviceActive {
s.unsafeAllowFlow(flowControlTokens{Bytes: msgBytes, Messages: 1})
s.unsafeAllowFlow(flowControlTokens{Bytes: ac.MsgBytes, Messages: 1})
}
}

Expand Down
10 changes: 5 additions & 5 deletions pubsublite/internal/wire/subscriber_test.go
Expand Up @@ -339,8 +339,8 @@ func TestSubscribeStreamFlowControlBatching(t *testing.T) {
}
sub.Receiver.ValidateMsg(msg1)
sub.Receiver.ValidateMsg(msg2)
sub.sub.onAckAsync(msg1.SizeBytes)
sub.sub.onAckAsync(msg2.SizeBytes)
sub.sub.onAck(&ackConsumer{MsgBytes: msg1.SizeBytes})
sub.sub.onAck(&ackConsumer{MsgBytes: msg2.SizeBytes})
sub.sub.sendBatchFlowControl()
if gotErr := sub.FinalError(); !test.ErrorEqual(gotErr, serverErr) {
t.Errorf("Final err: (%v), want: (%v)", gotErr, serverErr)
Expand Down Expand Up @@ -373,8 +373,8 @@ func TestSubscribeStreamExpediteFlowControl(t *testing.T) {
}
sub.Receiver.ValidateMsg(msg1)
sub.Receiver.ValidateMsg(msg2)
sub.sub.onAckAsync(msg1.SizeBytes)
sub.sub.onAckAsync(msg2.SizeBytes)
sub.sub.onAck(&ackConsumer{MsgBytes: msg1.SizeBytes})
sub.sub.onAck(&ackConsumer{MsgBytes: msg2.SizeBytes})
// Note: the ack for msg2 automatically triggers sending the flow control.
if gotErr := sub.FinalError(); !test.ErrorEqual(gotErr, serverErr) {
t.Errorf("Final err: (%v), want: (%v)", gotErr, serverErr)
Expand Down Expand Up @@ -419,7 +419,7 @@ func TestSubscribeStreamDisableBatchFlowControl(t *testing.T) {
barrier.ReleaseAfter(func() {
// While the stream is not connected, the pending flow control request
// should not be released and sent to the stream.
sub.sub.onAckAsync(msg.SizeBytes)
sub.sub.onAck(&ackConsumer{MsgBytes: msg.SizeBytes})
if sub.PendingFlowControlRequest() == nil {
t.Errorf("Pending flow control request should not be cleared")
}
Expand Down

0 comments on commit fb2cd04

Please sign in to comment.