New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix Rebalance events behavior for static membership and fix for consu… #757
Merged
Merged
Changes from all commits
Commits
Show all changes
2 commits
Select commit
Hold shift + click to select a range
File filter
Filter by extension
Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -22,6 +22,8 @@ import ( | |
"reflect" | ||
"sort" | ||
"strings" | ||
"sync" | ||
"sync/atomic" | ||
"testing" | ||
"time" | ||
) | ||
|
@@ -429,3 +431,274 @@ func TestConsumerLog(t *testing.T) { | |
} | ||
} | ||
} | ||
|
||
func wrapRebalanceCb(assignedEvents *int32, revokedEvents *int32, t *testing.T) func(c *Consumer, event Event) error { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 👍 on the wrapping to provide the testing.T |
||
return func(c *Consumer, event Event) error { | ||
switch ev := event.(type) { | ||
case AssignedPartitions: | ||
atomic.AddInt32(assignedEvents, 1) | ||
|
||
t.Logf("%v, %s rebalance: %d new partition(s) assigned: %v\n", | ||
c, c.GetRebalanceProtocol(), len(ev.Partitions), | ||
ev.Partitions) | ||
err := c.Assign(ev.Partitions) | ||
if err != nil { | ||
panic(err) | ||
} | ||
|
||
case RevokedPartitions: | ||
atomic.AddInt32(revokedEvents, 1) | ||
|
||
t.Logf("%v, %s rebalance: %d partition(s) revoked: %v\n", | ||
c, c.GetRebalanceProtocol(), len(ev.Partitions), | ||
ev.Partitions) | ||
if c.AssignmentLost() { | ||
// Our consumer has been kicked out of the group and the | ||
// entire assignment is thus lost. | ||
t.Logf("%v, Current assignment lost!\n", c) | ||
} | ||
|
||
// The client automatically calls Unassign() unless | ||
// the callback has already called that method. | ||
} | ||
return nil | ||
} | ||
} | ||
|
||
func testPoll(c *Consumer, doneChan chan bool, t *testing.T, wg *sync.WaitGroup) { | ||
defer wg.Done() | ||
|
||
run := true | ||
for run { | ||
select { | ||
case <-doneChan: | ||
run = false | ||
|
||
default: | ||
ev := c.Poll(100) | ||
if ev == nil { | ||
continue | ||
} | ||
switch e := ev.(type) { | ||
case *Message: | ||
t.Logf("Message on %s:\n%s\n", | ||
e.TopicPartition, string(e.Value)) | ||
if e.Headers != nil { | ||
t.Logf("Headers: %v\n", e.Headers) | ||
} | ||
|
||
case Error: | ||
// Errors should generally be | ||
// considered informational, the client | ||
// will try to automatically recover. | ||
t.Logf("Error: %v: %v for "+ | ||
"consumer %v\n", e.Code(), e, c) | ||
|
||
default: | ||
t.Logf("Ignored %v for consumer %v\n", | ||
e, c) | ||
} | ||
} | ||
} | ||
} | ||
|
||
// TestConsumerCloseForStaticMember verifies the rebalance | ||
jliunyu marked this conversation as resolved.
Show resolved
Hide resolved
|
||
// for static membership. | ||
// According to KIP-345, the consumer group will not trigger rebalance unless | ||
// 1) A new member joins | ||
// 2) A leader rejoins (possibly due to topic assignment change) | ||
// 3) An existing member offline time is over session timeout | ||
// 4) Broker receives a leave group request containing alistof | ||
// `group.instance.id`s (details later) | ||
// | ||
// This test uses 3 consumers while each consumer joins after the assignment | ||
// finished for the previous consumers. | ||
// The expected behavior for these consumers are: | ||
// 1) First consumer joins, AssignedPartitions happens. Assign all the | ||
// partitions to it. | ||
// 2) Second consumer joins, RevokedPartitions happens from the first consumer, | ||
// then AssignedPartitions happens to both consumers. | ||
// 3) Third consumer joins, RevokedPartitions happens from the previous two | ||
// consumers, then AssignedPartitions happens to all the three consumers. | ||
// 4) Close the second consumer, RevokedPartitions should not happen. | ||
// 5) Rejoin the second consumer, rebalance should not happen to all the other | ||
// consumers since it's not the leader, AssignedPartitions only happened | ||
// to this consumer to assign the partitions. | ||
// | ||
// The total number of AssignedPartitions for the first consumer is 3, | ||
// and the total number of RevokedPartitions for the first consumer is 2. | ||
// The total number of AssignedPartitions for the second consumer is 2, | ||
// and the total number of RevokedPartitions for the second consumer is 1. | ||
// The total number of AssignedPartitions for the third consumer is 1, | ||
// and the total number of RevokedPartitions for the second consumer is 0. | ||
// The total number of AssignedPartitions for the rejoined consumer | ||
// (originally second consumer) is 1, | ||
// and the total number of RevokedPartitions for the rejoined consumer | ||
// (originally second consumer) is 0. | ||
func TestConsumerCloseForStaticMember(t *testing.T) { | ||
jliunyu marked this conversation as resolved.
Show resolved
Hide resolved
|
||
if !testconfRead() { | ||
t.Skipf("Missing testconf.json") | ||
} | ||
broker := testconf.Brokers | ||
topic := testconf.Topic | ||
|
||
var assignedEvents1 int32 | ||
var revokedEvents1 int32 | ||
|
||
var assignedEvents2 int32 | ||
var revokedEvents2 int32 | ||
|
||
var assignedEvents3 int32 | ||
var revokedEvents3 int32 | ||
|
||
var assignedEvents4 int32 | ||
var revokedEvents4 int32 | ||
|
||
conf1 := ConfigMap{ | ||
"bootstrap.servers": broker, | ||
"group.id": "rebalance", | ||
"session.timeout.ms": "6000", | ||
"max.poll.interval.ms": "10000", | ||
"group.instance.id": "staticmember1", | ||
} | ||
c1, err := NewConsumer(&conf1) | ||
|
||
conf2 := ConfigMap{ | ||
"bootstrap.servers": broker, | ||
"group.id": "rebalance", | ||
"session.timeout.ms": "6000", | ||
"max.poll.interval.ms": "10000", | ||
"group.instance.id": "staticmember2", | ||
} | ||
c2, err := NewConsumer(&conf2) | ||
if err != nil { | ||
t.Fatalf("%s", err) | ||
} | ||
|
||
conf3 := ConfigMap{ | ||
"bootstrap.servers": broker, | ||
"group.id": "rebalance", | ||
"session.timeout.ms": "6000", | ||
"max.poll.interval.ms": "10000", | ||
"group.instance.id": "staticmember3", | ||
} | ||
|
||
c3, err := NewConsumer(&conf3) | ||
if err != nil { | ||
t.Fatalf("%s", err) | ||
} | ||
wrapRebalancecb1 := wrapRebalanceCb(&assignedEvents1, &revokedEvents1, t) | ||
err = c1.Subscribe(topic, wrapRebalancecb1) | ||
if err != nil { | ||
t.Fatalf("Failed to subscribe to topic %s: %s\n", topic, err) | ||
} | ||
|
||
wg := sync.WaitGroup{} | ||
doneChan := make(chan bool, 3) | ||
|
||
wg.Add(1) | ||
go testPoll(c1, doneChan, t, &wg) | ||
testConsumerWaitAssignment(c1, t) | ||
|
||
closeChan := make(chan bool) | ||
wrapRebalancecb2 := wrapRebalanceCb(&assignedEvents2, &revokedEvents2, t) | ||
err = c2.Subscribe(topic, wrapRebalancecb2) | ||
if err != nil { | ||
t.Fatalf("Failed to subscribe to topic %s: %s\n", topic, err) | ||
} | ||
wg.Add(1) | ||
go testPoll(c2, closeChan, t, &wg) | ||
testConsumerWaitAssignment(c2, t) | ||
|
||
wrapRebalancecb3 := wrapRebalanceCb(&assignedEvents3, &revokedEvents3, t) | ||
err = c3.Subscribe(topic, wrapRebalancecb3) | ||
if err != nil { | ||
t.Fatalf("Failed to subscribe to topic %s: %s\n", topic, err) | ||
} | ||
wg.Add(1) | ||
go testPoll(c3, doneChan, t, &wg) | ||
testConsumerWaitAssignment(c3, t) | ||
|
||
closeChan <- true | ||
close(closeChan) | ||
c2.Close() | ||
|
||
c2, err = NewConsumer(&conf2) | ||
if err != nil { | ||
t.Fatalf("%s", err) | ||
} | ||
|
||
wrapRebalancecb4 := wrapRebalanceCb(&assignedEvents4, &revokedEvents4, t) | ||
err = c2.Subscribe(topic, wrapRebalancecb4) | ||
if err != nil { | ||
t.Fatalf("Failed to subscribe to topic %s: %s\n", topic, err) | ||
jliunyu marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
|
||
wg.Add(1) | ||
go testPoll(c2, doneChan, t, &wg) | ||
testConsumerWaitAssignment(c2, t) | ||
|
||
doneChan <- true | ||
close(doneChan) | ||
|
||
c3.Close() | ||
c2.Close() | ||
c1.Close() | ||
|
||
wg.Wait() | ||
|
||
// Wait 2 * session.timeout.ms to make sure no revokedEvents happens | ||
time.Sleep(2 * 6000 * time.Millisecond) | ||
|
||
if atomic.LoadInt32(&assignedEvents1) != 3 { | ||
t.Fatalf("3 assignedEvents are Expected to happen for the first consumer, but %d happened\n", | ||
atomic.LoadInt32(&assignedEvents1)) | ||
} | ||
|
||
if atomic.LoadInt32(&revokedEvents1) != 2 { | ||
t.Fatalf("2 revokedEvents are Expected to happen for the first consumer, but %d happened\n", | ||
atomic.LoadInt32(&revokedEvents1)) | ||
} | ||
|
||
if atomic.LoadInt32(&assignedEvents2) != 2 { | ||
t.Fatalf("2 assignedEvents are Expected to happen for the second consumer, but %d happened\n", | ||
atomic.LoadInt32(&assignedEvents2)) | ||
} | ||
if atomic.LoadInt32(&revokedEvents2) != 1 { | ||
t.Fatalf("1 revokedEvents is Expected to happen for the second consumer, but %d happened\n", | ||
atomic.LoadInt32(&revokedEvents2)) | ||
} | ||
|
||
if atomic.LoadInt32(&assignedEvents3) != 1 { | ||
t.Fatalf("1 assignedEvents is Expected to happen for the third consumer, but %d happened\n", | ||
atomic.LoadInt32(&assignedEvents3)) | ||
} | ||
if atomic.LoadInt32(&revokedEvents3) != 0 { | ||
t.Fatalf("0 revokedEvents is Expected to happen for the third consumer, but %d happened\n", | ||
atomic.LoadInt32(&revokedEvents3)) | ||
} | ||
|
||
if atomic.LoadInt32(&assignedEvents4) != 1 { | ||
t.Fatalf("1 assignedEvents is Expected to happen for the rejoined consumer(originally second consumer), but %d happened\n", | ||
atomic.LoadInt32(&assignedEvents4)) | ||
} | ||
if atomic.LoadInt32(&revokedEvents4) != 0 { | ||
t.Fatalf("0 revokedEvents is Expected to happen for the rejoined consumer(originally second consumer), but %d happened\n", | ||
atomic.LoadInt32(&revokedEvents4)) | ||
} | ||
} | ||
|
||
func testConsumerWaitAssignment(c *Consumer, t *testing.T) { | ||
run := true | ||
for run { | ||
assignment, err := c.Assignment() | ||
if err != nil { | ||
t.Fatalf("Assignment failed: %s\n", err) | ||
} | ||
|
||
if len(assignment) != 0 { | ||
t.Logf("%v Assigned partitions are: %v\n", c, assignment) | ||
run = false | ||
} | ||
} | ||
} |
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this will block until doneChan is written to, which means if Poll() is triggered once for some other reason, e.g., a stats callback, then it will hang here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we use
int(C.rd_kafka_queue_length(c.handle.rkq)) == 0
to exist the loop which is same with the previous way?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
And I'm also confused why it will hang, I tested with this code with the existing change in this PR, it doesn't hang
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think with the following sequence of events it will hang:
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this is easily solved by moving the Poll to the default case in the select, like in other places in the code.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think so, when doneChan is done we don't need to call Poll no more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why is the queue_length() check needed?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We don't need it, I already removed it from my code.