Skip to content

Commit

Permalink
Update the consumer close function with the latest rd_kafka_consumer_…
Browse files Browse the repository at this point in the history
…close_queue API (#800)

authored-by: Jing Liu <jl5311@nyu.edu>
  • Loading branch information
emasab committed Jun 21, 2022
1 parent c1ad021 commit e34bc9a
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 35 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
Expand Up @@ -12,7 +12,8 @@ This is a feature release:

### Fixes

* Fix Rebalance events behavior for static membership (@jliunyu, #757).
* Fix Rebalance events behavior for static membership (@jliunyu, #757,
#798).
* Fix consumer close taking 10 seconds when there's no rebalance
needed (@jliunyu, #757).

Expand Down
23 changes: 3 additions & 20 deletions kafka/consumer.go
Expand Up @@ -424,29 +424,12 @@ func (c *Consumer) Close() (err error) {
close(c.events)
}

doneChan := make(chan bool)

go func() {
C.rd_kafka_consumer_close(c.handle.rk)
// wake up Poll()
C.rd_kafka_queue_yield(c.handle.rkq)
doneChan <- true
}()

// wait for consumer_close() to finish while serving c.Poll() for rebalance callbacks/events
run := true
for run {
select {
case <-doneChan:
run = false
C.rd_kafka_consumer_close_queue(c.handle.rk, c.handle.rkq)

default:
c.Poll(100)
}
for C.rd_kafka_consumer_closed(c.handle.rk) != 1 {
c.Poll(100)
}

close(doneChan)

// Destroy our queue
C.rd_kafka_queue_destroy(c.handle.rkq)
c.handle.rkq = nil
Expand Down
34 changes: 20 additions & 14 deletions kafka/consumer_test.go
Expand Up @@ -520,27 +520,33 @@ func testPoll(c *Consumer, doneChan chan bool, t *testing.T, wg *sync.WaitGroup)
// then AssignedPartitions happens to both consumers.
// 3) Third consumer joins, RevokedPartitions happens from the previous two
// consumers, then AssignedPartitions happens to all the three consumers.
// 4) Close the second consumer, RevokedPartitions should not happen.
// 4) Close the second consumer, revoke its assignments will happen, but it
// should not notice other consumers.
// 5) Rejoin the second consumer, rebalance should not happen to all the other
// consumers since it's not the leader, AssignedPartitions only happened
// to this consumer to assign the partitions.
// 6) Close the third consumer, revoke its assignments will happen, but it
// should not notice other consumers.
// 7) Close the rejoined consumer, revoke its assignments will happen,
// but it should not notice other consumers.
// 8) Close the first consumer, revoke its assignments will happen.
//
// The total number of AssignedPartitions for the first consumer is 3,
// and the total number of RevokedPartitions for the first consumer is 2.
// and the total number of RevokedPartitions for the first consumer is 3.
// The total number of AssignedPartitions for the second consumer is 2,
// and the total number of RevokedPartitions for the second consumer is 1.
// and the total number of RevokedPartitions for the second consumer is 2.
// The total number of AssignedPartitions for the third consumer is 1,
// and the total number of RevokedPartitions for the second consumer is 0.
// and the total number of RevokedPartitions for the third consumer is 1.
// The total number of AssignedPartitions for the rejoined consumer
// (originally second consumer) is 1,
// and the total number of RevokedPartitions for the rejoined consumer
// (originally second consumer) is 0.
// (originally second consumer) is 1.
func TestConsumerCloseForStaticMember(t *testing.T) {
if !testconfRead() {
t.Skipf("Missing testconf.json")
}
broker := testconf.Brokers
topic := testconf.Topic
topic := createTestTopic(t, "staticMembership", 3, 1)

var assignedEvents1 int32
var revokedEvents1 int32
Expand Down Expand Up @@ -655,35 +661,35 @@ func TestConsumerCloseForStaticMember(t *testing.T) {
atomic.LoadInt32(&assignedEvents1))
}

if atomic.LoadInt32(&revokedEvents1) != 2 {
t.Fatalf("2 revokedEvents are Expected to happen for the first consumer, but %d happened\n",
if atomic.LoadInt32(&revokedEvents1) != 3 {
t.Fatalf("3 revokedEvents are Expected to happen for the first consumer, but %d happened\n",
atomic.LoadInt32(&revokedEvents1))
}

if atomic.LoadInt32(&assignedEvents2) != 2 {
t.Fatalf("2 assignedEvents are Expected to happen for the second consumer, but %d happened\n",
atomic.LoadInt32(&assignedEvents2))
}
if atomic.LoadInt32(&revokedEvents2) != 1 {
t.Fatalf("1 revokedEvents is Expected to happen for the second consumer, but %d happened\n",
if atomic.LoadInt32(&revokedEvents2) != 2 {
t.Fatalf("2 revokedEvents is Expected to happen for the second consumer, but %d happened\n",
atomic.LoadInt32(&revokedEvents2))
}

if atomic.LoadInt32(&assignedEvents3) != 1 {
t.Fatalf("1 assignedEvents is Expected to happen for the third consumer, but %d happened\n",
atomic.LoadInt32(&assignedEvents3))
}
if atomic.LoadInt32(&revokedEvents3) != 0 {
t.Fatalf("0 revokedEvents is Expected to happen for the third consumer, but %d happened\n",
if atomic.LoadInt32(&revokedEvents3) != 1 {
t.Fatalf("1 revokedEvents is Expected to happen for the third consumer, but %d happened\n",
atomic.LoadInt32(&revokedEvents3))
}

if atomic.LoadInt32(&assignedEvents4) != 1 {
t.Fatalf("1 assignedEvents is Expected to happen for the rejoined consumer(originally second consumer), but %d happened\n",
atomic.LoadInt32(&assignedEvents4))
}
if atomic.LoadInt32(&revokedEvents4) != 0 {
t.Fatalf("0 revokedEvents is Expected to happen for the rejoined consumer(originally second consumer), but %d happened\n",
if atomic.LoadInt32(&revokedEvents4) != 1 {
t.Fatalf("1 revokedEvents is Expected to happen for the rejoined consumer(originally second consumer), but %d happened\n",
atomic.LoadInt32(&revokedEvents4))
}
}
Expand Down

0 comments on commit e34bc9a

Please sign in to comment.