Skip to content

Commit

Permalink
test(pubsublite): fix flaky TestAssigningSubscriberAddRemovePartitions (
Browse files Browse the repository at this point in the history
  • Loading branch information
tmdiep committed Sep 1, 2021
1 parent 27c3ed0 commit d6d4a3c
Showing 1 changed file with 7 additions and 3 deletions.
10 changes: 7 additions & 3 deletions pubsublite/internal/wire/subscriber_test.go
Expand Up @@ -1135,8 +1135,8 @@ func TestAssigningSubscriberAddRemovePartitions(t *testing.T) {
// Assignment stream
asnStream := test.NewRPCVerifier(t)
asnStream.Push(initAssignmentReq(subscription, fakeUUID[:]), assignmentResp([]int64{3, 6}), nil)
assignmentBarrier := asnStream.PushWithBarrier(assignmentAckReq(), assignmentResp([]int64{3, 8}), nil)
asnStream.Push(assignmentAckReq(), nil, nil)
assignmentBarrier1 := asnStream.PushWithBarrier(assignmentAckReq(), assignmentResp([]int64{3, 8}), nil)
assignmentBarrier2 := asnStream.PushWithBarrier(assignmentAckReq(), nil, nil)
verifiers.AddAssignmentStream(subscription, asnStream)

// Partition 3
Expand Down Expand Up @@ -1191,7 +1191,7 @@ func TestAssigningSubscriberAddRemovePartitions(t *testing.T) {
}

// Partition assignments will now be {3, 8}.
assignmentBarrier.Release()
assignmentBarrier1.Release()
receiver.ValidateMsgs(partitionMsgs(8, msg5))
if got, want := sub.Partitions(), []int{3, 8}; !testutil.Equal(got, want) {
t.Errorf("subscriber partitions: got %d, want %d", got, want)
Expand All @@ -1204,6 +1204,10 @@ func TestAssigningSubscriberAddRemovePartitions(t *testing.T) {
msg4Barrier.Release()
receiver.ValidateMsgs(partitionMsgs(3, msg2))

// Ensure the second assignment ack is received by the server to avoid test
// flakiness.
assignmentBarrier2.Release()

// Stop should flush all commit cursors.
sub.Stop()
if gotErr := sub.WaitStopped(); gotErr != nil {
Expand Down

0 comments on commit d6d4a3c

Please sign in to comment.