diff --git a/pubsublite/internal/wire/subscriber_test.go b/pubsublite/internal/wire/subscriber_test.go index 0c30fdbee2c..6aa5fe77933 100644 --- a/pubsublite/internal/wire/subscriber_test.go +++ b/pubsublite/internal/wire/subscriber_test.go @@ -1135,8 +1135,8 @@ func TestAssigningSubscriberAddRemovePartitions(t *testing.T) { // Assignment stream asnStream := test.NewRPCVerifier(t) asnStream.Push(initAssignmentReq(subscription, fakeUUID[:]), assignmentResp([]int64{3, 6}), nil) - assignmentBarrier := asnStream.PushWithBarrier(assignmentAckReq(), assignmentResp([]int64{3, 8}), nil) - asnStream.Push(assignmentAckReq(), nil, nil) + assignmentBarrier1 := asnStream.PushWithBarrier(assignmentAckReq(), assignmentResp([]int64{3, 8}), nil) + assignmentBarrier2 := asnStream.PushWithBarrier(assignmentAckReq(), nil, nil) verifiers.AddAssignmentStream(subscription, asnStream) // Partition 3 @@ -1191,7 +1191,7 @@ func TestAssigningSubscriberAddRemovePartitions(t *testing.T) { } // Partition assignments will now be {3, 8}. - assignmentBarrier.Release() + assignmentBarrier1.Release() receiver.ValidateMsgs(partitionMsgs(8, msg5)) if got, want := sub.Partitions(), []int{3, 8}; !testutil.Equal(got, want) { t.Errorf("subscriber partitions: got %d, want %d", got, want) @@ -1204,6 +1204,10 @@ func TestAssigningSubscriberAddRemovePartitions(t *testing.T) { msg4Barrier.Release() receiver.ValidateMsgs(partitionMsgs(3, msg2)) + // Ensure the second assignment ack is received by the server to avoid test + // flakiness. + assignmentBarrier2.Release() + // Stop should flush all commit cursors. sub.Stop() if gotErr := sub.WaitStopped(); gotErr != nil {