Wrong last consumed offset when using transaction #911

Open
vmaurin opened this issue Jul 27, 2023 · 2 comments

@vmaurin
Contributor

vmaurin commented Jul 27, 2023

Describe the bug

Here is what I posted on the Kafka users mailing list:

I am working on an exactly-once stream processor in Python, using
the aiokafka client library. My program stores a state in memory that is
recovered from a changelog topic, like in Kafka Streams.
On each processing loop, I consume messages and produce messages
to an output topic and to my changelog topic, within a transaction.
When I need to restart a runner, to restore the state in memory, I
have a routine that consumes the changelog topic from the beginning to the
"end" with the read_committed isolation level. Here I am struggling to
define when to stop my recovery:

  • my current (maybe) working solution is to loop over "poll" until
    poll no longer returns any messages
  • I tried to do something based on the end offsets, then checking
    the consumer position, but with control messages at the end of the
    partition, I run into an issue where the position is one below the end
    offset and doesn't go any further

and here is the response I got:

Sounds like a bug in aiokafka library to me.
If the last message in a topic partition is a tx-marker, the consumer should step over it, and report the correct position after the marker.
The official KafkaConsumer (ie, the Java one), does the exact same thing.

Expected behaviour

So, as mentioned in the response I got, when a consumer receives a control message, it should "step over" it in terms of consumed offset. To be honest, I am not 100% sure about the behavior of the Java KafkaConsumer, because Kafka's internal tools that report the lag (like kafka-consumer-groups.sh) also show an offset of 1 or 2.
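
To make the offset arithmetic concrete, here is a toy model of a partition whose last record is a transaction marker. It is only an illustration of the "step over" behaviour described above, not aiokafka's or Kafka's actual code; `Record` and `positions_after_reading` are hypothetical names invented for this sketch.

```python
from dataclasses import dataclass
from typing import List, Optional, Tuple


@dataclass
class Record:
    offset: int
    is_control: bool  # True for a transaction commit/abort marker
    value: Optional[bytes] = None


def positions_after_reading(log: List[Record], step_over_markers: bool) -> Tuple[int, int]:
    """Return (last_data_offset, position) after reading the whole log.

    `position` is the offset of the next record the consumer would fetch.
    """
    last_data_offset = -1
    position = 0
    for record in log:
        if record.is_control:
            # Control records are never handed to the application.
            if step_over_markers:
                position = record.offset + 1
            continue
        last_data_offset = record.offset
        position = record.offset + 1
    return last_data_offset, position


# Two data records followed by a commit marker: the end offset is 3.
log = [Record(0, False, b"a"), Record(1, False, b"b"), Record(2, True)]
end_offset = log[-1].offset + 1

stuck = positions_after_reading(log, step_over_markers=False)
stepped = positions_after_reading(log, step_over_markers=True)
print(stuck, stepped, end_offset)
```

In this model, a consumer that does not step over the marker stays at position 2 while the end offset is 3, which matches the "one below end offsets" symptom; a consumer that steps over it reaches 3.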

Environment (please complete the following information):

  • aiokafka version (python -c "import aiokafka; print(aiokafka.__version__)"): 0.8.1
  • kafka-python version (python -c "import kafka; print(kafka.__version__)"): 2.0.2
  • Kafka Broker version (kafka-topics.sh --version): 2.8.1

Reproducible example

Not a reproducible example, but here is my recovery code:

from aiokafka import AIOKafkaConsumer, TopicPartition

consumer = AIOKafkaConsumer(
    bootstrap_servers=self._bootstrap_servers,
    isolation_level="read_committed",
    enable_auto_commit=False,
)
offset = -1  # offset of the last restored message, -1 if none
await consumer.start()
try:
    tp = TopicPartition(topic=self._changelog_topic, partition=state_partition.partition)
    consumer.assign([tp])
    await consumer.seek_to_beginning(tp)
    recovering = True
    while recovering:
        # Stop as soon as a poll returns no messages at all.
        recovering = False
        msg_batch = await consumer.getmany(timeout_ms=1000, max_records=500)
        processing_listener.check_for_cancellation()
        for _, msgs in msg_batch.items():
            recovering = True
            await state_partition.restore(msgs)
            offset = msgs[-1].offset
finally:
    # Always release the consumer, even if restore or cancellation raises.
    await consumer.stop()

Instead of `while recovering`, when I try a condition like `offset < end_offset` (or `end_offset - 1`, I never remember the logic here), it doesn't work when the latest messages are control ones.
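
One possible workaround, as a sketch only, is to combine both stop conditions: finish when the position reaches the end offset, and fall back to a few consecutive empty polls in case the position never gets past a trailing control record. The helper name `restore_until_end`, the `restore` callback and the `max_idle_polls` parameter are assumptions made for this example. The `consumer` argument is expected to behave like `AIOKafkaConsumer` (`end_offsets`, `position` and `getmany` as coroutines); this has not been verified against a broker.

```python
from typing import Any, Awaitable, Callable, List


async def restore_until_end(
    consumer: Any,
    tp: Any,
    restore: Callable[[List[Any]], Awaitable[None]],
    poll_timeout_ms: int = 1000,
    max_idle_polls: int = 2,
) -> int:
    """Replay `tp` until the end offset is reached or polls go idle.

    Returns the offset of the last restored message, or -1 if none.
    """
    end_offset = (await consumer.end_offsets([tp]))[tp]
    last_offset = -1
    idle_polls = 0
    while idle_polls < max_idle_polls:
        # Normal exit: the consumer position caught up with the end offset.
        if await consumer.position(tp) >= end_offset:
            break
        batch = await consumer.getmany(tp, timeout_ms=poll_timeout_ms, max_records=500)
        msgs = batch.get(tp, [])
        if not msgs:
            # Fallback exit: count empty polls, e.g. when only control
            # records remain and the position does not advance.
            idle_polls += 1
            continue
        idle_polls = 0
        await restore(msgs)
        last_offset = msgs[-1].offset
    return last_offset
```

The idle-poll fallback trades a short extra wait (`max_idle_polls * poll_timeout_ms`) for not hanging forever when the position stays one below the end offset.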

@vmaurin
Contributor Author

vmaurin commented Oct 16, 2023

Could it be related to doing manual commits? I.e., no "real" messages are consumed, so the consumer loop doesn't trigger and no offsets are committed? Any idea how a consuming loop could trigger a commit on transactional messages?

@y4n9squared

We're also seeing this issue. It feels like the library should support iterating past the end-of-transaction record. Both the kafka-python and confluent_kafka consumer implementations allow the consumer to reach the HWM even when the last record is a control record.
