Skip to content

Commit

Permalink
test(pubsublite): fix flaky TestAssigningSubscriberIgnoreOutstandingA…
Browse files Browse the repository at this point in the history
…cks (#3664)

Adds a couple of barriers to ensure that the server has received the expected requests before proceeding with the rest of the test.

Fixes #3645.
  • Loading branch information
tmdiep committed Feb 3, 2021
1 parent c34dbd0 commit e3587f1
Showing 1 changed file with 8 additions and 4 deletions.
12 changes: 8 additions & 4 deletions pubsublite/internal/wire/subscriber_test.go
Expand Up @@ -966,8 +966,8 @@ func TestAssigningSubscriberIgnoreOutstandingAcks(t *testing.T) {
// Assignment stream
asnStream := test.NewRPCVerifier(t)
asnStream.Push(initAssignmentReq(subscription, fakeUUID[:]), assignmentResp([]int64{1}), nil)
assignmentBarrier := asnStream.PushWithBarrier(assignmentAckReq(), assignmentResp([]int64{2}), nil)
asnStream.Push(assignmentAckReq(), nil, nil)
assignmentBarrier1 := asnStream.PushWithBarrier(assignmentAckReq(), assignmentResp([]int64{2}), nil)
assignmentBarrier2 := asnStream.PushWithBarrier(assignmentAckReq(), nil, nil)
verifiers.AddAssignmentStream(subscription, asnStream)

// Partition 1
Expand All @@ -977,7 +977,7 @@ func TestAssigningSubscriberIgnoreOutstandingAcks(t *testing.T) {
verifiers.AddSubscribeStream(subscription, 1, subStream1)

cmtStream1 := test.NewRPCVerifier(t)
cmtStream1.Push(initCommitReq(subscriptionPartition{Path: subscription, Partition: 1}), initCommitResp(), nil)
commitBarrier := cmtStream1.PushWithBarrier(initCommitReq(subscriptionPartition{Path: subscription, Partition: 1}), initCommitResp(), nil)
// Note: no commit expected.
verifiers.AddCommitStream(subscription, 1, cmtStream1)

Expand All @@ -1004,9 +1004,13 @@ func TestAssigningSubscriberIgnoreOutstandingAcks(t *testing.T) {
ack1 := receiver.ValidateMsg(msg1)

// Partition assignments will now be {2}.
assignmentBarrier.Release()
assignmentBarrier1.Release()
receiver.ValidateMsg(msg2).Ack()

// These barriers ensure that this test is deterministic by ensuring that the
// server has received expected requests before proceeding.
commitBarrier.Release()
assignmentBarrier2.Release()
// Partition 1 has already been unassigned, so this ack is discarded.
ack1.Ack()

Expand Down

0 comments on commit e3587f1

Please sign in to comment.