consumer_timeout_ms #897

Open
bunengxiu opened this issue Jun 1, 2023 · 6 comments

@bunengxiu

I have a topic named my_topic. While consuming it, I want the process to exit if no data is put into the topic within ten seconds.

With the kafka-python package, setting consumer_timeout_ms=10000 works, as shown below:

from kafka import KafkaConsumer
from kafka.errors import KafkaError

consumer = KafkaConsumer(
    'my_topic',
    bootstrap_servers='localhost:9092',
    group_id="my-group",
    auto_offset_reset='earliest',
    # stop iterating when no message arrives within ten seconds
    consumer_timeout_ms=10000,  # 10s
)


def test():
    for msg in consumer:
        print(msg.value)
    print('consumer timeout: no data was put into the topic within ten seconds, exiting')


if __name__ == '__main__':
    test()

But with the aiokafka package, setting consumer_timeout_ms=10000 has no effect, as shown below:

from aiokafka import AIOKafkaConsumer
from aiokafka.errors import KafkaError
import asyncio


async def async_test():
    c = AIOKafkaConsumer(
        'my_topic',
        bootstrap_servers='localhost:9092',
        group_id="my-group",
        auto_offset_reset='earliest',
        # intended: stop consuming when no data arrives within ten seconds
        consumer_timeout_ms=10000,  # 10 s
    )
    await c.start()  # the consumer must be started before it is iterated
    try:
        # The process blocks here. After ten seconds it is still blocked,
        # so consumer_timeout_ms does not end the iteration.
        async for msg in c:
            print("consumed: ", msg.topic, msg.partition, msg.offset, msg.key, msg.value, msg.timestamp)
    except KafkaError as e:
        print(e)
        print('kafka error')
    finally:
        print('my_topic is empty, timed out after 10 s, exiting process')
        await c.stop()


if __name__ == '__main__':
    asyncio.run(async_test())

Which parameter should I set so that the process exits when no data is put into the topic within ten seconds?

@vmaurin
Contributor

vmaurin commented Jun 1, 2023

I would use the "timeout_ms" parameter of the getmany method https://aiokafka.readthedocs.io/en/stable/api.html#aiokafka.AIOKafkaConsumer.getmany

It is different from the iterator consumption in your example: instead of iterating over the consumer, you would have to loop around getmany until you get no messages back.
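A minimal sketch of that getmany loop, assuming `consumer` is an already started AIOKafkaConsumer. The helper name `consume_until_idle` and its `handle` callback are hypothetical, not part of aiokafka; the only library call relied on is `getmany(timeout_ms=...)`, which returns a dict mapping partitions to lists of records. The loop treats a call that yields no records as "nothing arrived within the timeout" and stops there.

```python
async def consume_until_idle(consumer, idle_timeout_ms=10000, handle=print):
    """Hypothetical helper: drain `consumer` until getmany() comes back empty.

    `consumer` is assumed to be a started AIOKafkaConsumer (or any object with
    a compatible `getmany(timeout_ms=...)` coroutine). Returns the number of
    records passed to `handle`.
    """
    count = 0
    while True:
        # Wait at most idle_timeout_ms for new records.
        batches = await consumer.getmany(timeout_ms=idle_timeout_ms)
        if not any(batches.values()):
            # No records within the timeout: treat the topic as idle and stop.
            break
        for messages in batches.values():
            for msg in messages:
                handle(msg)
                count += 1
    return count
```

With the consumer from the issue, this would be used as `await c.start()`, then `await consume_until_idle(c, 10000)`, then `await c.stop()` in a `finally` block.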

@ods
Collaborator

ods commented Jun 1, 2023

Looks like a bug to me.

@bunengxiu
Author

bunengxiu commented Jun 2, 2023

> I would use the "timeout_ms" parameter of the getmany method https://aiokafka.readthedocs.io/en/stable/api.html#aiokafka.AIOKafkaConsumer.getmany

> It is different from the iterator consumption in your example: instead of iterating over the consumer, you would have to loop around getmany until you get no messages back.

It is the same parameter (consumer_timeout_ms), so why is the effect not the same?

@ods
Collaborator

ods commented Jun 2, 2023

> It is the same parameter (consumer_timeout_ms), so why is the effect not the same?

I believe Vincent meant to use the timeout_ms parameter of getmany, not consumer_timeout_ms.

@bunengxiu
Author

Vincent

> > It is the same parameter (consumer_timeout_ms), so why is the effect not the same?

> I believe Vincent meant to use the timeout_ms parameter of getmany, not consumer_timeout_ms.

Using the timeout_ms parameter of getmany achieves my purpose; it works.
But I think it would be simpler and easier to just set the consumer_timeout_ms parameter of AIOKafkaConsumer.

@vmaurin
Contributor

vmaurin commented Jun 2, 2023

Although the parameter has the same name in aiokafka and kafka-python, it does not seem to have the same behavior:

kafka-python

number of milliseconds to block during message iteration before raising StopIteration (i.e., ending the iterator). Default block forever [float(‘inf’)].

aiokafka

maximum wait timeout for background fetching routine. Mostly defines how fast the system will see rebalance and request new data for new partitions. Default: 200

From a quick look, aiokafka uses it in the fetcher in a way that matches what the documentation says. So the closest equivalent in aiokafka is timeout_ms on getmany, but maybe the consumer parameter could be renamed to avoid confusion with kafka-python (and eventually a parameter could be added that serves as the default timeout_ms, without the need to call getmany).
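Until such a parameter exists, the `async for` style from the original example can be kept with a small wrapper around getmany. This is a sketch, not aiokafka API: the async generator `iter_with_consumer_timeout` is hypothetical, and it only approximates kafka-python's consumer_timeout_ms, because the timeout applies to each getmany() call rather than being re-armed after every single message.

```python
async def iter_with_consumer_timeout(consumer, consumer_timeout_ms):
    """Hypothetical wrapper: yield records one at a time and end the iteration
    once no record arrives within consumer_timeout_ms.

    `consumer` is assumed to be a started AIOKafkaConsumer (or any object with
    a compatible `getmany(timeout_ms=...)` coroutine).
    """
    while True:
        batches = await consumer.getmany(timeout_ms=consumer_timeout_ms)
        if not any(batches.values()):
            # Ends the `async for` loop (StopAsyncIteration), roughly like
            # kafka-python raising StopIteration when its timeout expires.
            return
        for messages in batches.values():
            for msg in messages:
                yield msg
```

After `await c.start()`, the loop from the issue would become `async for msg in iter_with_consumer_timeout(c, 10000): ...`, and the code after the loop runs once the topic has been idle for about ten seconds.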

3 participants