Unexpected error during batch delivery #1004

Open
KPull opened this issue May 6, 2024 · 1 comment

KPull commented May 6, 2024

Describe the bug
After upgrading to Kafka 3.7, our transactional producers fail with "Unexpected error during batch delivery".

We use Aiven's managed platform for Kafka. As soon as the upgrade to Kafka 3.7 finished, our transactional producers started raising the following errors:

Traceback (most recent call last):
  File "/root/.cache/pypoetry/virtualenvs/redacted-9TtSrW0h-py3.11/lib/python3.11/site-packages/aiokafka/producer/sender.py", line 154, in _sender_routine
    task.result()
  File "/root/.cache/pypoetry/virtualenvs/redacted-9TtSrW0h-py3.11/lib/python3.11/site-packages/aiokafka/producer/sender.py", line 333, in _do_txn_offset_commit
    return (await handler.do(node_id))
            ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/root/.cache/pypoetry/virtualenvs/redacted-9TtSrW0h-py3.11/lib/python3.11/site-packages/aiokafka/producer/sender.py", line 379, in do
    retry_backoff = self.handle_response(resp)
                    ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/root/.cache/pypoetry/virtualenvs/redacted-9TtSrW0h-py3.11/lib/python3.11/site-packages/aiokafka/producer/sender.py", line 619, in handle_response
    raise error_type()
aiokafka.errors.StaleLeaderEpochCodeError: [Error 13] StaleLeaderEpochCodeError
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
  File "/root/.cache/pypoetry/virtualenvs/redacted-9TtSrW0h-py3.11/lib/python3.11/site-packages/redacted/workers.py", line 148, in restartable_loop
    await self.fn()
  File "/root/.cache/pypoetry/virtualenvs/redacted-9TtSrW0h-py3.11/lib/python3.11/site-packages/redacted/transactional_processor.py", line 177, in run
    await producer.send_offsets_to_transaction({topic_partition: commit_offsets},
  File "/root/.cache/pypoetry/virtualenvs/redacted-9TtSrW0h-py3.11/lib/python3.11/site-packages/aiokafka/producer/producer.py", line 577, in send_offsets_to_transaction
    await asyncio.shield(fut)
  File "/root/.cache/pypoetry/virtualenvs/redacted-9TtSrW0h-py3.11/lib/python3.11/site-packages/aiokafka/producer/sender.py", line 167, in _sender_routine
    raise KafkaError("Unexpected error during batch delivery")
aiokafka.errors.KafkaError: KafkaError: Unexpected error during batch delivery
2024-05-06T16:26:52+0000 | ERROR | Task exception was never retrieved
future: <Task finished name='Task-129748' coro=<Sender._send_produce_req() done, defined at /root/.cache/pypoetry/virtualenvs/redacted-9TtSrW0h-py3.11/lib/python3.11/site-packages/aiokafka/producer/sender.py:246> exception=KeyError(<aiokafka.producer.message_accumulator.MessageBatch object at 0xffff7900fd10>)>
Traceback (most recent call last):
  File "/root/.cache/pypoetry/virtualenvs/redacted-9TtSrW0h-py3.11/lib/python3.11/site-packages/aiokafka/producer/sender.py", line 259, in _send_produce_req
    await handler.do(node_id)
  File "/root/.cache/pypoetry/virtualenvs/redacted-9TtSrW0h-py3.11/lib/python3.11/site-packages/aiokafka/producer/sender.py", line 740, in do
    self._sender._message_accumulator.reenqueue(batch)
  File "/root/.cache/pypoetry/virtualenvs/redacted-9TtSrW0h-py3.11/lib/python3.11/site-packages/aiokafka/producer/message_accumulator.py", line 378, in reenqueue
    self._pending_batches.remove(batch)
KeyError: <aiokafka.producer.message_accumulator.MessageBatch object at 0xffff7900fd10>

The producer appears to keep running afterwards, but every further attempt to send messages within a transaction fails with:

aiokafka.errors.OutOfOrderSequenceNumber: [Error 45] OutOfOrderSequenceNumber

We are using a transactional producer with acks set to all and an explicit transactional.id.
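
For readers who want to match this setup, a configuration along these lines would correspond to the description above. This is a minimal sketch, not the code from our service: `transactional_producer_kwargs` is a hypothetical helper, and the broker address and transactional id in the usage comment are placeholders.

```python
def transactional_producer_kwargs(bootstrap_servers: str, transactional_id: str) -> dict:
    """Build keyword arguments for aiokafka.AIOKafkaProducer (hypothetical helper)."""
    if not transactional_id:
        raise ValueError("a transactional producer needs a non-empty transactional_id")
    return {
        "bootstrap_servers": bootstrap_servers,
        # Setting a transactional id is what makes the producer transactional.
        "transactional_id": transactional_id,
        # Wait for all in-sync replicas to acknowledge each write.
        "acks": "all",
    }


# Usage (needs aiokafka and a reachable broker; both values are placeholders):
#   producer = AIOKafkaProducer(**transactional_producer_kwargs("localhost:9092", "processor-topic-0"))
#   await producer.start()
```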

Environment (please complete the following information):

  • aiokafka 0.10.0
  • Kafka 3.7 hosted on the Aiven Platform

Reproducible example

Below is the code we use to send transactional messages with the producer.

    async def run(self):
        log_info("PROCESSOR_STARTUP", processor="TransactionalProcessor", group_id=self.__group_id)
        consumer = await self.__get_consumer()

        try:
            while self.__running:
                messages = await consumer.getmany(timeout_ms=5000)

                for topic_partition, messages_in_partition in messages.items():
                    try:
                        if messages_in_partition:
                            producer = await self.__get_producer(topic_partition.topic, topic_partition.partition)

                            commit_offsets = messages_in_partition[-1].offset + 1
                            records_to_write = []

                            for record in messages_in_partition:
                                processor_result = self.__processor(
                                    Record(topic=record.topic, partition=record.partition, offset=record.offset,
                                           received_at=datetime.fromtimestamp(record.timestamp / 1000.0),
                                           key=record.key, value=record.value))
                                if inspect.isawaitable(processor_result):
                                    processor_result = await processor_result
                                records_to_write.extend(processor_result)

                            async with producer.transaction():
                                for record in records_to_write:
                                    await producer.send(record.topic, value=record.value, key=record.key)
                                await producer.send_offsets_to_transaction({topic_partition: commit_offsets},
                                                                           self.__group_id)
                    except ProducerFenced:
                        # This occurs when someone else takes over the processing of this topic partition. We simply
                        # close the producer, if any, and continue with the next partition
                        await self.__clear_producer(topic_partition.topic, topic_partition.partition)
                        log_info("PRODUCER_FENCED", topic=topic_partition.topic, partition=topic_partition.partition,
                                 group_id=self.__group_id)
                    except OutOfOrderSequenceNumber:
                        # This occurs if a fatal error occurred earlier and therefore the producer does not know the
                        # sequence number for its next message. Discard the producer, if any, and continue with the
                        # next partition.
                        await self.__clear_producer(topic_partition.topic, topic_partition.partition)
                        log_info("PRODUCER_DESYNC", topic=topic_partition.topic, partition=topic_partition.partition,
                                 group_id=self.__group_id)
        finally:
            await self.__clear_clients()
            log_info("PROCESSOR_CLEANUP", processor="TransactionalProcessor", group_id=self.__group_id)

KPull commented May 8, 2024

After a couple of days of investigation, we have worked around the issue by swapping the order of operations inside the transaction: the offsets are now sent first and the records afterwards, like so:

async with producer.transaction():
    await producer.send_offsets_to_transaction({topic_partition: commit_offsets}, self.__group_id)
    for record in records_to_write:
        await producer.send(record.topic, value=record.value, key=record.key)

Since reordering the calls was enough to make the errors go away, we suspect a race condition within the library that is only triggered when running against Kafka 3.7.
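
For anyone who wants to apply the same reordering, it can be wrapped in a small helper. This is only a sketch of the workaround, not a fix for the underlying library behaviour: `commit_offsets_then_send` and its parameter names are hypothetical, `producer` is assumed to be a started `AIOKafkaProducer` configured with a transactional id, and each record is assumed to expose `topic`, `value` and `key` attributes as in our code above.

```python
from typing import Any, Iterable


async def commit_offsets_then_send(
    producer: Any,
    topic_partition: Any,
    next_offset: int,
    group_id: str,
    records: Iterable[Any],
) -> None:
    """Add the consumer offsets to the transaction first, then send the records."""
    async with producer.transaction():
        # Offsets go first; this ordering is the whole workaround.
        await producer.send_offsets_to_transaction({topic_partition: next_offset}, group_id)
        for record in records:
            await producer.send(record.topic, value=record.value, key=record.key)
```

Taking the producer as a parameter keeps the helper independent of how producers are created or cached per partition.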
