Skip to content

Commit

Permalink
Handle review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
jliunyu committed Apr 29, 2022
1 parent f9ff23f commit 5efeabd
Showing 1 changed file with 17 additions and 19 deletions.
36 changes: 17 additions & 19 deletions kafka/consumer_test.go
Expand Up @@ -465,7 +465,7 @@ func wrapRebalanceCb(assignedEvents *int32, revokedEvents *int32, t *testing.T)
}
}

func testPoll(c *Consumer, doneChan chan bool, t *testing.T, wg *sync.WaitGroup) {
func testpoll(c *Consumer, doneChan chan bool, t *testing.T, wg *sync.WaitGroup) {
defer wg.Done()

run := true
Expand All @@ -484,7 +484,7 @@ func testPoll(c *Consumer, doneChan chan bool, t *testing.T, wg *sync.WaitGroup)
t.Logf("Message on %s:\n%s\n",
e.TopicPartition, string(e.Value))
if e.Headers != nil {
fmt.Printf("Headers: %v\n", e.Headers)
t.Logf("Headers: %v\n", e.Headers)
}

case Error:
Expand Down Expand Up @@ -591,26 +591,24 @@ func TestConsumerCloseForStaticMember(t *testing.T) {
err = c1.Subscribe(topic, wrapRebalancecb1)
if err != nil {
t.Fatalf("Failed to subscribe to topic %s: %s\n", topic, err)
return
}

wg := &sync.WaitGroup{}
wg := sync.WaitGroup{}
doneChan := make(chan bool, 3)

wg.Add(1)
go testPoll(c1, doneChan, t, wg)
testConsumerWaitAssignment(c1, t)
go testpoll(c1, doneChan, t, &wg)
consumerWaitAssignment(c1, t)

closeChan := make(chan bool)
wrapRebalancecb2 := wrapRebalanceCb(&assignedEvents2, &revokedEvents2, t)
err = c2.Subscribe(topic, wrapRebalancecb2)
if err != nil {
t.Fatalf("Failed to subscribe to topic %s: %s\n", topic, err)
return
}
wg.Add(1)
go testPoll(c2, closeChan, t, wg)
testConsumerWaitAssignment(c2, t)
go testpoll(c2, closeChan, t, &wg)
consumerWaitAssignment(c2, t)

wrapRebalancecb3 := wrapRebalanceCb(&assignedEvents3, &revokedEvents3, t)
err = c3.Subscribe(topic, wrapRebalancecb3)
Expand All @@ -619,8 +617,8 @@ func TestConsumerCloseForStaticMember(t *testing.T) {
return
}
wg.Add(1)
go testPoll(c3, doneChan, t, wg)
testConsumerWaitAssignment(c3, t)
go testpoll(c3, doneChan, t, &wg)
consumerWaitAssignment(c3, t)

closeChan <- true
close(closeChan)
Expand All @@ -635,11 +633,11 @@ func TestConsumerCloseForStaticMember(t *testing.T) {
err = c2.Subscribe(topic, wrapRebalancecb4)
if err != nil {
t.Fatalf("Failed to subscribe to topic %s: %s\n", topic, err)
return
}

wg.Add(1)
go testPoll(c2, doneChan, t, wg)
testConsumerWaitAssignment(c2, t)
go testpoll(c2, doneChan, t, &wg)
consumerWaitAssignment(c2, t)

doneChan <- true
close(doneChan)
Expand All @@ -650,8 +648,8 @@ func TestConsumerCloseForStaticMember(t *testing.T) {

wg.Wait()

// Wait 30s to make sure no revokedEvents happens
time.Sleep(30 * time.Second)
// Wait 2 * session.timeout.ms to make sure no revokedEvents happens
time.Sleep(2 * 6000 * time.Millisecond)

if atomic.LoadInt32(&assignedEvents1) != 3 {
t.Fatalf("3 assignedEvents are Expected to happen for the first consumer, but %d happened\n",
Expand Down Expand Up @@ -691,16 +689,16 @@ func TestConsumerCloseForStaticMember(t *testing.T) {
}
}

func testConsumerWaitAssignment(c *Consumer, t *testing.T) {
func consumerWaitAssignment(c *Consumer, t *testing.T) {
run := true
for run {
assignment, err := c.Assignment()
if err != nil {
t.Fatalf("Assignment failed: %s", err)
t.Fatalf("Assignment failed: %s\n", err)
}

if len(assignment) != 0 {
fmt.Printf("%v Assigned partitions are: %v\n", c, assignment)
t.Logf("%v Assigned partitions are: %v\n", c, assignment)
run = false
}
}
Expand Down

0 comments on commit 5efeabd

Please sign in to comment.