Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Seek of the Commit not done in the transaction #920

Open
jainal09 opened this issue Aug 28, 2023 · 0 comments
Open

Seek of the Commit not done in the transaction #920

jainal09 opened this issue Aug 28, 2023 · 0 comments

Comments

@jainal09
Copy link

jainal09 commented Aug 28, 2023

Describe the bug
I have a code (added below). It first fetches a message and sends it to a new topic using the producer transaction and sends the offsets using the producer

Expected behaviour
The message should be committed and new message needs to be picked up in the new iteration.

Environment (please complete the following information):

  • aiokafka version (python -c "import aiokafka; print(aiokafka.__version__)"): 0.8.1
  • kafka-python version (python -c "import kafka; print(kafka.__version__)"): 2.0.2
  • Kafka Broker version (kafka-topics.sh --version): 3.3.1 (Commit:e23c59d00e687ff5)

Reproducible example

import asyncio

from aiokafka import AIOKafkaConsumer, AIOKafkaProducer, TopicPartition
from kafka.coordinator.assignors.range import RangePartitionAssignor
from kafka.errors import IllegalStateError
from kafka.errors import KafkaConnectionError

from fault_tolerant_aio_kafka.consumers.handlers import RebalancedListener
from fault_tolerant_aio_kafka.logs import logger  # avoid circular imports


async def produce_message(
    topic,
    message_key,
    message_val,
    partition,
    consumer_obj: AIOKafkaConsumer,
    consumer_record_obj,
    consumer_group_id,
    producer_topic,
    logger,
    producer_obj: AIOKafkaProducer,
):
    logger.info(f"Producing message to topic: {producer_topic}")
    print(f"Producing message to topic: {producer_topic}")
    print(f"message key: {message_key}")
    print(f"message value: {message_val}")
    print(f"partition: {partition}")
    print(f"consumer_obj: {consumer_obj}")
    print(f"consumer_record_obj: {consumer_record_obj}")
    print(f"consumer_group_id: {consumer_group_id}")
    print(f"producer_topic: {producer_topic}")
    print(f"topic: {topic}")
    
    async with producer_obj.transaction():
        tp = TopicPartition(topic, partition)
        logger.info(f"Message key: {message_key}")
        logger.info(f"Message value: {message_val}")
        await producer_obj.send_and_wait(
            topic=producer_topic,
            value=message_val,
            key=message_key,
        )
        logger.info("message sent")
        await producer_obj.send_offsets_to_transaction(
            {tp: consumer_record_obj.offset + 1}, consumer_group_id
        )
        logger.info("offset sent")
        # await consumer_obj.seek_to_committed(tp, consumer_record_obj.offset + 1)
        logger.info("seek done")
    return consumer_obj


async def consume_messages() -> None:

    consumer = AIOKafkaConsumer(
        "my_topic",
        bootstrap_servers="localhost:9092",
        group_id = "my_group_id",
        max_poll_records = 1,
        enable_auto_commit = False,
        partition_assignment_strategy = [RangePartitionAssignor],
        metadata_max_age_ms = 30000
    )
    producer_obj = AIOKafkaProducer(
        bootstrap_servers="localhost:9092",
        transactional_id="my_transactional_id"
    )
    consumer.max_poll_records = 1
    listener = RebalancedListener(consumer)
    print("consumer started")
    consumer.subscribe(topics=["my_topic"], listener=listener)
    try:
        await consumer.start()
        logger.info("Consumer started")
    except KafkaConnectionError as e:
        logger.error(e)
        logger.error(
            f"Error while connecting to Kafka: localhost:9092. Is it running? {e}"
        )
        await consumer.stop()
        # exit the consumer
        return
    while True:
        try:
            message = await asyncio.wait_for(consumer.getone(), timeout=1.0)
            logger.info(f"Received message: {message.value}")
            logger.info(f"Received message from Partition: {message.partition}")
            await producer_obj.start()
            consumer = await produce_message(
                topic="my_topic",
                message_key=message.key,
                message_val=message.value,
                partition=message.partition,
                consumer_record_obj=message,
                consumer_obj=consumer,
                producer_topic="new_topic1",
                logger=logger,
                producer_obj=producer_obj,
                consumer_group_id="my_group_id",
            )
            logger.info("message produced")

        except IllegalStateError:
            logger.info("Consumer stopped probably due to a partition mismanagement.")
            break
        except asyncio.TimeoutError:
            # no message fetched within 1 second
            continue
    await producer_obj.flush()
    await producer_obj.stop()
    await consumer.stop()


