diff --git a/CHANGELOG.md b/CHANGELOG.md index 5fc823e05..6b93502f4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,12 @@ This is a feature release: See [examples/mock_cluster](examples/mock_cluster). +### Fixes + + * Fix Rebalance events behavior for static membership (@jliunyu, #757). + * Fix consumer close taking 10 seconds when there's no rebalance + needed (@jliunyu, #757). + confluent-kafka-go is based on librdkafka FUTUREFIXME, see the [librdkafka release notes](https://github.com/edenhill/librdkafka/releases/tag/v1.9.0) for a complete list of changes, enhancements, fixes and upgrade considerations. diff --git a/kafka/consumer.go b/kafka/consumer.go index ef483300c..93e2260b8 100644 --- a/kafka/consumer.go +++ b/kafka/consumer.go @@ -424,30 +424,33 @@ func (c *Consumer) Close() (err error) { close(c.events) } - // librdkafka's rd_kafka_consumer_close() will block - // and trigger the rebalance_cb() if one is set, if not, which is the - // case with the Go client since it registers EVENTs rather than callbacks, - // librdkafka will shortcut the rebalance_cb and do a forced unassign. - // But we can't have that since the application might need the final RevokePartitions - // before shutting down. So we trigger an Unsubscribe() first, wait for that to - // propagate (in the Poll loop below), and then close the consumer. - c.Unsubscribe() - - // Poll for rebalance events - for { - c.Poll(10 * 1000) - if int(C.rd_kafka_queue_length(c.handle.rkq)) == 0 { - break + doneChan := make(chan bool) + + go func() { + C.rd_kafka_consumer_close(c.handle.rk) + // wake up Poll() + C.rd_kafka_queue_yield(c.handle.rkq) + doneChan <- true + }() + + // wait for consumer_close() to finish while serving c.Poll() for rebalance callbacks/events + run := true + for run { + select { + case <-doneChan: + run = false + + default: + c.Poll(100) } } + close(doneChan) + // Destroy our queue C.rd_kafka_queue_destroy(c.handle.rkq) c.handle.rkq = nil - // Close the consumer - C.rd_kafka_consumer_close(c.handle.rk) - c.handle.cleanup() C.rd_kafka_destroy(c.handle.rk) diff --git a/kafka/consumer_test.go b/kafka/consumer_test.go index caa23e5c2..c1cdf70f7 100644 --- a/kafka/consumer_test.go +++ b/kafka/consumer_test.go @@ -22,6 +22,8 @@ import ( "reflect" "sort" "strings" + "sync" + "sync/atomic" "testing" "time" ) @@ -429,3 +431,274 @@ func TestConsumerLog(t *testing.T) { } } } + +func wrapRebalanceCb(assignedEvents *int32, revokedEvents *int32, t *testing.T) func(c *Consumer, event Event) error { + return func(c *Consumer, event Event) error { + switch ev := event.(type) { + case AssignedPartitions: + atomic.AddInt32(assignedEvents, 1) + + t.Logf("%v, %s rebalance: %d new partition(s) assigned: %v\n", + c, c.GetRebalanceProtocol(), len(ev.Partitions), + ev.Partitions) + err := c.Assign(ev.Partitions) + if err != nil { + panic(err) + } + + case RevokedPartitions: + atomic.AddInt32(revokedEvents, 1) + + t.Logf("%v, %s rebalance: %d partition(s) revoked: %v\n", + c, c.GetRebalanceProtocol(), len(ev.Partitions), + ev.Partitions) + if c.AssignmentLost() { + // Our consumer has been kicked out of the group and the + // entire assignment is thus lost. + t.Logf("%v, Current assignment lost!\n", c) + } + + // The client automatically calls Unassign() unless + // the callback has already called that method. + } + return nil + } +} + +func testPoll(c *Consumer, doneChan chan bool, t *testing.T, wg *sync.WaitGroup) { + defer wg.Done() + + run := true + for run { + select { + case <-doneChan: + run = false + + default: + ev := c.Poll(100) + if ev == nil { + continue + } + switch e := ev.(type) { + case *Message: + t.Logf("Message on %s:\n%s\n", + e.TopicPartition, string(e.Value)) + if e.Headers != nil { + t.Logf("Headers: %v\n", e.Headers) + } + + case Error: + // Errors should generally be + // considered informational, the client + // will try to automatically recover. + t.Logf("Error: %v: %v for "+ + "consumer %v\n", e.Code(), e, c) + + default: + t.Logf("Ignored %v for consumer %v\n", + e, c) + } + } + } +} + +// TestConsumerCloseForStaticMember verifies the rebalance +// for static membership. +// According to KIP-345, the consumer group will not trigger rebalance unless +// 1) A new member joins +// 2) A leader rejoins (possibly due to topic assignment change) +// 3) An existing member offline time is over session timeout +// 4) Broker receives a leave group request containing alistof +// `group.instance.id`s (details later) +// +// This test uses 3 consumers while each consumer joins after the assignment +// finished for the previous consumers. +// The expected behavior for these consumers are: +// 1) First consumer joins, AssignedPartitions happens. Assign all the +// partitions to it. +// 2) Second consumer joins, RevokedPartitions happens from the first consumer, +// then AssignedPartitions happens to both consumers. +// 3) Third consumer joins, RevokedPartitions happens from the previous two +// consumers, then AssignedPartitions happens to all the three consumers. +// 4) Close the second consumer, RevokedPartitions should not happen. +// 5) Rejoin the second consumer, rebalance should not happen to all the other +// consumers since it's not the leader, AssignedPartitions only happened +// to this consumer to assign the partitions. +// +// The total number of AssignedPartitions for the first consumer is 3, +// and the total number of RevokedPartitions for the first consumer is 2. +// The total number of AssignedPartitions for the second consumer is 2, +// and the total number of RevokedPartitions for the second consumer is 1. +// The total number of AssignedPartitions for the third consumer is 1, +// and the total number of RevokedPartitions for the second consumer is 0. +// The total number of AssignedPartitions for the rejoined consumer +// (originally second consumer) is 1, +// and the total number of RevokedPartitions for the rejoined consumer +// (originally second consumer) is 0. +func TestConsumerCloseForStaticMember(t *testing.T) { + if !testconfRead() { + t.Skipf("Missing testconf.json") + } + broker := testconf.Brokers + topic := testconf.Topic + + var assignedEvents1 int32 + var revokedEvents1 int32 + + var assignedEvents2 int32 + var revokedEvents2 int32 + + var assignedEvents3 int32 + var revokedEvents3 int32 + + var assignedEvents4 int32 + var revokedEvents4 int32 + + conf1 := ConfigMap{ + "bootstrap.servers": broker, + "group.id": "rebalance", + "session.timeout.ms": "6000", + "max.poll.interval.ms": "10000", + "group.instance.id": "staticmember1", + } + c1, err := NewConsumer(&conf1) + + conf2 := ConfigMap{ + "bootstrap.servers": broker, + "group.id": "rebalance", + "session.timeout.ms": "6000", + "max.poll.interval.ms": "10000", + "group.instance.id": "staticmember2", + } + c2, err := NewConsumer(&conf2) + if err != nil { + t.Fatalf("%s", err) + } + + conf3 := ConfigMap{ + "bootstrap.servers": broker, + "group.id": "rebalance", + "session.timeout.ms": "6000", + "max.poll.interval.ms": "10000", + "group.instance.id": "staticmember3", + } + + c3, err := NewConsumer(&conf3) + if err != nil { + t.Fatalf("%s", err) + } + wrapRebalancecb1 := wrapRebalanceCb(&assignedEvents1, &revokedEvents1, t) + err = c1.Subscribe(topic, wrapRebalancecb1) + if err != nil { + t.Fatalf("Failed to subscribe to topic %s: %s\n", topic, err) + } + + wg := sync.WaitGroup{} + doneChan := make(chan bool, 3) + + wg.Add(1) + go testPoll(c1, doneChan, t, &wg) + testConsumerWaitAssignment(c1, t) + + closeChan := make(chan bool) + wrapRebalancecb2 := wrapRebalanceCb(&assignedEvents2, &revokedEvents2, t) + err = c2.Subscribe(topic, wrapRebalancecb2) + if err != nil { + t.Fatalf("Failed to subscribe to topic %s: %s\n", topic, err) + } + wg.Add(1) + go testPoll(c2, closeChan, t, &wg) + testConsumerWaitAssignment(c2, t) + + wrapRebalancecb3 := wrapRebalanceCb(&assignedEvents3, &revokedEvents3, t) + err = c3.Subscribe(topic, wrapRebalancecb3) + if err != nil { + t.Fatalf("Failed to subscribe to topic %s: %s\n", topic, err) + } + wg.Add(1) + go testPoll(c3, doneChan, t, &wg) + testConsumerWaitAssignment(c3, t) + + closeChan <- true + close(closeChan) + c2.Close() + + c2, err = NewConsumer(&conf2) + if err != nil { + t.Fatalf("%s", err) + } + + wrapRebalancecb4 := wrapRebalanceCb(&assignedEvents4, &revokedEvents4, t) + err = c2.Subscribe(topic, wrapRebalancecb4) + if err != nil { + t.Fatalf("Failed to subscribe to topic %s: %s\n", topic, err) + } + + wg.Add(1) + go testPoll(c2, doneChan, t, &wg) + testConsumerWaitAssignment(c2, t) + + doneChan <- true + close(doneChan) + + c3.Close() + c2.Close() + c1.Close() + + wg.Wait() + + // Wait 2 * session.timeout.ms to make sure no revokedEvents happens + time.Sleep(2 * 6000 * time.Millisecond) + + if atomic.LoadInt32(&assignedEvents1) != 3 { + t.Fatalf("3 assignedEvents are Expected to happen for the first consumer, but %d happened\n", + atomic.LoadInt32(&assignedEvents1)) + } + + if atomic.LoadInt32(&revokedEvents1) != 2 { + t.Fatalf("2 revokedEvents are Expected to happen for the first consumer, but %d happened\n", + atomic.LoadInt32(&revokedEvents1)) + } + + if atomic.LoadInt32(&assignedEvents2) != 2 { + t.Fatalf("2 assignedEvents are Expected to happen for the second consumer, but %d happened\n", + atomic.LoadInt32(&assignedEvents2)) + } + if atomic.LoadInt32(&revokedEvents2) != 1 { + t.Fatalf("1 revokedEvents is Expected to happen for the second consumer, but %d happened\n", + atomic.LoadInt32(&revokedEvents2)) + } + + if atomic.LoadInt32(&assignedEvents3) != 1 { + t.Fatalf("1 assignedEvents is Expected to happen for the third consumer, but %d happened\n", + atomic.LoadInt32(&assignedEvents3)) + } + if atomic.LoadInt32(&revokedEvents3) != 0 { + t.Fatalf("0 revokedEvents is Expected to happen for the third consumer, but %d happened\n", + atomic.LoadInt32(&revokedEvents3)) + } + + if atomic.LoadInt32(&assignedEvents4) != 1 { + t.Fatalf("1 assignedEvents is Expected to happen for the rejoined consumer(originally second consumer), but %d happened\n", + atomic.LoadInt32(&assignedEvents4)) + } + if atomic.LoadInt32(&revokedEvents4) != 0 { + t.Fatalf("0 revokedEvents is Expected to happen for the rejoined consumer(originally second consumer), but %d happened\n", + atomic.LoadInt32(&revokedEvents4)) + } +} + +func testConsumerWaitAssignment(c *Consumer, t *testing.T) { + run := true + for run { + assignment, err := c.Assignment() + if err != nil { + t.Fatalf("Assignment failed: %s\n", err) + } + + if len(assignment) != 0 { + t.Logf("%v Assigned partitions are: %v\n", c, assignment) + run = false + } + } +}