Merge pull request #2130 from Shopify/dnwe/rework-test
fix: rework RebalancingMultiplePartitions test
dnwe committed Feb 7, 2022
2 parents e35ee3f + b8a0c80 commit a5fdd87
Showing 1 changed file with 65 additions and 37 deletions.
102 changes: 65 additions & 37 deletions consumer_test.go
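In short: the old test asserted each partition's messages from a dedicated goroutine coordinated by a sync.WaitGroup, so assertion order depended on the scheduler and on rebalance timing. The rework drains messages sequentially through a single checkMessage helper, so every stage states exactly which partition must deliver which offset next. A minimal, self-contained sketch of that pattern follows; the Msg type and the channels are stand-ins for sarama's ConsumerMessage and PartitionConsumer.Messages(), not part of the commit:

package example

import "testing"

// Msg mirrors the two fields the assertions care about (a stand-in for
// sarama's ConsumerMessage).
type Msg struct {
	Partition int32
	Offset    int64
}

func TestSequentialDrain(t *testing.T) {
	// One buffered channel per partition, standing in for each
	// PartitionConsumer.Messages() channel.
	consumers := map[int32]chan Msg{
		0: make(chan Msg, 10),
		1: make(chan Msg, 10),
	}

	// A deterministic fake "broker" feed for the sketch.
	consumers[0] <- Msg{Partition: 0, Offset: 0}
	consumers[0] <- Msg{Partition: 0, Offset: 1}
	consumers[1] <- Msg{Partition: 1, Offset: 0}

	// The test, not the goroutine scheduler, decides which partition must
	// deliver next; this is what makes the WaitGroup and the per-partition
	// assertion goroutines unnecessary.
	checkMessage := func(partition int32, offset int64) {
		m := <-consumers[partition]
		if m.Offset != offset {
			t.Error("Incorrect message offset!", offset, partition, m.Offset)
		}
		if m.Partition != partition {
			t.Error("Incorrect message partition!")
		}
	}

	checkMessage(0, 0)
	checkMessage(0, 1)
	checkMessage(1, 0)
}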
@@ -6,7 +6,6 @@ import (
"os/signal"
"reflect"
"strconv"
"sync"
"sync/atomic"
"testing"
"time"
@@ -1251,18 +1250,30 @@ func TestConsumerRebalancingMultiplePartitions(t *testing.T) {

// launch test goroutines
config := NewTestConfig()
config.ClientID = t.Name()
config.Consumer.Retry.Backoff = 50
master, err := NewConsumer([]string{seedBroker.Addr()}, config)
if err != nil {
t.Fatal(err)
}

// we expect to end up (eventually) consuming exactly ten messages on each partition
var wg sync.WaitGroup
consumers := map[int32]PartitionConsumer{}
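// checkMessage blocks on the given partition's consumer and asserts that
// the next message it delivers carries the expected partition and offset.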
checkMessage := func(partition int32, offset int) {
c := consumers[partition]
message := <-c.Messages()
t.Logf("Received message my_topic-%d offset=%d", partition, message.Offset)
if message.Offset != int64(offset) {
t.Error("Incorrect message offset!", offset, partition, message.Offset)
}
if message.Partition != partition {
t.Error("Incorrect message partition!")
}
}

for i := int32(0); i < 2; i++ {
consumer, err := master.ConsumePartition("my_topic", i, 0)
if err != nil {
t.Error(err)
t.Fatal(err)
}

go func(c PartitionConsumer) {
@@ -1271,27 +1282,13 @@ func TestConsumerRebalancingMultiplePartitions(t *testing.T) {
}
}(consumer)

wg.Add(1)
go func(partition int32, c PartitionConsumer) {
for i := 0; i < 10; i++ {
message := <-consumer.Messages()
if message.Offset != int64(i) {
t.Error("Incorrect message offset!", i, partition, message.Offset)
}
if message.Partition != partition {
t.Error("Incorrect message partition!")
}
}
safeClose(t, consumer)
wg.Done()
}(i, consumer)
consumers[i] = consumer
}

time.Sleep(50 * time.Millisecond)
Logger.Printf(" STAGE 1")
// Stage 1:
// * my_topic/0 -> leader0 serves 4 messages
// * my_topic/1 -> leader1 serves 0 messages
t.Log(` STAGE 1:
* my_topic/0 -> leader0 will serve 4 messages
* my_topic/1 -> leader1 will serve 0 messages`)
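// (each stage installs its mock responses first, then drains exactly the
// messages those responses should produce via checkMessage)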

mockFetchResponse := NewMockFetchResponse(t, 1)
for i := 0; i < 4; i++ {
@@ -1301,11 +1298,15 @@ func TestConsumerRebalancingMultiplePartitions(t *testing.T) {
"FetchRequest": mockFetchResponse,
})

for i := 0; i < 4; i++ {
checkMessage(0, i)
}

time.Sleep(50 * time.Millisecond)
Logger.Printf(" STAGE 2")
// Stage 2:
// * leader0 says that it is no longer serving my_topic/0
// * seedBroker reports that leader1 is serving my_topic/0 now
t.Log(` STAGE 2:
* my_topic/0 -> leader0 will return NotLeaderForPartition
seedBroker will give leader1 as serving my_topic/0 now
* my_topic/1 -> leader1 will serve 0 messages`)
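// (a NotLeaderForPartition fetch error makes the consumer refresh its
// metadata from the seed broker and redirect fetches to the new leader,
// retrying on the Consumer.Retry.Backoff configured above)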

// seed broker reports that the new leader for partition 0 is leader1
seedBroker.SetHandlerByMap(map[string]MockResponse{
@@ -1325,13 +1326,12 @@ func TestConsumerRebalancingMultiplePartitions(t *testing.T) {
})

time.Sleep(50 * time.Millisecond)
Logger.Printf(" STAGE 3")
// Stage 3:
// * my_topic/0 -> leader1 serves 3 messages
// * my_topic/1 -> leader1 serves 8 messages
t.Log(` STAGE 3:
* my_topic/0 -> leader1 will serve 3 messages
* my_topic/1 -> leader1 will serve 8 messages`)

// leader1 provides 3 messages on partition 0, and 8 messages on partition 1
mockFetchResponse2 := NewMockFetchResponse(t, 2)
mockFetchResponse2 := NewMockFetchResponse(t, 11)
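// (assumption: the larger batch size lets a single mock fetch response
// carry all 11 queued messages, 3 for partition 0 plus 8 for partition 1)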
for i := 4; i < 7; i++ {
mockFetchResponse2.SetMessage("my_topic", 0, int64(i), testMsg)
}
@@ -1342,12 +1342,25 @@ func TestConsumerRebalancingMultiplePartitions(t *testing.T) {
"FetchRequest": mockFetchResponse2,
})

for i := 0; i < 8; i++ {
checkMessage(1, i)
}
for i := 4; i < 7; i++ {
checkMessage(0, i)
}

time.Sleep(50 * time.Millisecond)
Logger.Printf(" STAGE 4")
// Stage 4:
// * my_topic/0 -> leader1 serves 3 messages
// * my_topic/1 -> leader1 reports that it is no longer the leader
// * seedBroker reports that leader0 is the new leader for my_topic/1
t.Log(` STAGE 4:
* my_topic/0 -> leader1 will serve 3 messages
* my_topic/1 -> leader1 will return NotLeaderForPartition
seedBroker will give leader0 as serving my_topic/1 now`)

leader0.SetHandlerByMap(map[string]MockResponse{
"FetchRequest": NewMockFetchResponse(t, 1),
})
leader1.SetHandlerByMap(map[string]MockResponse{
"FetchRequest": NewMockFetchResponse(t, 1),
})

// metadata assigns partition 0 to leader1 and partition 1 to leader0
seedBroker.SetHandlerByMap(map[string]MockResponse{
@@ -1365,11 +1378,16 @@ func TestConsumerRebalancingMultiplePartitions(t *testing.T) {
SetMessage("my_topic", 0, int64(8), testMsg).
SetMessage("my_topic", 0, int64(9), testMsg)
fetchResponse4 := new(FetchResponse)
fetchResponse4.AddError("my_topic", 0, ErrNoError)
fetchResponse4.AddError("my_topic", 1, ErrNotLeaderForPartition)
leader1.SetHandlerByMap(map[string]MockResponse{
"FetchRequest": NewMockSequence(mockFetchResponse3, fetchResponse4),
})

t.Log(` STAGE 5:
* my_topic/0 -> leader1 will serve 0 messages
* my_topic/1 -> leader0 will serve 2 messages`)

// leader0 provides two messages on partition 1
mockFetchResponse4 := NewMockFetchResponse(t, 2)
for i := 8; i < 10; i++ {
@@ -1379,7 +1397,17 @@ func TestConsumerRebalancingMultiplePartitions(t *testing.T) {
"FetchRequest": mockFetchResponse4,
})

wg.Wait()
for i := 7; i < 10; i++ {
checkMessage(0, i)
}

for i := 8; i < 10; i++ {
checkMessage(1, i)
}

for _, pc := range consumers {
safeClose(t, pc)
}
safeClose(t, master)
leader1.Close()
leader0.Close()
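For context on stages 2 and 4: the rebalance is driven entirely by a mock broker answering a fetch with NotLeaderForPartition, which sends the consumer back to the seed broker for fresh metadata. A sketch of that wiring, reusing the test's own identifiers (seedBroker, leader0, leader1, my_topic) and the same sarama mock APIs used above; an illustration, not part of the commit:

// the old leader disowns the partition...
fetchResponse := new(FetchResponse)
fetchResponse.AddError("my_topic", 0, ErrNotLeaderForPartition)
leader0.SetHandlerByMap(map[string]MockResponse{
	"FetchRequest": NewMockWrapper(fetchResponse),
})

// ...and the seed broker's metadata must already name the new leader,
// because the consumer reacts to the error with a MetadataRequest.
seedBroker.SetHandlerByMap(map[string]MockResponse{
	"MetadataRequest": NewMockMetadataResponse(t).
		SetBroker(leader0.Addr(), leader0.BrokerID()).
		SetBroker(leader1.Addr(), leader1.BrokerID()).
		SetLeader("my_topic", 0, leader1.BrokerID()),
})

The short Consumer.Retry.Backoff configured at the top of the test keeps that error-and-refresh retry loop fast, so each stage only needs a brief sleep before reconfiguring the mocks.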
