[QUESTION] Unable connect to node with id: X: [Errno 111]: Connection refused #992

Open
OlehDziuba opened this issue Mar 22, 2024 · 0 comments

OlehDziuba commented Mar 22, 2024

Hi, I have a group of consumers that connect to Kafka on startup and then consume messages in an infinite loop. I've run into a problem: when the Kafka address changes, the consumers don't raise an error, they just log the same error indefinitely. Since the Kafka address is taken from environment variables and I have an automatic restart whenever an error occurs, it would be great if the consumer raised an error instead.

from aiokafka import AIOKafkaConsumer

# Runs inside a coroutine
consumer = AIOKafkaConsumer(
    *kafka_consumer_settings.topics,
    bootstrap_servers=kafka_consumer_settings.bootstrap_servers,
    group_id=kafka_consumer_settings.group_id,
)

await consumer.start()
try:
    async for msg in consumer:
        ...  # Some message processing
finally:
    # Not executed in case of
    # 'Unable connect to node with id: X: [Errno 111]: Connection refused'
    await consumer.stop()
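
For context, one workaround that might fit the restart-on-error setup described above is to probe the bootstrap address over plain TCP and raise if nothing answers. This is only a sketch under assumptions: `assert_reachable` is a hypothetical helper, not part of aiokafka, and it assumes `bootstrap_servers` is a comma-separated `host:port` string. It checks TCP reachability only, not that a healthy Kafka broker is listening.

```python
import asyncio


async def assert_reachable(bootstrap_servers: str, timeout: float = 5.0) -> None:
    """Raise ConnectionError unless at least one host:port accepts a TCP connection.

    Hypothetical helper: assumes a comma-separated "host:port" string.
    """
    errors = []
    for server in bootstrap_servers.split(","):
        server = server.strip()
        host, _, port = server.rpartition(":")
        try:
            _, writer = await asyncio.wait_for(
                asyncio.open_connection(host, int(port)), timeout
            )
        except (OSError, asyncio.TimeoutError, ValueError) as err:
            # Refused, unreachable, timed out, or malformed address
            errors.append(f"{server}: {err!r}")
            continue
        writer.close()
        await writer.wait_closed()
        return  # at least one address is reachable
    raise ConnectionError("no bootstrap server reachable: " + "; ".join(errors))
```

Such a probe could run periodically in a separate task next to the consume loop, so that a failed probe ends the process and lets the automatic restart pick up the new address.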

I found this in the source code:

aiokafka/aiokafka/client.py

Lines 475 to 481 in 256ce17

except (OSError, asyncio.TimeoutError, KafkaError) as err:
    log.error("Unable connect to node with id %s: %s", node_id, err)
    if group == ConnectionGroup.DEFAULT:
        # Connection failures imply that our metadata is stale, so
        # let's refresh
        self.force_metadata_update()
    return None
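
Since the excerpt above logs the failure and returns `None` rather than raising, another possible workaround is to watch the log records themselves. The sketch below is an assumption-laden illustration, not an aiokafka feature: `ConnectFailureCounter` is a hypothetical class, and the logger name `"aiokafka"` is assumed and should be checked against the installed version.

```python
import logging


class ConnectFailureCounter(logging.Handler):
    """Counts error records whose message contains a marker string."""

    def __init__(self, marker: str = "Unable connect to node") -> None:
        super().__init__(level=logging.ERROR)
        self.marker = marker
        self.failures = 0

    def emit(self, record: logging.LogRecord) -> None:
        # getMessage() applies the %-style arguments to the log message
        if self.marker in record.getMessage():
            self.failures += 1

    def raise_if_exceeded(self, limit: int) -> None:
        """Raise ConnectionError once `limit` failures have been seen."""
        if self.failures >= limit:
            raise ConnectionError(
                f"{self.failures} failed broker connection attempts"
            )


counter = ConnectFailureCounter()
# Logger name is an assumption; verify it for your aiokafka version
logging.getLogger("aiokafka").addHandler(counter)
```

A consume loop could then call `counter.raise_if_exceeded(5)` at regular intervals (for example from a separate watchdog task, since `async for msg in consumer` does not return while no messages arrive), so the process fails and the automatic restart kicks in.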

Do I understand correctly that the error is not raised only when the consumer belongs to a group?