async def start():
    loop = asyncio.get_event_loop()
    tasks = [loop.create_task(consume_messages()) for _ in range(1)]
    await asyncio.gather(*tasks)


if __name__ == "__main__":
    asyncio.run(start())

The log of the script

2023-08-28 10:40:05,384 - [INFO] - [fault_tolerant_aio_kafka.logs] - logs.py:10 - process id: 88963
2023-08-28 10:40:05,389 - [INFO] - [aiokafka.consumer.subscription_state] - subscription_state.py:111 - Updating subscribed topics to: frozenset({'my_topic'})
consumer started
2023-08-28 10:40:05,390 - [INFO] - [aiokafka.consumer.subscription_state] - subscription_state.py:111 - Updating subscribed topics to: frozenset({'my_topic'})
2023-08-28 10:40:05,390 - [INFO] - [aiokafka.consumer.consumer] - consumer.py:1090 - Subscribed to topic(s): {'my_topic'}
2023-08-28 10:40:05,441 - [INFO] - [aiokafka.consumer.group_coordinator] - group_coordinator.py:550 - Discovered coordinator 1 for group my_group_id
2023-08-28 10:40:05,442 - [INFO] - [aiokafka.consumer.group_coordinator] - group_coordinator.py:384 - Revoking previously assigned partitions set() for group my_group_id
2023-08-28 10:40:05,442 - [INFO] - [aiokafka.consumer.group_coordinator] - group_coordinator.py:1196 - (Re-)joining group my_group_id
2023-08-28 10:40:08,453 - [INFO] - [aiokafka.consumer.group_coordinator] - group_coordinator.py:1251 - Joined group 'my_group_id' (generation 33) with member_id aiokafka-0.8.1-e97145da-1c17-486f-a074-fb9ccb5ac07b
2023-08-28 10:40:08,453 - [INFO] - [aiokafka.consumer.group_coordinator] - group_coordinator.py:1255 - Elected group leader -- performing partition assignments using range
2023-08-28 10:40:08,461 - [INFO] - [aiokafka.consumer.group_coordinator] - group_coordinator.py:1372 - Successfully synced group my_group_id with generation 33
2023-08-28 10:40:08,461 - [INFO] - [aiokafka.consumer.group_coordinator] - group_coordinator.py:464 - Setting newly assigned partitions {TopicPartition(topic='my_topic', partition=0)} for group my_group_id
                    ╭─────────────────────────────────────────────╮                    
                    │ Assigned partitions:  0, On Topic: my_topic │                    
                    ╰─────────────────────────────────────────────╯                    
