New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix rdkafka consumer hanging when disconnecting #13144
Fix rdkafka consumer hanging when disconnecting #13144
Conversation
...this.consumerOptions.additionalOptions, | ||
...this.sslOptions, | ||
}; | ||
|
||
const consumer: kafkaTypes.KafkaConsumer = this.consumer = | ||
consumer = this.consumer = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the build is failing because it says consumer.subscribe()
in line 127 below and consumer.consume()
could possibly be undefined
src/rdkafkaConsumer.ts:127:4 - error TS2532: Object is possibly 'undefined'.
} | ||
|
||
// eslint-disable-next-line prefer-const | ||
let consumer: kafkaTypes.KafkaConsumer; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we don't reassign this later, why not make it const instead of disabling the eslint rule?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Because it needs to be declared ahead of time for use by the "rebalance_cb" property in the kafka consumer constructor options.
Calling
consumer.close()
was hanging indefinitely when theoptimizedRebalance
config was enabled.The issue is the same as confluentinc/confluent-kafka-go#767
The
rebalance
method needs to callconsumer.unassign()
when closing. It was not doing this becausethis.consumer
was being set toundefined
before disconnecting.