Fix partitions initial offsets in Kafka connector #25769
Hello,
I've discovered a bug in the Kafka initial offsets functionality that was introduced some time ago. Steps to reproduce this bug:
*To be precise, the initial offsets do not have to point specifically to the end of each partition: in theory any initial offset triggers the wrong behavior (i.e. after the job is resumed, the processor adds +1 to the initial offsets). In practice, however, with any value other than the "last offset" of a given partition the Jet job will (in most cases) consume some records before we have a chance to suspend it.
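The off-by-one can be modeled without Kafka at all. The sketch below is a simplified illustration, not the actual `StreamKafkaP` code: the class `InitialOffsetsSketch`, its two maps, and the helper `resumePositionAfterSuspend` are hypothetical names, and it assumes that the processor's internal map holds the offset of the last record already emitted and that a restore seeks to that value plus one. Under those assumptions it compares storing the initial offset as-is with storing it decremented by one.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Simplified, Kafka-free model of the off-by-one (hypothetical names throughout).
 * Assumption: the offsets map stores the offset of the last emitted record,
 * and restoring from a snapshot seeks to "stored offset + 1".
 */
public class InitialOffsetsSketch {

    // Stand-in for the processor's internal offsets map: partition -> last emitted offset.
    private final Map<Integer, Long> offsets = new HashMap<>();

    // Stand-in for the consumer position: partition -> next offset to read.
    private final Map<Integer, Long> position = new HashMap<>();

    // When true, the stored value is decremented by one before it goes into the map.
    private final boolean decrement;

    InitialOffsetsSketch(boolean decrement) {
        this.decrement = decrement;
    }

    // Seek each partition to its initial offset and record a value in the offsets map.
    void seekToInitialOffsets(Map<Integer, Long> initialOffsets) {
        for (Map.Entry<Integer, Long> e : initialOffsets.entrySet()) {
            position.put(e.getKey(), e.getValue());
            offsets.put(e.getKey(), decrement ? e.getValue() - 1 : e.getValue());
        }
    }

    // The snapshot is a copy of the offsets map at suspend time.
    Map<Integer, Long> saveSnapshot() {
        return new HashMap<>(offsets);
    }

    // On resume, continue right after the last emitted record: stored offset + 1.
    void restoreFromSnapshot(Map<Integer, Long> snapshot) {
        for (Map.Entry<Integer, Long> e : snapshot.entrySet()) {
            offsets.put(e.getKey(), e.getValue());
            position.put(e.getKey(), e.getValue() + 1);
        }
    }

    long nextOffsetToRead(int partition) {
        return position.get(partition);
    }

    // Suspend immediately after seeking (no records consumed), then resume
    // and report the offset that would be read next on partition 0.
    public static long resumePositionAfterSuspend(boolean decrement, long initialOffset) {
        InitialOffsetsSketch beforeSuspend = new InitialOffsetsSketch(decrement);
        beforeSuspend.seekToInitialOffsets(Map.of(0, initialOffset));
        Map<Integer, Long> snapshot = beforeSuspend.saveSnapshot();

        InitialOffsetsSketch afterResume = new InitialOffsetsSketch(decrement);
        afterResume.restoreFromSnapshot(snapshot);
        return afterResume.nextOffsetToRead(0);
    }

    public static void main(String[] args) {
        long initialOffset = 100L;
        // Without the decrement the record at offset 100 is skipped after resume.
        System.out.println("without decrement: " + resumePositionAfterSuspend(false, initialOffset)); // 101
        // With the decrement the job resumes exactly at the initial offset.
        System.out.println("with decrement:    " + resumePositionAfterSuspend(true, initialOffset));  // 100
    }
}
```

In this model the record at the initial offset is lost only when no record was consumed between the seek and the suspend, which matches the note above about why pointing at the end of each partition makes the problem easy to observe.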
I made a fix for this problem by decrementing the value that is put into the `StreamKafkaP`'s internal `offsets` map at the time when `seekToInitialOffsets()` is being executed.

Checklist:
- Labels (`Team:`, `Type:`, `Source:`, `Module:`) and Milestone set
- `Add to Release Notes` or `Not Release Notes content` set
- `@Nonnull/@Nullable` annotations
- `@since` tags in Javadoc