Skip to content

Commit

Permalink
Handle review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
jliunyu committed Apr 29, 2022
1 parent f9ff23f commit 082c093
Showing 1 changed file with 11 additions and 14 deletions.
25 changes: 11 additions & 14 deletions kafka/consumer_test.go
Expand Up @@ -484,7 +484,7 @@ func testPoll(c *Consumer, doneChan chan bool, t *testing.T, wg *sync.WaitGroup)
t.Logf("Message on %s:\n%s\n",
e.TopicPartition, string(e.Value))
if e.Headers != nil {
fmt.Printf("Headers: %v\n", e.Headers)
t.Logf("Headers: %v\n", e.Headers)
}

case Error:
Expand Down Expand Up @@ -591,35 +591,32 @@ func TestConsumerCloseForStaticMember(t *testing.T) {
err = c1.Subscribe(topic, wrapRebalancecb1)
if err != nil {
t.Fatalf("Failed to subscribe to topic %s: %s\n", topic, err)
return
}

wg := &sync.WaitGroup{}
wg := sync.WaitGroup{}
doneChan := make(chan bool, 3)

wg.Add(1)
go testPoll(c1, doneChan, t, wg)
go testPoll(c1, doneChan, t, &wg)
testConsumerWaitAssignment(c1, t)

closeChan := make(chan bool)
wrapRebalancecb2 := wrapRebalanceCb(&assignedEvents2, &revokedEvents2, t)
err = c2.Subscribe(topic, wrapRebalancecb2)
if err != nil {
t.Fatalf("Failed to subscribe to topic %s: %s\n", topic, err)
return
}
wg.Add(1)
go testPoll(c2, closeChan, t, wg)
go testPoll(c2, closeChan, t, &wg)
testConsumerWaitAssignment(c2, t)

wrapRebalancecb3 := wrapRebalanceCb(&assignedEvents3, &revokedEvents3, t)
err = c3.Subscribe(topic, wrapRebalancecb3)
if err != nil {
t.Fatalf("Failed to subscribe to topic %s: %s\n", topic, err)
return
}
wg.Add(1)
go testPoll(c3, doneChan, t, wg)
go testPoll(c3, doneChan, t, &wg)
testConsumerWaitAssignment(c3, t)

closeChan <- true
Expand All @@ -635,10 +632,10 @@ func TestConsumerCloseForStaticMember(t *testing.T) {
err = c2.Subscribe(topic, wrapRebalancecb4)
if err != nil {
t.Fatalf("Failed to subscribe to topic %s: %s\n", topic, err)
return
}

wg.Add(1)
go testPoll(c2, doneChan, t, wg)
go testPoll(c2, doneChan, t, &wg)
testConsumerWaitAssignment(c2, t)

doneChan <- true
Expand All @@ -650,8 +647,8 @@ func TestConsumerCloseForStaticMember(t *testing.T) {

wg.Wait()

// Wait 30s to make sure no revokedEvents happens
time.Sleep(30 * time.Second)
// Wait 2 * session.timeout.ms to make sure no revokedEvents happens
time.Sleep(2 * 6000 * time.Millisecond)

if atomic.LoadInt32(&assignedEvents1) != 3 {
t.Fatalf("3 assignedEvents are Expected to happen for the first consumer, but %d happened\n",
Expand Down Expand Up @@ -696,11 +693,11 @@ func testConsumerWaitAssignment(c *Consumer, t *testing.T) {
for run {
assignment, err := c.Assignment()
if err != nil {
t.Fatalf("Assignment failed: %s", err)
t.Fatalf("Assignment failed: %s\n", err)
}

if len(assignment) != 0 {
fmt.Printf("%v Assigned partitions are: %v\n", c, assignment)
t.Logf("%v Assigned partitions are: %v\n", c, assignment)
run = false
}
}
Expand Down

0 comments on commit 082c093

Please sign in to comment.