2023-08-28 10:40:08,466 - [INFO] - [fault_tolerant_aio_kafka.logs] - new_debug.py:75 - Consumer started
2023-08-28 10:40:08,480 - [INFO] - [fault_tolerant_aio_kafka.logs] - new_debug.py:87 - Received message: b'djsc'
2023-08-28 10:40:08,482 - [INFO] - [fault_tolerant_aio_kafka.logs] - new_debug.py:88 - Received message from Partition: 0
2023-08-28 10:40:08,530 - [INFO] - [aiokafka.producer.sender] - sender.py:232 - Discovered coordinator 1 for transactional id my_transactional_id
2023-08-28 10:40:08,534 - [INFO] - [fault_tolerant_aio_kafka.logs] - new_debug.py:24 - Producing message to topic: new_topic1
Producing message to topic: new_topic1
message key: None
message value: b'djsc'
partition: 0
consumer_obj: <aiokafka.consumer.consumer.AIOKafkaConsumer object at 0x7f5e6788aa10>
consumer_record_obj: ConsumerRecord(topic='my_topic', partition=0, offset=2007, timestamp=1693230775292, timestamp_type=0, key=None, value=b'djsc', checksum=None, serialized_key_size=-1, serialized_value_size=4, headers=())
consumer_group_id: my_group_id
producer_topic: new_topic1
topic: my_topic
2023-08-28 10:40:08,534 - [INFO] - [fault_tolerant_aio_kafka.logs] - new_debug.py:37 - Message key: None
2023-08-28 10:40:08,534 - [INFO] - [fault_tolerant_aio_kafka.logs] - new_debug.py:38 - Message value: b'djsc'
2023-08-28 10:40:08,545 - [INFO] - [fault_tolerant_aio_kafka.logs] - new_debug.py:44 - message sent
2023-08-28 10:40:08,555 - [INFO] - [aiokafka.producer.sender] - sender.py:226 - Discovered coordinator 1 for group id my_group_id
2023-08-28 10:40:08,562 - [INFO] - [fault_tolerant_aio_kafka.logs] - new_debug.py:48 - offset sent
2023-08-28 10:40:08,563 - [INFO] - [fault_tolerant_aio_kafka.logs] - new_debug.py:50 - seek done
2023-08-28 10:40:08,576 - [INFO] - [fault_tolerant_aio_kafka.logs] - new_debug.py:102 - message produced
2023-08-28 10:40:08,577 - [INFO] - [fault_tolerant_aio_kafka.logs] - new_debug.py:87 - Received message: b'yjoe'
2023-08-28 10:40:08,578 - [INFO] - [fault_tolerant_aio_kafka.logs] - new_debug.py:88 - Received message from Partition: 0
2023-08-28 10:40:08,610 - [INFO] - [fault_tolerant_aio_kafka.logs] - new_debug.py:24 - Producing message to topic: new_topic1
Producing message to topic: new_topic1
message key: None
message value: b'yjoe'
partition: 0
consumer_obj: <aiokafka.consumer.consumer.AIOKafkaConsumer object at 0x7f5e6788aa10>
consumer_record_obj: ConsumerRecord(topic='my_topic', partition=0, offset=2008, timestamp=1693230775295, timestamp_type=0, key=None, value=b'yjoe', checksum=None, serialized_key_size=-1, serialized_value_size=4, headers=())
consumer_group_id: my_group_id
producer_topic: new_topic1
topic: my_topic
2023-08-28 10:40:08,613 - [INFO] - [fault_tolerant_aio_kafka.logs] - new_debug.py:37 - Message key: None
2023-08-28 10:40:08,613 - [INFO] - [fault_tolerant_aio_kafka.logs] - new_debug.py:38 - Message value: b'yjoe'
2023-08-28 10:40:08,621 - [ERROR] - [aiokafka.producer.sender] - sender.py:167 - Unexpected error in sender routine
Traceback (most recent call last):
  File "/home/jainal09/aio-kafka-producer-consumer-wrapper-lib/.venv/lib/python3.11/site-packages/aiokafka/producer/sender.py", line 155, in _sender_routine
    task.result()
  File "/home/jainal09/aio-kafka-producer-consumer-wrapper-lib/.venv/lib/python3.11/site-packages/aiokafka/producer/sender.py", line 309, in _do_add_partitions_to_txn
    return (await handler.do(node_id))
            ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/jainal09/aio-kafka-producer-consumer-wrapper-lib/.venv/lib/python3.11/site-packages/aiokafka/producer/sender.py", line 380, in do
    retry_backoff = self.handle_response(resp)
                    ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/jainal09/aio-kafka-producer-consumer-wrapper-lib/.venv/lib/python3.11/site-packages/aiokafka/producer/sender.py", line 462, in handle_response
    txn_manager.partition_added(tp)
  File "/home/jainal09/aio-kafka-producer-consumer-wrapper-lib/.venv/lib/python3.11/site-packages/aiokafka/producer/transaction_manager.py", line 204, in partition_added
    self._pending_txn_partitions.remove(tp)
