Skip to content

Commit

Permalink
Fix Rebalance events behavior for static membership and fix for consu…
Browse files Browse the repository at this point in the history
…mer close takes longer than 10s
  • Loading branch information
jliunyu committed Apr 15, 2022
1 parent b97c440 commit f9ff23f
Show file tree
Hide file tree
Showing 3 changed files with 302 additions and 17 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Expand Up @@ -10,6 +10,12 @@ This is a feature release:
See [examples/mock_cluster](examples/mock_cluster).


### Fixes

* Fix Rebalance events behavior for static membership (@jliunyu, #757).
* Fix consumer close taking 10 seconds when there's no rebalance
needed (@jliunyu, #757).

confluent-kafka-go is based on librdkafka FUTUREFIXME, see the
[librdkafka release notes](https://github.com/edenhill/librdkafka/releases/tag/v1.9.0)
for a complete list of changes, enhancements, fixes and upgrade considerations.
Expand Down
37 changes: 20 additions & 17 deletions kafka/consumer.go
Expand Up @@ -424,30 +424,33 @@ func (c *Consumer) Close() (err error) {
close(c.events)
}

// librdkafka's rd_kafka_consumer_close() will block
// and trigger the rebalance_cb() if one is set, if not, which is the
// case with the Go client since it registers EVENTs rather than callbacks,
// librdkafka will shortcut the rebalance_cb and do a forced unassign.
// But we can't have that since the application might need the final RevokePartitions
// before shutting down. So we trigger an Unsubscribe() first, wait for that to
// propagate (in the Poll loop below), and then close the consumer.
c.Unsubscribe()

// Poll for rebalance events
for {
c.Poll(10 * 1000)
if int(C.rd_kafka_queue_length(c.handle.rkq)) == 0 {
break
doneChan := make(chan bool)

go func() {
C.rd_kafka_consumer_close(c.handle.rk)
// wake up Poll()
C.rd_kafka_queue_yield(c.handle.rkq)
doneChan <- true
}()

// wait for consumer_close() to finish while serving c.Poll() for rebalance callbacks/events
run := true
for run {
select {
case <-doneChan:
run = false

default:
c.Poll(100)
}
}

close(doneChan)

// Destroy our queue
C.rd_kafka_queue_destroy(c.handle.rkq)
c.handle.rkq = nil

// Close the consumer
C.rd_kafka_consumer_close(c.handle.rk)

c.handle.cleanup()

C.rd_kafka_destroy(c.handle.rk)
Expand Down
276 changes: 276 additions & 0 deletions kafka/consumer_test.go
Expand Up @@ -22,6 +22,8 @@ import (
"reflect"
"sort"
"strings"
"sync"
"sync/atomic"
"testing"
"time"
)
Expand Down Expand Up @@ -429,3 +431,277 @@ func TestConsumerLog(t *testing.T) {
}
}
}

func wrapRebalanceCb(assignedEvents *int32, revokedEvents *int32, t *testing.T) func(c *Consumer, event Event) error {
return func(c *Consumer, event Event) error {
switch ev := event.(type) {
case AssignedPartitions:
atomic.AddInt32(assignedEvents, 1)

t.Logf("%v, %s rebalance: %d new partition(s) assigned: %v\n",
c, c.GetRebalanceProtocol(), len(ev.Partitions),
ev.Partitions)
err := c.Assign(ev.Partitions)
if err != nil {
panic(err)
}

case RevokedPartitions:
atomic.AddInt32(revokedEvents, 1)

t.Logf("%v, %s rebalance: %d partition(s) revoked: %v\n",
c, c.GetRebalanceProtocol(), len(ev.Partitions),
ev.Partitions)
if c.AssignmentLost() {
// Our consumer has been kicked out of the group and the
// entire assignment is thus lost.
t.Logf("%v, Current assignment lost!\n", c)
}

// The client automatically calls Unassign() unless
// the callback has already called that method.
}
return nil
}
}

func testPoll(c *Consumer, doneChan chan bool, t *testing.T, wg *sync.WaitGroup) {
defer wg.Done()

run := true
for run {
select {
case <-doneChan:
run = false

default:
ev := c.Poll(100)
if ev == nil {
continue
}
switch e := ev.(type) {
case *Message:
t.Logf("Message on %s:\n%s\n",
e.TopicPartition, string(e.Value))
if e.Headers != nil {
fmt.Printf("Headers: %v\n", e.Headers)
}

case Error:
// Errors should generally be
// considered informational, the client
// will try to automatically recover.
t.Logf("Error: %v: %v for "+
"consumer %v\n", e.Code(), e, c)

default:
t.Logf("Ignored %v for consumer %v\n",
e, c)
}
}
}
}

// TestConsumerCloseForStaticMember verifies the rebalance
// for static membership.
// According to KIP-345, the consumer group will not trigger rebalance unless
// 1) A new member joins
// 2) A leader rejoins (possibly due to topic assignment change)
// 3) An existing member offline time is over session timeout
// 4) Broker receives a leave group request containing alistof
// `group.instance.id`s (details later)
//
// This test uses 3 consumers while each consumer joins after the assignment
// finished for the previous consumers.
// The expected behavior for these consumers are:
// 1) First consumer joins, AssignedPartitions happens. Assign all the
// partitions to it.
// 2) Second consumer joins, RevokedPartitions happens from the first consumer,
// then AssignedPartitions happens to both consumers.
// 3) Third consumer joins, RevokedPartitions happens from the previous two
// consumers, then AssignedPartitions happens to all the three consumers.
// 4) Close the second consumer, RevokedPartitions should not happen.
// 5) Rejoin the second consumer, rebalance should not happen to all the other
// consumers since it's not the leader, AssignedPartitions only happened
// to this consumer to assign the partitions.
//
// The total number of AssignedPartitions for the first consumer is 3,
// and the total number of RevokedPartitions for the first consumer is 2.
// The total number of AssignedPartitions for the second consumer is 2,
// and the total number of RevokedPartitions for the second consumer is 1.
// The total number of AssignedPartitions for the third consumer is 1,
// and the total number of RevokedPartitions for the second consumer is 0.
// The total number of AssignedPartitions for the rejoined consumer
// (originally second consumer) is 1,
// and the total number of RevokedPartitions for the rejoined consumer
// (originally second consumer) is 0.
func TestConsumerCloseForStaticMember(t *testing.T) {
if !testconfRead() {
t.Skipf("Missing testconf.json")
}
broker := testconf.Brokers
topic := testconf.Topic

var assignedEvents1 int32
var revokedEvents1 int32

var assignedEvents2 int32
var revokedEvents2 int32

var assignedEvents3 int32
var revokedEvents3 int32

var assignedEvents4 int32
var revokedEvents4 int32

conf1 := ConfigMap{
"bootstrap.servers": broker,
"group.id": "rebalance",
"session.timeout.ms": "6000",
"max.poll.interval.ms": "10000",
"group.instance.id": "staticmember1",
}
c1, err := NewConsumer(&conf1)

conf2 := ConfigMap{
"bootstrap.servers": broker,
"group.id": "rebalance",
"session.timeout.ms": "6000",
"max.poll.interval.ms": "10000",
"group.instance.id": "staticmember2",
}
c2, err := NewConsumer(&conf2)
if err != nil {
t.Fatalf("%s", err)
}

conf3 := ConfigMap{
"bootstrap.servers": broker,
"group.id": "rebalance",
"session.timeout.ms": "6000",
"max.poll.interval.ms": "10000",
"group.instance.id": "staticmember3",
}

c3, err := NewConsumer(&conf3)
if err != nil {
t.Fatalf("%s", err)
}
wrapRebalancecb1 := wrapRebalanceCb(&assignedEvents1, &revokedEvents1, t)
err = c1.Subscribe(topic, wrapRebalancecb1)
if err != nil {
t.Fatalf("Failed to subscribe to topic %s: %s\n", topic, err)
return
}

wg := &sync.WaitGroup{}
doneChan := make(chan bool, 3)

wg.Add(1)
go testPoll(c1, doneChan, t, wg)
testConsumerWaitAssignment(c1, t)

closeChan := make(chan bool)
wrapRebalancecb2 := wrapRebalanceCb(&assignedEvents2, &revokedEvents2, t)
err = c2.Subscribe(topic, wrapRebalancecb2)
if err != nil {
t.Fatalf("Failed to subscribe to topic %s: %s\n", topic, err)
return
}
wg.Add(1)
go testPoll(c2, closeChan, t, wg)
testConsumerWaitAssignment(c2, t)

wrapRebalancecb3 := wrapRebalanceCb(&assignedEvents3, &revokedEvents3, t)
err = c3.Subscribe(topic, wrapRebalancecb3)
if err != nil {
t.Fatalf("Failed to subscribe to topic %s: %s\n", topic, err)
return
}
wg.Add(1)
go testPoll(c3, doneChan, t, wg)
testConsumerWaitAssignment(c3, t)

closeChan <- true
close(closeChan)
c2.Close()

c2, err = NewConsumer(&conf2)
if err != nil {
t.Fatalf("%s", err)
}

wrapRebalancecb4 := wrapRebalanceCb(&assignedEvents4, &revokedEvents4, t)
err = c2.Subscribe(topic, wrapRebalancecb4)
if err != nil {
t.Fatalf("Failed to subscribe to topic %s: %s\n", topic, err)
return
}
wg.Add(1)
go testPoll(c2, doneChan, t, wg)
testConsumerWaitAssignment(c2, t)

doneChan <- true
close(doneChan)

c3.Close()
c2.Close()
c1.Close()

wg.Wait()

// Wait 30s to make sure no revokedEvents happens
time.Sleep(30 * time.Second)

if atomic.LoadInt32(&assignedEvents1) != 3 {
t.Fatalf("3 assignedEvents are Expected to happen for the first consumer, but %d happened\n",
atomic.LoadInt32(&assignedEvents1))
}

if atomic.LoadInt32(&revokedEvents1) != 2 {
t.Fatalf("2 revokedEvents are Expected to happen for the first consumer, but %d happened\n",
atomic.LoadInt32(&revokedEvents1))
}

if atomic.LoadInt32(&assignedEvents2) != 2 {
t.Fatalf("2 assignedEvents are Expected to happen for the second consumer, but %d happened\n",
atomic.LoadInt32(&assignedEvents2))
}
if atomic.LoadInt32(&revokedEvents2) != 1 {
t.Fatalf("1 revokedEvents is Expected to happen for the second consumer, but %d happened\n",
atomic.LoadInt32(&revokedEvents2))
}

if atomic.LoadInt32(&assignedEvents3) != 1 {
t.Fatalf("1 assignedEvents is Expected to happen for the third consumer, but %d happened\n",
atomic.LoadInt32(&assignedEvents3))
}
if atomic.LoadInt32(&revokedEvents3) != 0 {
t.Fatalf("0 revokedEvents is Expected to happen for the third consumer, but %d happened\n",
atomic.LoadInt32(&revokedEvents3))
}

if atomic.LoadInt32(&assignedEvents4) != 1 {
t.Fatalf("1 assignedEvents is Expected to happen for the rejoined consumer(originally second consumer), but %d happened\n",
atomic.LoadInt32(&assignedEvents4))
}
if atomic.LoadInt32(&revokedEvents4) != 0 {
t.Fatalf("0 revokedEvents is Expected to happen for the rejoined consumer(originally second consumer), but %d happened\n",
atomic.LoadInt32(&revokedEvents4))
}
}

func testConsumerWaitAssignment(c *Consumer, t *testing.T) {
run := true
for run {
assignment, err := c.Assignment()
if err != nil {
t.Fatalf("Assignment failed: %s", err)
}

if len(assignment) != 0 {
fmt.Printf("%v Assigned partitions are: %v\n", c, assignment)
run = false
}
}
}

0 comments on commit f9ff23f

Please sign in to comment.