Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix#1065 #1067

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open

Conversation

swapnilgawade16
Copy link

Closes #1065

Few details on the fix:

KafkaConsumerManager's Consumer Expiration Thread acquires a lock on (KafkaConsumerManager.this instance) and calls expired method of KafkaConsumerState.java. The Consumer Expiration Thread sometimes takes time to get the lock on synchronized expired because there can be a consumer consuming records that has a lock on the KafkaConsumerState instance which may take longer to release depending on the time taken in the IO to fetch the records from kafka broker. The situation can get worse with more number of active consumers fetching records.

The two locks for expired and updateExpiration are not required.

Thread dump snippets

"qtp1551629761-29" prio=5 Id=29 BLOCKED on io.confluent.kafkarest.v2.KafkaConsumerManager@3a09eed4 owned by "Consumer Expiration Thread" Id=47
 at app//io.confluent.kafkarest.v2.KafkaConsumerManager.getConsumerInstance(KafkaConsumerManager.java:669)
 -  blocked on io.confluent.kafkarest.v2.KafkaConsumerManager@3a09eed4 at app//io.confluent.kafkarest.v2.KafkaConsumerManager.getConsumerInstance(KafkaConsumerManager.java:679)

"qtp1551629761-35" prio=5 Id=35 BLOCKED on io.confluent.kafkarest.v2.KafkaConsumerManager@3a09eed4 owned by "Consumer Expiration Thread" Id=47
 at app//io.confluent.kafkarest.v2.KafkaConsumerManager.createConsumer(KafkaConsumerManager.java:180)
 -  blocked on io.confluent.kafkarest.v2.KafkaConsumerManager@3a09eed4 

"Consumer Expiration Thread" daemon prio=5 Id=47 BLOCKED on io.confluent.kafkarest.v2.JsonKafkaConsumerState@4d2bbeda owned by "pool-4-thread-13" Id=153
 at app//io.confluent.kafkarest.v2.KafkaConsumerState.expired(KafkaConsumerState.java:345)
 -  blocked on io.confluent.kafkarest.v2.JsonKafkaConsumerState@4d2bbeda at app//io.confluent.kafkarest.v2.KafkaConsumerManager$ExpirationThread.run(KafkaConsumerManager.java:761)
 -  locked io.confluent.kafkarest.v2.KafkaConsumerManager@3a09eed4

"pool-4-thread-13" prio=5 Id=153 RUNNABLE (in native)
 at java.base@11.0.16.1/sun.nio.ch.EPoll.wait(Native Method)
 at java.base@11.0.16.1/sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:120)
 at java.base@11.0.16.1/sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:124)
 -  locked sun.nio.ch.Util$2@1458190c
 -  locked sun.nio.ch.EPollSelectorImpl@3ebca1e6
 at java.base@11.0.16.1/sun.nio.ch.SelectorImpl.select(SelectorImpl.java:136)
 at app//org.apache.kafka.common.network.Selector.select(Selector.java:869)
 at app//org.apache.kafka.common.network.Selector.poll(Selector.java:465)
 at app//org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:561)
 at app//org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:265)
 

fter few days of experiments found the root cause.

Closes confluentinc#1065

Few details on the fix:
KafkaConsumerManager's "Consumer Expiration Thread" acquires a lock on (KafkaConsumerManager.this instance) and calls expired method of KafkaConsumerState.java. The Consumer Expiration Thread sometimes takes time to get the lock on synchronized expired because there can be a consumer consuming records that has a lock on the KafkaConsumerState instance which may take longer to release depending on the time taken in the IO to fetch the records from kafka broker. The situation can get worse with more number of active consumers fetching records.

The two locks for expired and updateExpiration are not required.

Thread dump snippets
"qtp1551629761-29" prio=5 Id=29 BLOCKED on io.confluent.kafkarest.v2.KafkaConsumerManager@3a09eed4 owned by "Consumer Expiration Thread" Id=47
 at app//io.confluent.kafkarest.v2.KafkaConsumerManager.getConsumerInstance(KafkaConsumerManager.java:669)
 -  blocked on io.confluent.kafkarest.v2.KafkaConsumerManager@3a09eed4 at app//io.confluent.kafkarest.v2.KafkaConsumerManager.getConsumerInstance(KafkaConsumerManager.java:679)

"qtp1551629761-35" prio=5 Id=35 BLOCKED on io.confluent.kafkarest.v2.KafkaConsumerManager@3a09eed4 owned by "Consumer Expiration Thread" Id=47
 at app//io.confluent.kafkarest.v2.KafkaConsumerManager.createConsumer(KafkaConsumerManager.java:180)
 -  blocked on io.confluent.kafkarest.v2.KafkaConsumerManager@3a09eed4 

"Consumer Expiration Thread" daemon prio=5 Id=47 BLOCKED on io.confluent.kafkarest.v2.JsonKafkaConsumerState@4d2bbeda owned by "pool-4-thread-13" Id=153
 at app//io.confluent.kafkarest.v2.KafkaConsumerState.expired(KafkaConsumerState.java:345)
 -  blocked on io.confluent.kafkarest.v2.JsonKafkaConsumerState@4d2bbeda at app//io.confluent.kafkarest.v2.KafkaConsumerManager$ExpirationThread.run(KafkaConsumerManager.java:761)
 -  locked io.confluent.kafkarest.v2.KafkaConsumerManager@3a09eed4

"pool-4-thread-13" prio=5 Id=153 RUNNABLE (in native)
 at java.base@11.0.16.1/sun.nio.ch.EPoll.wait(Native Method)
 at java.base@11.0.16.1/sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:120)
 at java.base@11.0.16.1/sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:124)
 -  locked sun.nio.ch.Util$2@1458190c
 -  locked sun.nio.ch.EPollSelectorImpl@3ebca1e6
 at java.base@11.0.16.1/sun.nio.ch.SelectorImpl.select(SelectorImpl.java:136)
 at app//org.apache.kafka.common.network.Selector.select(Selector.java:869)
 at app//org.apache.kafka.common.network.Selector.poll(Selector.java:465)
 at app//org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:561)
 at app//org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:265)
@swapnilgawade16 swapnilgawade16 requested a review from a team as a code owner September 23, 2022 11:23
@ehumber
Copy link
Member

ehumber commented Sep 26, 2022

Hi @swapnilgawade16

I've not looked at the code changes yet, but to get me started on the details of this a bit quicker, could you please provide a description of why the two locks are no longer needed?

They were originally added as part of 1695f4e but unfortunately that doesn't describe the details of why they are needed either.

Thank you!

@swapnilgawade16
Copy link
Author

Hi @swapnilgawade16

I've not looked at the code changes yet, but to get me started on the details of this a bit quicker, could you please provide a description of why the two locks are no longer needed?

They were originally added as part of 1695f4e but unfortunately that doesn't describe the details of why they are needed either.

Thank you!

There was a ReentrantLock used before as you can see in KafkaConsumerState.java. This I guess it was replaced with making all the methods synchronized. However if you check specifically for expired and updateExpiration methods in latest code they are synchronized as well as use a expiration object lock. These two methods basically do not update with any of the core consumer state, they just are there to update and get the expiration state. Hence IMO there is no need for these two methods to be synchronized. Keeping them synchronized does cause a lot of problem in high traffic as I have mentioned in the description of the issue. Basically there is no need to block expiry check while any of the consumer operations are active such as seekToBeginning, assign, createConsumer, etc.

@swapnilgawade16
Copy link
Author

@ehumber Any idea by when this change will be merged ?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Consumer creation API takes longer (results in timeout)
2 participants