KeyError: TopicPartition(topic='new_topic1', partition=0)
Traceback (most recent call last):
  File "/home/jainal09/aio-kafka-producer-consumer-wrapper-lib/POC/new_debug.py", line 122, in <module>
    asyncio.run(start())
  File "/home/linuxbrew/.linuxbrew/opt/python@3.11/lib/python3.11/asyncio/runners.py", line 190, in run
    return runner.run(main)
           ^^^^^^^^^^^^^^^^
  File "/home/linuxbrew/.linuxbrew/opt/python@3.11/lib/python3.11/asyncio/runners.py", line 118, in run
    return self._loop.run_until_complete(task)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/linuxbrew/.linuxbrew/opt/python@3.11/lib/python3.11/asyncio/base_events.py", line 653, in run_until_complete
    return future.result()
           ^^^^^^^^^^^^^^^
  File "/home/jainal09/aio-kafka-producer-consumer-wrapper-lib/POC/new_debug.py", line 118, in start
    await asyncio.gather(*tasks)
  File "/home/jainal09/aio-kafka-producer-consumer-wrapper-lib/POC/new_debug.py", line 90, in consume_messages
    consumer = await produce_message(
               ^^^^^^^^^^^^^^^^^^^^^^
  File "/home/jainal09/aio-kafka-producer-consumer-wrapper-lib/POC/new_debug.py", line 39, in produce_message
    await producer_obj.send_and_wait(
  File "/home/jainal09/aio-kafka-producer-consumer-wrapper-lib/.venv/lib/python3.11/site-packages/aiokafka/producer/producer.py", line 479, in send_and_wait
    return (await future)
            ^^^^^^^^^^^^
kafka.errors.KafkaError: KafkaError: Unexpected error during batch delivery
2023-08-28 10:40:08,674 - [ERROR] - [asyncio] - base_events.py:1771 - Unclosed AIOKafkaConsumer
consumer: <aiokafka.consumer.consumer.AIOKafkaConsumer object at 0x7f5e6788aa10>
2023-08-28 10:40:08,674 - [ERROR] - [asyncio] - base_events.py:1771 - Unclosed AIOKafkaProducer
producer: <aiokafka.producer.producer.AIOKafkaProducer object at 0x7f5e67845250>
2023-08-28 10:40:08,674 - [ERROR] - [asyncio] - base_events.py:1771 - Future exception was never retrieved
future: <Future finished exception=KafkaError('Unexpected error during batch delivery')>
Traceback (most recent call last):
  File "/home/jainal09/aio-kafka-producer-consumer-wrapper-lib/.venv/lib/python3.11/site-packages/aiokafka/producer/sender.py", line 155, in _sender_routine
    task.result()
  File "/home/jainal09/aio-kafka-producer-consumer-wrapper-lib/.venv/lib/python3.11/site-packages/aiokafka/producer/sender.py", line 309, in _do_add_partitions_to_txn
    return (await handler.do(node_id))
            ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/jainal09/aio-kafka-producer-consumer-wrapper-lib/.venv/lib/python3.11/site-packages/aiokafka/producer/sender.py", line 380, in do
    retry_backoff = self.handle_response(resp)
                    ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/jainal09/aio-kafka-producer-consumer-wrapper-lib/.venv/lib/python3.11/site-packages/aiokafka/producer/sender.py", line 462, in handle_response
    txn_manager.partition_added(tp)
  File "/home/jainal09/aio-kafka-producer-consumer-wrapper-lib/.venv/lib/python3.11/site-packages/aiokafka/producer/transaction_manager.py", line 204, in partition_added
    self._pending_txn_partitions.remove(tp)
KeyError: TopicPartition(topic='new_topic1', partition=0)

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/jainal09/aio-kafka-producer-consumer-wrapper-lib/.venv/lib/python3.11/site-packages/aiokafka/producer/sender.py", line 168, in _sender_routine
    raise KafkaError("Unexpected error during batch delivery")
kafka.errors.KafkaError: KafkaError: Unexpected error during batch delivery
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant