diff --git a/CHANGELOG.md b/CHANGELOG.md index 6b93502f4..94a57652c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,7 +12,8 @@ This is a feature release: ### Fixes - * Fix Rebalance events behavior for static membership (@jliunyu, #757). + * Fix Rebalance events behavior for static membership (@jliunyu, #757, + #798). * Fix consumer close taking 10 seconds when there's no rebalance needed (@jliunyu, #757). diff --git a/kafka/consumer.go b/kafka/consumer.go index 93e2260b8..263de387e 100644 --- a/kafka/consumer.go +++ b/kafka/consumer.go @@ -424,29 +424,12 @@ func (c *Consumer) Close() (err error) { close(c.events) } - doneChan := make(chan bool) - - go func() { - C.rd_kafka_consumer_close(c.handle.rk) - // wake up Poll() - C.rd_kafka_queue_yield(c.handle.rkq) - doneChan <- true - }() - - // wait for consumer_close() to finish while serving c.Poll() for rebalance callbacks/events - run := true - for run { - select { - case <-doneChan: - run = false + C.rd_kafka_consumer_close_queue(c.handle.rk, c.handle.rkq) - default: - c.Poll(100) - } + for C.rd_kafka_consumer_closed(c.handle.rk) != 1 { + c.Poll(100) } - close(doneChan) - // Destroy our queue C.rd_kafka_queue_destroy(c.handle.rkq) c.handle.rkq = nil diff --git a/kafka/consumer_test.go b/kafka/consumer_test.go index c1cdf70f7..7f5ea38d0 100644 --- a/kafka/consumer_test.go +++ b/kafka/consumer_test.go @@ -520,27 +520,33 @@ func testPoll(c *Consumer, doneChan chan bool, t *testing.T, wg *sync.WaitGroup) // then AssignedPartitions happens to both consumers. // 3) Third consumer joins, RevokedPartitions happens from the previous two // consumers, then AssignedPartitions happens to all the three consumers. -// 4) Close the second consumer, RevokedPartitions should not happen. +// 4) Close the second consumer, revoke its assignments will happen, but it +// should not notice other consumers. // 5) Rejoin the second consumer, rebalance should not happen to all the other // consumers since it's not the leader, AssignedPartitions only happened // to this consumer to assign the partitions. +// 6) Close the third consumer, revoke its assignments will happen, but it +// should not notice other consumers. +// 7) Close the rejoined consumer, revoke its assignments will happen, +// but it should not notice other consumers. +// 8) Close the first consumer, revoke its assignments will happen. // // The total number of AssignedPartitions for the first consumer is 3, -// and the total number of RevokedPartitions for the first consumer is 2. +// and the total number of RevokedPartitions for the first consumer is 3. // The total number of AssignedPartitions for the second consumer is 2, -// and the total number of RevokedPartitions for the second consumer is 1. +// and the total number of RevokedPartitions for the second consumer is 2. // The total number of AssignedPartitions for the third consumer is 1, -// and the total number of RevokedPartitions for the second consumer is 0. +// and the total number of RevokedPartitions for the third consumer is 1. // The total number of AssignedPartitions for the rejoined consumer // (originally second consumer) is 1, // and the total number of RevokedPartitions for the rejoined consumer -// (originally second consumer) is 0. +// (originally second consumer) is 1. func TestConsumerCloseForStaticMember(t *testing.T) { if !testconfRead() { t.Skipf("Missing testconf.json") } broker := testconf.Brokers - topic := testconf.Topic + topic := createTestTopic(t, "staticMembership", 3, 1) var assignedEvents1 int32 var revokedEvents1 int32 @@ -655,8 +661,8 @@ func TestConsumerCloseForStaticMember(t *testing.T) { atomic.LoadInt32(&assignedEvents1)) } - if atomic.LoadInt32(&revokedEvents1) != 2 { - t.Fatalf("2 revokedEvents are Expected to happen for the first consumer, but %d happened\n", + if atomic.LoadInt32(&revokedEvents1) != 3 { + t.Fatalf("3 revokedEvents are Expected to happen for the first consumer, but %d happened\n", atomic.LoadInt32(&revokedEvents1)) } @@ -664,8 +670,8 @@ func TestConsumerCloseForStaticMember(t *testing.T) { t.Fatalf("2 assignedEvents are Expected to happen for the second consumer, but %d happened\n", atomic.LoadInt32(&assignedEvents2)) } - if atomic.LoadInt32(&revokedEvents2) != 1 { - t.Fatalf("1 revokedEvents is Expected to happen for the second consumer, but %d happened\n", + if atomic.LoadInt32(&revokedEvents2) != 2 { + t.Fatalf("2 revokedEvents is Expected to happen for the second consumer, but %d happened\n", atomic.LoadInt32(&revokedEvents2)) } @@ -673,8 +679,8 @@ func TestConsumerCloseForStaticMember(t *testing.T) { t.Fatalf("1 assignedEvents is Expected to happen for the third consumer, but %d happened\n", atomic.LoadInt32(&assignedEvents3)) } - if atomic.LoadInt32(&revokedEvents3) != 0 { - t.Fatalf("0 revokedEvents is Expected to happen for the third consumer, but %d happened\n", + if atomic.LoadInt32(&revokedEvents3) != 1 { + t.Fatalf("1 revokedEvents is Expected to happen for the third consumer, but %d happened\n", atomic.LoadInt32(&revokedEvents3)) } @@ -682,8 +688,8 @@ func TestConsumerCloseForStaticMember(t *testing.T) { t.Fatalf("1 assignedEvents is Expected to happen for the rejoined consumer(originally second consumer), but %d happened\n", atomic.LoadInt32(&assignedEvents4)) } - if atomic.LoadInt32(&revokedEvents4) != 0 { - t.Fatalf("0 revokedEvents is Expected to happen for the rejoined consumer(originally second consumer), but %d happened\n", + if atomic.LoadInt32(&revokedEvents4) != 1 { + t.Fatalf("1 revokedEvents is Expected to happen for the rejoined consumer(originally second consumer), but %d happened\n", atomic.LoadInt32(&revokedEvents4)) } }