Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[QUESTION] How to get old messages from topics? #998

Open
solidguy7 opened this issue Apr 13, 2024 · 4 comments
Open

[QUESTION] How to get old messages from topics? #998

solidguy7 opened this issue Apr 13, 2024 · 4 comments
Labels

Comments

@solidguy7
Copy link

solidguy7 commented Apr 13, 2024

Of course, I can get the messages with consumer in real time

async def test():
    consumer = AIOKafkaConsumer("test", bootstrap_servers="localhost:9092")
    await consumer.start()
    try:
        async for message in consumer:
            return message.value.decode()
    finally:
        await consumer.stop()

But what if I wanna get my old messages or find specific one from this topic?

@ods
Copy link
Collaborator

ods commented Apr 14, 2024

You can start consuming without group from the beginning (see seek_to_beginning method) or any other offset (e.g. see offsets_for_times and seek methods) in the past.

@solidguy7
Copy link
Author

solidguy7 commented Apr 14, 2024

Thanks for your reply! Actually, I`ve tried this method, but I have stuck using it. I create test code snippet like this:

import asyncio

from aiokafka import AIOKafkaConsumer, TopicPartition


async def test():
    consumer = AIOKafkaConsumer("test", bootstrap_servers="localhost:9092")
    try:
        tp = TopicPartition(topic="test", partition=0)
        consumer.assign([tp])
        await consumer.seek_to_beginning(tp)
    finally:
        await consumer.stop()


if __name__ == "__main__":
    asyncio.run(test())

Here I manually create partition, because consumer.assignment() returns an empty set(I don`t know because)
And this snippet returns the following error:

raise IllegalStateError(
    aiokafka.errors.IllegalStateError: IllegalStateError: Subscription to topics, partitions and pattern are mutually exclusive

I don`t understand what I do incorrectly

@ods
Copy link
Collaborator

ods commented Apr 14, 2024

You don't need assignment if you don't use groups and not going to distribute messages among workers. But you need to seek for all partitions you have. Use partitions_for_topic method to get list of all available partitions.

@solidguy7
Copy link
Author

I have also tried this method. It returns None, but my test topic has been created

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

No branches or pull requests

2 participants