diff --git a/kafka/consumer_test.go b/kafka/consumer_test.go index 5c20945a5..a97fa65e8 100644 --- a/kafka/consumer_test.go +++ b/kafka/consumer_test.go @@ -465,7 +465,7 @@ func wrapRebalanceCb(assignedEvents *int32, revokedEvents *int32, t *testing.T) } } -func testPoll(c *Consumer, doneChan chan bool, t *testing.T, wg *sync.WaitGroup) { +func testpoll(c *Consumer, doneChan chan bool, t *testing.T, wg *sync.WaitGroup) { defer wg.Done() run := true @@ -484,7 +484,7 @@ func testPoll(c *Consumer, doneChan chan bool, t *testing.T, wg *sync.WaitGroup) t.Logf("Message on %s:\n%s\n", e.TopicPartition, string(e.Value)) if e.Headers != nil { - fmt.Printf("Headers: %v\n", e.Headers) + t.Logf("Headers: %v\n", e.Headers) } case Error: @@ -591,26 +591,24 @@ func TestConsumerCloseForStaticMember(t *testing.T) { err = c1.Subscribe(topic, wrapRebalancecb1) if err != nil { t.Fatalf("Failed to subscribe to topic %s: %s\n", topic, err) - return } - wg := &sync.WaitGroup{} + wg := sync.WaitGroup{} doneChan := make(chan bool, 3) wg.Add(1) - go testPoll(c1, doneChan, t, wg) - testConsumerWaitAssignment(c1, t) + go testpoll(c1, doneChan, t, &wg) + consumerWaitAssignment(c1, t) closeChan := make(chan bool) wrapRebalancecb2 := wrapRebalanceCb(&assignedEvents2, &revokedEvents2, t) err = c2.Subscribe(topic, wrapRebalancecb2) if err != nil { t.Fatalf("Failed to subscribe to topic %s: %s\n", topic, err) - return } wg.Add(1) - go testPoll(c2, closeChan, t, wg) - testConsumerWaitAssignment(c2, t) + go testpoll(c2, closeChan, t, &wg) + consumerWaitAssignment(c2, t) wrapRebalancecb3 := wrapRebalanceCb(&assignedEvents3, &revokedEvents3, t) err = c3.Subscribe(topic, wrapRebalancecb3) @@ -619,8 +617,8 @@ func TestConsumerCloseForStaticMember(t *testing.T) { return } wg.Add(1) - go testPoll(c3, doneChan, t, wg) - testConsumerWaitAssignment(c3, t) + go testpoll(c3, doneChan, t, &wg) + consumerWaitAssignment(c3, t) closeChan <- true close(closeChan) @@ -635,11 +633,11 @@ func TestConsumerCloseForStaticMember(t *testing.T) { err = c2.Subscribe(topic, wrapRebalancecb4) if err != nil { t.Fatalf("Failed to subscribe to topic %s: %s\n", topic, err) - return } + wg.Add(1) - go testPoll(c2, doneChan, t, wg) - testConsumerWaitAssignment(c2, t) + go testpoll(c2, doneChan, t, &wg) + consumerWaitAssignment(c2, t) doneChan <- true close(doneChan) @@ -650,8 +648,8 @@ func TestConsumerCloseForStaticMember(t *testing.T) { wg.Wait() - // Wait 30s to make sure no revokedEvents happens - time.Sleep(30 * time.Second) + // Wait 2 * session.timeout.ms to make sure no revokedEvents happens + time.Sleep(2 * 6000 * time.Millisecond) if atomic.LoadInt32(&assignedEvents1) != 3 { t.Fatalf("3 assignedEvents are Expected to happen for the first consumer, but %d happened\n", @@ -691,16 +689,16 @@ func TestConsumerCloseForStaticMember(t *testing.T) { } } -func testConsumerWaitAssignment(c *Consumer, t *testing.T) { +func consumerWaitAssignment(c *Consumer, t *testing.T) { run := true for run { assignment, err := c.Assignment() if err != nil { - t.Fatalf("Assignment failed: %s", err) + t.Fatalf("Assignment failed: %s\n", err) } if len(assignment) != 0 { - fmt.Printf("%v Assigned partitions are: %v\n", c, assignment) + t.Logf("%v Assigned partitions are: %v\n", c, assignment) run = false } }