Fix Rebalance events behavior for static membership and fix for consu… #757

Merged
merged 2 commits into from May 2, 2022
6 changes: 6 additions & 0 deletions CHANGELOG.md
@@ -10,6 +10,12 @@ This is a feature release:
See [examples/mock_cluster](examples/mock_cluster).


### Fixes

* Fix Rebalance events behavior for static membership (@jliunyu, #757).
* Fix consumer close taking 10 seconds when there's no rebalance
needed (@jliunyu, #757).

confluent-kafka-go is based on librdkafka FUTUREFIXME, see the
[librdkafka release notes](https://github.com/edenhill/librdkafka/releases/tag/v1.9.0)
for a complete list of changes, enhancements, fixes and upgrade considerations.
37 changes: 20 additions & 17 deletions kafka/consumer.go
@@ -424,30 +424,33 @@ func (c *Consumer) Close() (err error) {
close(c.events)
}

// librdkafka's rd_kafka_consumer_close() will block
// and trigger the rebalance_cb() if one is set, if not, which is the
// case with the Go client since it registers EVENTs rather than callbacks,
// librdkafka will shortcut the rebalance_cb and do a forced unassign.
// But we can't have that since the application might need the final RevokePartitions
// before shutting down. So we trigger an Unsubscribe() first, wait for that to
// propagate (in the Poll loop below), and then close the consumer.
c.Unsubscribe()

// Poll for rebalance events
for {
c.Poll(10 * 1000)
if int(C.rd_kafka_queue_length(c.handle.rkq)) == 0 {
break
doneChan := make(chan bool)

go func() {
C.rd_kafka_consumer_close(c.handle.rk)
// wake up Poll()
C.rd_kafka_queue_yield(c.handle.rkq)
doneChan <- true
}()

// wait for consumer_close() to finish while serving c.Poll() for rebalance callbacks/events
run := true
for run {
select {
case <-doneChan:
Contributor

I think this will block until doneChan is written to, which means if Poll() is triggered once for some other reason, e.g., a stats callback, then it will hang here.

Contributor Author

Can we use int(C.rd_kafka_queue_length(c.handle.rkq)) == 0 to exit the loop, the same way the previous code did?

for run {
		c.Poll(100)
		select {
		case <-doneChan:
			run = false

		default:
			if int(C.rd_kafka_queue_length(c.handle.rkq)) == 0 {
				run = false
			}
		}
	}

Contributor Author

I'm also confused about why it would hang. I tested the existing change in this PR with the following code, and it doesn't hang:

func TestConsumerClose1(t *testing.T) {

	c, err := NewConsumer(&ConfigMap{"group.id": "gotest"})
	if err != nil {
		t.Fatalf("%s", err)
	}

	startClose := time.Now()
	err = c.Close()
	if err != nil {
		panic(err)
	}
	fmt.Printf("consumer closed. closing time: %v\n", time.Since(startClose))
}

Contributor

I think with the following sequence of events it will hang:

  1. application calls Close()
  2. Close() is blocking on c.Poll()
  3. A stats event (or something else which is not a rebalance event) triggers the Poll and Poll() wakes up and returns
  4. Close() is now blocking on doneChan
  5. librdkafka emits a rebalance event and waits for the app/go to ack it
  6. .. but since Close() is no longer calling Poll() the rebalance event is never handled.
  7. indefinite hang

Contributor

I think this is easily solved by moving the Poll to the default case in the select, like in other places in the code.
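
The suggested pattern can be sketched without librdkafka. This is only an illustration under stated assumptions: `poll`, `events`, and `acks` are hypothetical stand-ins for `c.Poll()` and the rebalance event/ack handshake, and the goroutine plays the role of the blocking `rd_kafka_consumer_close()` call. Because the poll sits in the `default` case, the loop keeps serving events until the close signals completion, so the sequence described above cannot strand an unacknowledged event.

```go
package main

import (
	"fmt"
	"time"
)

// poll is a hypothetical stand-in for Consumer.Poll: it waits up to timeout
// for one event, acknowledges it, and reports whether an event was served.
func poll(events <-chan string, acks chan<- struct{}, timeout time.Duration) bool {
	select {
	case <-events:
		acks <- struct{}{} // acknowledge so the closer can make progress
		return true
	case <-time.After(timeout):
		return false
	}
}

// closeWhileServing starts a blocking "close" that emits n events, each of
// which must be acknowledged before the close continues, and keeps polling
// until the close reports completion. It returns the number of events served.
func closeWhileServing(n int) int {
	events := make(chan string)
	acks := make(chan struct{})
	doneChan := make(chan bool)

	go func() {
		for i := 0; i < n; i++ {
			events <- fmt.Sprintf("event-%d", i)
			<-acks // block until the poll loop has handled the event
		}
		doneChan <- true
	}()

	served := 0
	for run := true; run; {
		select {
		case <-doneChan:
			run = false
		default:
			// Polling in the default case means we never block on doneChan
			// while an event is still waiting to be handled.
			if poll(events, acks, 10*time.Millisecond) {
				served++
			}
		}
	}
	return served
}

func main() {
	fmt.Println("events served before close finished:", closeWhileServing(3))
}
```

The short poll timeout bounds how long the loop takes to notice `doneChan` after the last event; the value used here is arbitrary.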

Contributor Author

for run {
		select {
		case <-doneChan:
			// Is it possible that, after doneChan fires, there are still other
			// events we need to handle? We always have to wait to read the
			// doneChan value. If we exit from the default case, doneChan will
			// report an error.
			run = false

		default:
			c.Poll(100)
			if int(C.rd_kafka_queue_length(c.handle.rkq)) == 0 {
				run = false
			}
		}
	}

Contributor

I don't think so. Once doneChan is done, we don't need to call Poll anymore.

Contributor

Why is the queue_length() check needed?

Contributor Author

We don't need it, I already removed it from my code.

run = false

default:
c.Poll(100)
}
}

close(doneChan)

// Destroy our queue
C.rd_kafka_queue_destroy(c.handle.rkq)
c.handle.rkq = nil

// Close the consumer
C.rd_kafka_consumer_close(c.handle.rk)

c.handle.cleanup()

C.rd_kafka_destroy(c.handle.rk)
276 changes: 276 additions & 0 deletions kafka/consumer_test.go
@@ -22,6 +22,8 @@ import (
"reflect"
"sort"
"strings"
"sync"
"sync/atomic"
"testing"
"time"
)
@@ -429,3 +431,277 @@ func TestConsumerLog(t *testing.T) {
}
}
}

func wrapRebalanceCb(assignedEvents *int32, revokedEvents *int32, t *testing.T) func(c *Consumer, event Event) error {
Contributor

👍 on the wrapping to provide the testing.T

return func(c *Consumer, event Event) error {
switch ev := event.(type) {
case AssignedPartitions:
atomic.AddInt32(assignedEvents, 1)

t.Logf("%v, %s rebalance: %d new partition(s) assigned: %v\n",
c, c.GetRebalanceProtocol(), len(ev.Partitions),
ev.Partitions)
err := c.Assign(ev.Partitions)
if err != nil {
panic(err)
}

case RevokedPartitions:
atomic.AddInt32(revokedEvents, 1)

t.Logf("%v, %s rebalance: %d partition(s) revoked: %v\n",
c, c.GetRebalanceProtocol(), len(ev.Partitions),
ev.Partitions)
if c.AssignmentLost() {
// Our consumer has been kicked out of the group and the
// entire assignment is thus lost.
t.Logf("%v, Current assignment lost!\n", c)
}

// The client automatically calls Unassign() unless
// the callback has already called that method.
}
return nil
}
}

func testPoll(c *Consumer, doneChan chan bool, t *testing.T, wg *sync.WaitGroup) {
defer wg.Done()

run := true
for run {
select {
case <-doneChan:
run = false

default:
ev := c.Poll(100)
if ev == nil {
continue
}
switch e := ev.(type) {
case *Message:
t.Logf("Message on %s:\n%s\n",
e.TopicPartition, string(e.Value))
if e.Headers != nil {
fmt.Printf("Headers: %v\n", e.Headers)
}

case Error:
// Errors should generally be
// considered informational, the client
// will try to automatically recover.
t.Logf("Error: %v: %v for "+
"consumer %v\n", e.Code(), e, c)

default:
t.Logf("Ignored %v for consumer %v\n",
e, c)
}
}
}
}

// TestConsumerCloseForStaticMember verifies the rebalance
// for static membership.
// According to KIP-345, the consumer group will not trigger rebalance unless
// 1) A new member joins
// 2) A leader rejoins (possibly due to topic assignment change)
// 3) An existing member offline time is over session timeout
// 4) Broker receives a leave group request containing a list of
// `group.instance.id`s (details later)
//
// This test uses 3 consumers while each consumer joins after the assignment
// finished for the previous consumers.
// The expected behavior for these consumers is:
// 1) First consumer joins, AssignedPartitions happens. Assign all the
// partitions to it.
// 2) Second consumer joins, RevokedPartitions happens from the first consumer,
// then AssignedPartitions happens to both consumers.
// 3) Third consumer joins, RevokedPartitions happens from the previous two
// consumers, then AssignedPartitions happens to all the three consumers.
// 4) Close the second consumer, RevokedPartitions should not happen.
// 5) Rejoin the second consumer, rebalance should not happen to all the other
// consumers since it's not the leader, AssignedPartitions only happened
// to this consumer to assign the partitions.
//
// The total number of AssignedPartitions for the first consumer is 3,
// and the total number of RevokedPartitions for the first consumer is 2.
// The total number of AssignedPartitions for the second consumer is 2,
// and the total number of RevokedPartitions for the second consumer is 1.
// The total number of AssignedPartitions for the third consumer is 1,
// and the total number of RevokedPartitions for the third consumer is 0.
// The total number of AssignedPartitions for the rejoined consumer
// (originally second consumer) is 1,
// and the total number of RevokedPartitions for the rejoined consumer
// (originally second consumer) is 0.
func TestConsumerCloseForStaticMember(t *testing.T) {
if !testconfRead() {
t.Skipf("Missing testconf.json")
}
broker := testconf.Brokers
topic := testconf.Topic

var assignedEvents1 int32
var revokedEvents1 int32

var assignedEvents2 int32
var revokedEvents2 int32

var assignedEvents3 int32
var revokedEvents3 int32

var assignedEvents4 int32
var revokedEvents4 int32

conf1 := ConfigMap{
"bootstrap.servers": broker,
"group.id": "rebalance",
"session.timeout.ms": "6000",
"max.poll.interval.ms": "10000",
"group.instance.id": "staticmember1",
}
c1, err := NewConsumer(&conf1)
if err != nil {
t.Fatalf("%s", err)
}

conf2 := ConfigMap{
"bootstrap.servers": broker,
"group.id": "rebalance",
"session.timeout.ms": "6000",
"max.poll.interval.ms": "10000",
"group.instance.id": "staticmember2",
}
c2, err := NewConsumer(&conf2)
if err != nil {
t.Fatalf("%s", err)
}

conf3 := ConfigMap{
"bootstrap.servers": broker,
"group.id": "rebalance",
"session.timeout.ms": "6000",
"max.poll.interval.ms": "10000",
"group.instance.id": "staticmember3",
}

c3, err := NewConsumer(&conf3)
if err != nil {
t.Fatalf("%s", err)
}
wrapRebalancecb1 := wrapRebalanceCb(&assignedEvents1, &revokedEvents1, t)
err = c1.Subscribe(topic, wrapRebalancecb1)
if err != nil {
t.Fatalf("Failed to subscribe to topic %s: %s\n", topic, err)
return
}

wg := &sync.WaitGroup{}
Contributor

I think it is better to keep this as a non-pointer here, and then pass it as pointer when needed (to testPoll()).

Contributor Author

If we keep this as a non-pointer here, we need to use it as pointer each time, and there are a lot of use cases in the same function.

For example:
wg.Add(1)
wg.Wait()

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The pointer receiver methods apply also to non pointer variables, based on this.

So pointer should be needed to pass the WaitGroup struct by reference, but not to call its methods with pointer receiver.
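
A minimal sketch of that point, using only the standard library: the `WaitGroup` is declared as a plain value, its pointer-receiver methods (`Add`, `Wait`) are called on it directly because the variable is addressable, and `&wg` is used only where it crosses a function boundary. The names `worker` and `runWorkers` are hypothetical and not part of this PR.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// worker is a hypothetical helper. It takes *sync.WaitGroup because a
// WaitGroup must not be copied after first use.
func worker(wg *sync.WaitGroup, counter *int32) {
	defer wg.Done()
	atomic.AddInt32(counter, 1)
}

// runWorkers declares the WaitGroup as a non-pointer value and passes its
// address only when handing it to another function.
func runWorkers(n int) int32 {
	var wg sync.WaitGroup
	var counter int32

	for i := 0; i < n; i++ {
		wg.Add(1) // pointer-receiver method called on an addressable value
		go worker(&wg, &counter)
	}

	wg.Wait() // no explicit & needed here either
	return atomic.LoadInt32(&counter)
}

func main() {
	fmt.Println("workers finished:", runWorkers(3))
}
```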

Contributor

> If we keep this as a non-pointer here, we need to use it as pointer each time, and there are a lot of use cases in the same function.

I still believe it is more correct to keep it as a non-pointer here and pass it as a pointer where needed.
But it is a nit, so do as you wish.

doneChan := make(chan bool, 3)

wg.Add(1)
go testPoll(c1, doneChan, t, wg)
testConsumerWaitAssignment(c1, t)

closeChan := make(chan bool)
wrapRebalancecb2 := wrapRebalanceCb(&assignedEvents2, &revokedEvents2, t)
err = c2.Subscribe(topic, wrapRebalancecb2)
if err != nil {
t.Fatalf("Failed to subscribe to topic %s: %s\n", topic, err)
return
}
wg.Add(1)
go testPoll(c2, closeChan, t, wg)
testConsumerWaitAssignment(c2, t)

wrapRebalancecb3 := wrapRebalanceCb(&assignedEvents3, &revokedEvents3, t)
err = c3.Subscribe(topic, wrapRebalancecb3)
if err != nil {
t.Fatalf("Failed to subscribe to topic %s: %s\n", topic, err)
return
}
wg.Add(1)
go testPoll(c3, doneChan, t, wg)
testConsumerWaitAssignment(c3, t)

closeChan <- true
close(closeChan)
c2.Close()

c2, err = NewConsumer(&conf2)
if err != nil {
t.Fatalf("%s", err)
}

wrapRebalancecb4 := wrapRebalanceCb(&assignedEvents4, &revokedEvents4, t)
err = c2.Subscribe(topic, wrapRebalancecb4)
if err != nil {
t.Fatalf("Failed to subscribe to topic %s: %s\n", topic, err)
return
}
wg.Add(1)
go testPoll(c2, doneChan, t, wg)
testConsumerWaitAssignment(c2, t)

doneChan <- true
close(doneChan)

c3.Close()
c2.Close()
c1.Close()

wg.Wait()

// Wait 30s to make sure no revokedEvents happens
time.Sleep(30 * time.Second)

if atomic.LoadInt32(&assignedEvents1) != 3 {
t.Fatalf("3 assignedEvents are Expected to happen for the first consumer, but %d happened\n",
atomic.LoadInt32(&assignedEvents1))
}

if atomic.LoadInt32(&revokedEvents1) != 2 {
t.Fatalf("2 revokedEvents are Expected to happen for the first consumer, but %d happened\n",
atomic.LoadInt32(&revokedEvents1))
}

if atomic.LoadInt32(&assignedEvents2) != 2 {
t.Fatalf("2 assignedEvents are Expected to happen for the second consumer, but %d happened\n",
atomic.LoadInt32(&assignedEvents2))
}
if atomic.LoadInt32(&revokedEvents2) != 1 {
t.Fatalf("1 revokedEvents is Expected to happen for the second consumer, but %d happened\n",
atomic.LoadInt32(&revokedEvents2))
}

if atomic.LoadInt32(&assignedEvents3) != 1 {
t.Fatalf("1 assignedEvents is Expected to happen for the third consumer, but %d happened\n",
atomic.LoadInt32(&assignedEvents3))
}
if atomic.LoadInt32(&revokedEvents3) != 0 {
t.Fatalf("0 revokedEvents is Expected to happen for the third consumer, but %d happened\n",
atomic.LoadInt32(&revokedEvents3))
}

if atomic.LoadInt32(&assignedEvents4) != 1 {
t.Fatalf("1 assignedEvents is Expected to happen for the rejoined consumer(originally second consumer), but %d happened\n",
atomic.LoadInt32(&assignedEvents4))
}
if atomic.LoadInt32(&revokedEvents4) != 0 {
t.Fatalf("0 revokedEvents is Expected to happen for the rejoined consumer(originally second consumer), but %d happened\n",
atomic.LoadInt32(&revokedEvents4))
}
}

func testConsumerWaitAssignment(c *Consumer, t *testing.T) {
run := true
for run {
assignment, err := c.Assignment()
if err != nil {
t.Fatalf("Assignment failed: %s", err)
}

if len(assignment) != 0 {
fmt.Printf("%v Assigned partitions are: %v\n", c, assignment)
run = false
}
}
}