
NPE in ConsumerStateBatch.getCurrentRetryCount #1004

Open
mpkorstanje opened this issue Apr 2, 2024 · 2 comments


mpkorstanje commented Apr 2, 2024

Expected Behavior

Given a Kafka listener with retry enabled, when an exception is thrown from the listener, it should be retried.

@KafkaListener(
    groupId = "example_failure-test",
    batch = true,
    errorStrategy = @ErrorStrategy(value = RETRY_ON_ERROR, retryCount = MAX_VALUE, retryDelay = "5s"),
    offsetReset = EARLIEST,
    offsetStrategy = OffsetStrategy.DISABLED
)
class BulkItemConsumer {

  @Topic("t.failure-test.test")
  void consume(List<ConsumerRecord<String, BulkItemOnIndex>> records, Consumer<String, BulkItemOnIndex> kafkaConsumer) throws IOException, ExecutionException {
     // An exception is thrown here to trigger the retry
  }

}

Actual Behaviour

2024-04-02 12:10:45.101Z | INFO  | pool-7-thread-1      | o.a.k.c.c.internals.ConsumerCoordinator       | [Consumer clientId=failure-test-example-consumer, groupId=example_failure-test] Successfully joined group with generation Generation{generationId=1, memberId='failure-test-example-consumer-acb506c5-c0b1-49a1-8d80-e7eb491a69cc', protocol='range'}
2024-04-02 12:10:45.101Z | INFO  | pool-7-thread-1      | o.a.k.c.c.internals.ConsumerCoordinator       | [Consumer clientId=failure-test-example-consumer, groupId=example_failure-test] Finished assignment for group at generation 1: {failure-test-example-consumer-acb506c5-c0b1-49a1-8d80-e7eb491a69cc=Assignment(partitions=[t.failure-test.test-0])}
2024-04-02 12:10:45.104Z | INFO  | pool-7-thread-1      | o.a.k.c.c.internals.ConsumerCoordinator       | [Consumer clientId=failure-test-example-consumer, groupId=example_failure-test] Successfully synced group in generation Generation{generationId=1, memberId='failure-test-example-consumer-acb506c5-c0b1-49a1-8d80-e7eb491a69cc', protocol='range'}
2024-04-02 12:10:45.105Z | INFO  | pool-7-thread-1      | o.a.k.c.c.internals.ConsumerCoordinator       | [Consumer clientId=failure-test-example-consumer, groupId=example_failure-test] Notifying assignor about the new Assignment(partitions=[t.failure-test.test-0])
2024-04-02 12:10:45.105Z | INFO  | pool-7-thread-1      | o.a.k.c.c.internals.ConsumerCoordinator       | [Consumer clientId=failure-test-example-consumer, groupId=example_failure-test] Adding newly assigned partitions: t.failure-test.test-0
2024-04-02 12:10:45.106Z | INFO  | pool-7-thread-1      | o.a.k.c.c.internals.ConsumerCoordinator       | [Consumer clientId=failure-test-example-consumer, groupId=example_failure-test] Found no committed offset for partition t.failure-test.test-0
2024-04-02 12:10:45.107Z | INFO  | pool-7-thread-1      | o.a.k.c.consumer.internals.SubscriptionState  | [Consumer clientId=failure-test-example-consumer, groupId=example_failure-test] Resetting offset for partition t.failure-test.test-0 to position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:9092 (id: 1001 rack: null)], epoch=0}}.

// Some log lines indicating the consumer was called and threw an exception

2024-04-02 12:10:45.123Z | ERROR | pool-7-thread-1      | i.m.c.k.e.KafkaListenerExceptionHandler       | Kafka consumer [com.example.ExampleConsumer@3d39e8d0] produced error: Cannot invoke "org.apache.kafka.clients.consumer.OffsetAndMetadata.offset()" because the return value of "java.util.Map.get(Object)" is null
java.lang.NullPointerException: Cannot invoke "org.apache.kafka.clients.consumer.OffsetAndMetadata.offset()" because the return value of "java.util.Map.get(Object)" is null
	at io.micronaut.configuration.kafka.processor.ConsumerStateBatch.lambda$getCurrentRetryCount$3(ConsumerStateBatch.java:173)
	at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197)
	at java.base/java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1707)
	at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509)
	at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
	at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:921)
	at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
	at java.base/java.util.stream.IntPipeline.reduce(IntPipeline.java:520)
	at java.base/java.util.stream.IntPipeline.max(IntPipeline.java:483)
	at io.micronaut.configuration.kafka.processor.ConsumerStateBatch.getCurrentRetryCount(ConsumerStateBatch.java:175)
	at io.micronaut.configuration.kafka.processor.ConsumerStateBatch.resolveWithErrorStrategy(ConsumerStateBatch.java:149)
	at io.micronaut.configuration.kafka.processor.ConsumerStateBatch.processRecords(ConsumerStateBatch.java:93)
	at io.micronaut.configuration.kafka.processor.ConsumerState.pollAndProcessRecords(ConsumerState.java:212)
	at io.micronaut.configuration.kafka.processor.ConsumerState.refreshAssignmentsPollAndProcessRecords(ConsumerState.java:164)
	at io.micronaut.configuration.kafka.processor.ConsumerState.threadPollLoop(ConsumerState.java:154)
	at io.micrometer.core.instrument.composite.CompositeTimer.record(CompositeTimer.java:141)
	at io.micrometer.core.instrument.Timer.lambda$wrap$0(Timer.java:193)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:833)
2024-04-02 12:10:45.124Z | INFO  | pool-7-thread-1      | i.m.c.kafka.processor.KafkaConsumerProcessor  | Consumer [failure-test-example-consumer] assignments changed: [] -> [t.failure-test.test-0]

// And at this point the application shuts down

2024-04-02 12:11:15.217Z | INFO  | pool-7-thread-1      | o.a.k.c.c.internals.ConsumerCoordinator       | [Consumer clientId=failure-test-example-consumer, groupId=example_failure-test] Revoke previously assigned partitions t.failure-test.test-0
2024-04-02 12:11:15.218Z | INFO  | pool-7-thread-1      | o.a.k.c.c.internals.ConsumerCoordinator       | [Consumer clientId=failure-test-example-consumer, groupId=example_failure-test] Member failure-test-example-consumer-acb506c5-c0b1-49a1-8d80-e7eb491a69cc sending LeaveGroup request to coordinator localhost:9092 (id: 2147482646 rack: null) due to the consumer is being closed
2024-04-02 12:11:15.218Z | INFO  | pool-7-thread-1      | o.a.k.c.c.internals.ConsumerCoordinator       | [Consumer clientId=failure-test-example-consumer, groupId=example_failure-test] Resetting generation and member id due to: consumer pro-actively leaving the group
2024-04-02 12:11:15.218Z | INFO  | pool-7-thread-1      | o.a.k.c.c.internals.ConsumerCoordinator       | [Consumer clientId=failure-test-example-consumer, groupId=example_failure-test] Request joining group due to: consumer pro-actively leaving the group

Steps To Reproduce

I can only reproduce this problem on a rather slow CI machine; I've tried to reproduce it locally without success. In any case, I wouldn't expect an NPE, and suspect this may be due to a race condition. At a glance, it looks like the partitions and currentOffsets are collected from Kafka at different times and may be subject to change in between.

Environment Information

CI uses the following Docker images:

  • maven:3.8.6-eclipse-temurin-17
  • confluentinc/cp-zookeeper:7.3.1
  • confluentinc/cp-kafka:7.3.1

Example Application

No response

Version

  • micronaut-platform:4.3.4
  • micronaut-kafka:5.3.0
Contributor

jeremyg484 commented Apr 4, 2024

It is tough to provide an exact fix for this without a definite reproducer (I have not been able to reproduce it in our test suite so far either), but I do think we should at least make some changes in ConsumerStateBatch to code a little more defensively. currentOffsets in particular is marked as @Nullable in most of the methods where it is passed around, and yet we don't seem to verify that it is not null where we should.

My best guess is that it is a timing problem in the "slow" CI machine and the consumer hasn't yet been assigned a partition by Kafka (or it could be in the midst of being reassigned), and we need to account for that state.

@mpkorstanje
Author

My best guess is that it is a timing problem in the "slow" CI machine and the consumer hasn't yet been assigned a partition by Kafka (or it could be in the midst of being reassigned), and we need to account for that state.

I would concur with that guess. The current workaround is to run a passing test case before testing the failure, giving Kafka the time to assign partitions and finish balancing.

Defensive programming would make sense. I do think the @NonNull annotations are a red herring though.

private int getCurrentRetryCount(Set<TopicPartition> partitions,
                                 @Nullable Map<TopicPartition, OffsetAndMetadata> currentOffsets) {
    return partitions.stream()
        .map(tp -> getPartitionRetryState(tp, currentOffsets.get(tp).offset()))
        .mapToInt(x -> x.currentRetryCount)
        .max().orElse(info.retryCount);
}

For this specific exception, it is not currentOffsets itself that is null; rather, currentOffsets.get(tp) returns null.
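A minimal self-contained sketch of a null-safe version of that stream, assuming partitions without a committed offset should simply be skipped (this is a hypothetical illustration, not the actual fix; the types are simplified stand-ins for TopicPartition, OffsetAndMetadata, and the retry-state lookup):

```java
import java.util.Map;
import java.util.Objects;
import java.util.Set;

public class RetryCountSketch {

    // Simplified stand-in for getCurrentRetryCount: partitions are Strings,
    // currentOffsets maps partition -> committed offset, and the per-offset
    // retry state is reduced to a Map<Long, Integer>.
    static int getCurrentRetryCount(Set<String> partitions,
                                    Map<String, Long> currentOffsets,
                                    Map<Long, Integer> retryCountByOffset,
                                    int defaultRetryCount) {
        if (currentOffsets == null) {
            return defaultRetryCount;
        }
        return partitions.stream()
            // Skip partitions Kafka has not yet reported a committed offset
            // for; currentOffsets.get(tp) returning null is exactly what
            // triggered the NPE in the report above.
            .map(currentOffsets::get)
            .filter(Objects::nonNull)
            .map(offset -> retryCountByOffset.getOrDefault(offset, 0))
            .mapToInt(Integer::intValue)
            .max()
            .orElse(defaultRetryCount);
    }

    public static void main(String[] args) {
        Map<String, Long> offsets = Map.of("t-0", 5L);
        Map<Long, Integer> retries = Map.of(5L, 2);
        // "t-1" has no committed offset yet: it is skipped instead of throwing.
        System.out.println(getCurrentRetryCount(Set.of("t-0", "t-1"), offsets, retries, -1));
    }
}
```

Whether skipping such partitions (versus treating the whole batch as not-yet-retried) is the right semantics would need to be decided as part of the real fix.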
