Skip to content

Commit

Permalink
Update the consumer close function with the latest rd_kafka_consumer_…
Browse files Browse the repository at this point in the history
…close_queue API
  • Loading branch information
jliunyu committed Jun 16, 2022
1 parent f195ac6 commit 431cab0
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 34 deletions.
23 changes: 3 additions & 20 deletions kafka/consumer.go
Expand Up @@ -424,29 +424,12 @@ func (c *Consumer) Close() (err error) {
close(c.events)
}

doneChan := make(chan bool)

go func() {
C.rd_kafka_consumer_close(c.handle.rk)
// wake up Poll()
C.rd_kafka_queue_yield(c.handle.rkq)
doneChan <- true
}()

// wait for consumer_close() to finish while serving c.Poll() for rebalance callbacks/events
run := true
for run {
select {
case <-doneChan:
run = false
C.rd_kafka_consumer_close_queue(c.handle.rk, c.handle.rkq)

default:
c.Poll(100)
}
for C.rd_kafka_consumer_closed(c.handle.rk) != 1 {
c.Poll(100)
}

close(doneChan)

// Destroy our queue
C.rd_kafka_queue_destroy(c.handle.rkq)
c.handle.rkq = nil
Expand Down
34 changes: 20 additions & 14 deletions kafka/consumer_test.go
Expand Up @@ -520,27 +520,33 @@ func testPoll(c *Consumer, doneChan chan bool, t *testing.T, wg *sync.WaitGroup)
// then AssignedPartitions happens to both consumers.
// 3) Third consumer joins, RevokedPartitions happens from the previous two
// consumers, then AssignedPartitions happens to all the three consumers.
// 4) Close the second consumer, RevokedPartitions should not happen.
// 4) Close the second consumer, revoke its assignments will happen, but it
// should not notice other consumers.
// 5) Rejoin the second consumer, rebalance should not happen to all the other
// consumers since it's not the leader, AssignedPartitions only happened
// to this consumer to assign the partitions.
// 6) Close the third consumer, revoke its assignments will happen, but it
// should not notice other consumers.
// 7) Close the rejoined consumer, revoke its assignments will happen,
// but it should not notice other consumers.
// 8) Close the first consumer, revoke its assignments will happen.
//
// The total number of AssignedPartitions for the first consumer is 3,
// and the total number of RevokedPartitions for the first consumer is 2.
// and the total number of RevokedPartitions for the first consumer is 3.
// The total number of AssignedPartitions for the second consumer is 2,
// and the total number of RevokedPartitions for the second consumer is 1.
// and the total number of RevokedPartitions for the second consumer is 2.
// The total number of AssignedPartitions for the third consumer is 1,
// and the total number of RevokedPartitions for the second consumer is 0.
// and the total number of RevokedPartitions for the third consumer is 1.
// The total number of AssignedPartitions for the rejoined consumer
// (originally second consumer) is 1,
// and the total number of RevokedPartitions for the rejoined consumer
// (originally second consumer) is 0.
// (originally second consumer) is 1.
func TestConsumerCloseForStaticMember(t *testing.T) {
if !testconfRead() {
t.Skipf("Missing testconf.json")
}
broker := testconf.Brokers
topic := testconf.Topic
topic := createTestTopic(t, "staticMembership", 3, 1)

var assignedEvents1 int32
var revokedEvents1 int32
Expand Down Expand Up @@ -655,35 +661,35 @@ func TestConsumerCloseForStaticMember(t *testing.T) {
atomic.LoadInt32(&assignedEvents1))
}

if atomic.LoadInt32(&revokedEvents1) != 2 {
t.Fatalf("2 revokedEvents are Expected to happen for the first consumer, but %d happened\n",
if atomic.LoadInt32(&revokedEvents1) != 3 {
t.Fatalf("3 revokedEvents are Expected to happen for the first consumer, but %d happened\n",
atomic.LoadInt32(&revokedEvents1))
}

if atomic.LoadInt32(&assignedEvents2) != 2 {
t.Fatalf("2 assignedEvents are Expected to happen for the second consumer, but %d happened\n",
atomic.LoadInt32(&assignedEvents2))
}
if atomic.LoadInt32(&revokedEvents2) != 1 {
t.Fatalf("1 revokedEvents is Expected to happen for the second consumer, but %d happened\n",
if atomic.LoadInt32(&revokedEvents2) != 2 {
t.Fatalf("2 revokedEvents is Expected to happen for the second consumer, but %d happened\n",
atomic.LoadInt32(&revokedEvents2))
}

if atomic.LoadInt32(&assignedEvents3) != 1 {
t.Fatalf("1 assignedEvents is Expected to happen for the third consumer, but %d happened\n",
atomic.LoadInt32(&assignedEvents3))
}
if atomic.LoadInt32(&revokedEvents3) != 0 {
t.Fatalf("0 revokedEvents is Expected to happen for the third consumer, but %d happened\n",
if atomic.LoadInt32(&revokedEvents3) != 1 {
t.Fatalf("1 revokedEvents is Expected to happen for the third consumer, but %d happened\n",
atomic.LoadInt32(&revokedEvents3))
}

if atomic.LoadInt32(&assignedEvents4) != 1 {
t.Fatalf("1 assignedEvents is Expected to happen for the rejoined consumer(originally second consumer), but %d happened\n",
atomic.LoadInt32(&assignedEvents4))
}
if atomic.LoadInt32(&revokedEvents4) != 0 {
t.Fatalf("0 revokedEvents is Expected to happen for the rejoined consumer(originally second consumer), but %d happened\n",
if atomic.LoadInt32(&revokedEvents4) != 1 {
t.Fatalf("1 revokedEvents is Expected to happen for the rejoined consumer(originally second consumer), but %d happened\n",
atomic.LoadInt32(&revokedEvents4))
}
}
Expand Down

0 comments on commit 431cab0

Please sign in to comment.