diff --git a/kafka/consumer_test.go b/kafka/consumer_test.go index 5c20945a5..c1cdf70f7 100644 --- a/kafka/consumer_test.go +++ b/kafka/consumer_test.go @@ -484,7 +484,7 @@ func testPoll(c *Consumer, doneChan chan bool, t *testing.T, wg *sync.WaitGroup) t.Logf("Message on %s:\n%s\n", e.TopicPartition, string(e.Value)) if e.Headers != nil { - fmt.Printf("Headers: %v\n", e.Headers) + t.Logf("Headers: %v\n", e.Headers) } case Error: @@ -591,14 +591,13 @@ func TestConsumerCloseForStaticMember(t *testing.T) { err = c1.Subscribe(topic, wrapRebalancecb1) if err != nil { t.Fatalf("Failed to subscribe to topic %s: %s\n", topic, err) - return } - wg := &sync.WaitGroup{} + wg := sync.WaitGroup{} doneChan := make(chan bool, 3) wg.Add(1) - go testPoll(c1, doneChan, t, wg) + go testPoll(c1, doneChan, t, &wg) testConsumerWaitAssignment(c1, t) closeChan := make(chan bool) @@ -606,20 +605,18 @@ func TestConsumerCloseForStaticMember(t *testing.T) { err = c2.Subscribe(topic, wrapRebalancecb2) if err != nil { t.Fatalf("Failed to subscribe to topic %s: %s\n", topic, err) - return } wg.Add(1) - go testPoll(c2, closeChan, t, wg) + go testPoll(c2, closeChan, t, &wg) testConsumerWaitAssignment(c2, t) wrapRebalancecb3 := wrapRebalanceCb(&assignedEvents3, &revokedEvents3, t) err = c3.Subscribe(topic, wrapRebalancecb3) if err != nil { t.Fatalf("Failed to subscribe to topic %s: %s\n", topic, err) - return } wg.Add(1) - go testPoll(c3, doneChan, t, wg) + go testPoll(c3, doneChan, t, &wg) testConsumerWaitAssignment(c3, t) closeChan <- true @@ -635,10 +632,10 @@ func TestConsumerCloseForStaticMember(t *testing.T) { err = c2.Subscribe(topic, wrapRebalancecb4) if err != nil { t.Fatalf("Failed to subscribe to topic %s: %s\n", topic, err) - return } + wg.Add(1) - go testPoll(c2, doneChan, t, wg) + go testPoll(c2, doneChan, t, &wg) testConsumerWaitAssignment(c2, t) doneChan <- true @@ -650,8 +647,8 @@ func TestConsumerCloseForStaticMember(t *testing.T) { wg.Wait() - // Wait 30s to make sure no revokedEvents happens - time.Sleep(30 * time.Second) + // Wait 2 * session.timeout.ms to make sure no revokedEvents happens + time.Sleep(2 * 6000 * time.Millisecond) if atomic.LoadInt32(&assignedEvents1) != 3 { t.Fatalf("3 assignedEvents are Expected to happen for the first consumer, but %d happened\n", @@ -696,11 +693,11 @@ func testConsumerWaitAssignment(c *Consumer, t *testing.T) { for run { assignment, err := c.Assignment() if err != nil { - t.Fatalf("Assignment failed: %s", err) + t.Fatalf("Assignment failed: %s\n", err) } if len(assignment) != 0 { - fmt.Printf("%v Assigned partitions are: %v\n", c, assignment) + t.Logf("%v Assigned partitions are: %v\n", c, assignment) run = false } }