
JMX Metric records-lag incorrect when topic is produced to external to consuming thread #933

Open
retinaburn opened this issue Dec 4, 2023 · 0 comments

Expected Behavior

When viewing the JMX metric records-lag for a given topic and partition, the value should be roughly comparable to the consumer lag reported by Redpanda/Kafka while producing.

If messages are produced faster than they are consumed, records-lag should increase over time.
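For reference, the per-partition records-lag is the gap between the partition's log end offset and the consumer's current fetch position, so sustained over-production should show up directly as a growing value. A minimal arithmetic sketch (the offsets below are made-up illustrative numbers, not from this report):

```java
public class LagExample {
    public static void main(String[] args) {
        // records-lag for a partition is (log end offset - consumer position).
        long logEndOffset = 100_000;    // latest offset written by producers (hypothetical)
        long consumerPosition = 12_345; // next offset the consumer will fetch (hypothetical)
        long recordsLag = logEndOffset - consumerPosition;
        System.out.println(recordsLag); // 87655
    }
}
```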

Actual Behaviour

If messages are produced to Kafka from the consuming thread, the metric is reported correctly, with records-lag increasing over time as documented in #634.

If messages are produced via a REST controller in the same project, records-lag in JMX does not increase, even though consumer lag does increase when viewed in Redpanda/Kafka.

Stranger still, if you pre-load the topic with messages (say 10k) and then start the app, records-lag is recorded correctly.
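One way to observe the metric outside a JMX console is to read it from the platform MBean server in-process. A hedged sketch, using the Kafka consumer MBean naming scheme; the client-id, topic, and partition values are the ones from this report and would need adjusting for another setup:

```java
import java.lang.management.ManagementFactory;
import javax.management.MBeanServer;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;

public class LagProbe {
    // Builds the ObjectName under which the Kafka consumer registers its
    // fetch-manager metrics, e.g.
    // kafka.consumer:type=consumer-fetch-manager-metrics,client-id=...,topic=...,partition=...
    static ObjectName lagMetricName(String clientId, String topic, int partition)
            throws MalformedObjectNameException {
        return new ObjectName(String.format(
                "kafka.consumer:type=consumer-fetch-manager-metrics,client-id=%s,topic=%s,partition=%d",
                clientId, topic, partition));
    }

    public static void main(String[] args) throws Exception {
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        ObjectName name = lagMetricName("consumer-consumer-11", "chat-room", 0);
        if (server.isRegistered(name)) {
            // Attribute name as exposed by the Kafka client: "records-lag".
            System.out.println("records-lag = " + server.getAttribute(name, "records-lag"));
        } else {
            System.out.println("metric not registered: " + name);
        }
    }
}
```

Polling this in a loop while the REST batch is running would show whether the MBean attribute itself stalls, or only the JMX console view of it.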

Steps To Reproduce

  1. Navigate to https://micronaut.io/launch
  2. Select the kafka, micrometer-jmx, and micrometer-prometheus features, with name: consumer, Java version: 21
  3. Generate Project
  4. Extract demo project
  5. Define Consumer:
import io.micronaut.configuration.kafka.annotation.KafkaListener;
import io.micronaut.configuration.kafka.annotation.Topic;
import jakarta.inject.Inject;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@KafkaListener(threads = 2)
public class Consumer {
    private final Logger log = LoggerFactory.getLogger(Consumer.class);

    @Inject
    Producer producer;

    @Topic("${topic.chat}")
    public void listener(ConsumerRecord<String, String> record) throws InterruptedException {
        log.debug("{}-{}:{} {}={}", record.topic(), record.partition(), record.offset(), record.key(), record.value());
        // Uncomment to slow consumption and/or produce from the consuming
        // thread; in that case records-lag increases as expected (see #634).
        //Thread.sleep(1000);
        // producer.sendChat("1", "1");
        // producer.sendChat("2", "2");
    }
}
  6. Define Producer Interface:
import io.micronaut.configuration.kafka.annotation.KafkaClient;
import io.micronaut.configuration.kafka.annotation.KafkaKey;
import io.micronaut.configuration.kafka.annotation.Topic;

@KafkaClient(id = "producer")
public interface Producer {
    @Topic("${topic.chat}")
    void sendChat(@KafkaKey String key, String value);
}
  7. Define Controller:
import io.micronaut.http.annotation.Controller;
import io.micronaut.http.annotation.PathVariable;
import io.micronaut.http.annotation.Post;
import jakarta.inject.Inject;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

@Controller("/produce")
public class BatchController {

    @Inject
    Producer producer;

    @Post("/batch/{numOfThreads}/{numMessages}")
    public boolean batchProduce(@PathVariable Integer numOfThreads, @PathVariable Integer numMessages) {
        ExecutorService executor = Executors.newFixedThreadPool(numOfThreads);

        for (int i = 0; i < numOfThreads; i++) {
            executor.submit(new ThreadProducer(i, producer, numMessages));
        }
        // Let the pool terminate once all batches have been submitted.
        executor.shutdown();

        return true;
    }

    static class ThreadProducer implements Runnable {
        final int numOfMessages;
        final String threadId;
        final Producer producer;

        ThreadProducer(int threadId, Producer producer, int numOfMessages) {
            this.threadId = String.valueOf(threadId);
            this.producer = producer;
            this.numOfMessages = numOfMessages;
        }

        @Override
        public void run() {
            for (int j = 0; j < numOfMessages; j++) {
                producer.sendChat(threadId, String.valueOf(j));
            }
        }
    }
}
  8. Execute Application
  9. Confirm Lag for consumer is 0 in JMX and Redpanda/Kafka
  10. curl -X POST http://localhost:8082/produce/batch/10/100000
  11. Confirm Lag is increasing in Redpanda/Kafka for consumer
  12. Confirm jmx kafka.consumer -> consumer-fetch-manager-metrics -> consumer-consumer-11 -> chat-room -> 0 -> Attributes -> records-lag is not increasing
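Since the project also includes micrometer-prometheus, the same metric can be cross-checked without a JMX console. This is a hedged sketch assuming the Prometheus management endpoint is enabled and non-sensitive in the application configuration (that config is not shown in this report), and that Micrometer renders the Kafka metric under a records_lag name:

```shell
# Assumes the app serves its Prometheus endpoint on port 8082 and the
# endpoint is enabled; the metric name below is Micrometer's usual rendering
# of the Kafka fetch-manager lag metric.
curl -s http://localhost:8082/prometheus | grep records_lag
```

If the JMX attribute is stalled, this scrape would be expected to show the same stalled value, which would narrow the problem to the underlying Kafka client metric rather than the JMX export.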

Environment Information

  • Operating System: Windows 11
  • JDK: Adoptium Temurin 21

Example Application

https://github.com/retinaburn/micronaut-kafka-investigation

Version

4.2.0
