BlockingConnection.process_data_events does not account for channel closure #1453
Replies: 2 comments 5 replies
-
If your application takes more than a few milliseconds to process a message, you should process it on a separate thread. Sleeping for two minutes blocks Pika's I/O completely and is a misuse of the library. Pika is not thread-safe.
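A minimal sketch of that pattern, assuming a `BlockingConnection` and a consumer callback. The names `do_work`, `on_message`, `handler`, and `threads` are hypothetical and chosen for illustration. The slow work runs on a worker thread, and the acknowledgement is handed back to the connection's own thread through `add_callback_threadsafe`, because the channel must not be used directly from another thread.

```python
import functools
import threading


def do_work(connection, channel, delivery_tag, body, handler):
    """Run the slow handler off the I/O thread, then schedule the ack."""
    handler(body)  # slow, application-specific work (hypothetical)
    # basic_ack has to run on the connection's thread, so it is queued
    # with add_callback_threadsafe instead of being called here.
    ack = functools.partial(channel.basic_ack, delivery_tag=delivery_tag)
    connection.add_callback_threadsafe(ack)


def on_message(channel, method, properties, body, connection, handler, threads):
    """Consumer callback: start a worker thread and return immediately."""
    worker = threading.Thread(
        target=do_work,
        args=(connection, channel, method.delivery_tag, body, handler),
    )
    worker.start()
    threads.append(worker)  # kept so the app can join workers on shutdown
```

The extra arguments can be bound with `functools.partial(on_message, connection=..., handler=..., threads=...)` before passing the callback to `basic_consume`. The queued ack only runs while the connection thread keeps processing events, for example inside `process_data_events`.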
-
@lukebakken Thanks for the quick response. I have added the need to use a separate thread to the description for completeness, but that was not the point of my ticket. The point is that the acknowledgement of a message to RabbitMQ is postponed until after the …
-
Hi all,
Thanks for building and maintaining pika!
We followed this example in building an app using pika: https://github.com/pika/pika/blob/main/examples/long_running_publisher.py
The call on line 25, `self.connection.process_data_events(time_limit=1)`, blocks a thread to handle all AMQP processing.
We are communicating with RabbitMQ and ran into an edge case where a job took so long to acknowledge a message that it triggered the `consumer_timeout`, which caused the channel to be closed while the connection remained open. The pika app then died silently because the channel closure did not propagate as a `pika.exceptions.ChannelClosedByBroker`.
Upon code inspection, `BlockingChannel._process_data_events` does add this functionality: it calls `BlockingConnection.process_data_events`, then checks whether the channel has closed and raises `BlockingChannel._closing_reason` if it exists.
Steps to reproduce:
1. `consumer_timeout = 60`
2. `Channel.Close reason: PRECONDITION_FAILED - delivery acknowledgement on channel 1 timed out. Timeout value used: 60000 ms. This timeout value can be configured, see consumers doc guide to learn more`
3. `self.connection.process_data_events(time_limit=1)` as the connection is still open but there is no activity because the channel is closed. The app is not notified that the channel has closed.
What I expected to happen:
`pika.exceptions.ChannelClosedByBroker` should be raised when blocking on `BlockingConnection.process_data_events`. From my reading of the documentation, this is the method a pika app should block on.
Options how to resolve (from my limited understanding of the code base):
1. `BlockingChannel._process_data_events` could be made public instead of private.
2. `BlockingChannel._closing_reason` could be made inspectable through a `BlockingChannel.get_closing_reason`, so the app is able to check for itself whether the channel has closed and why.
3. `BlockingChannel._closing_reason` could be checked for any channel when blocking on `BlockingConnection.process_data_events`. Perhaps this option makes less sense, as the `BlockingChannel` is currently layered on top of `BlockingConnection`.
Unclear challenges:
If there are any comments or questions please let me know. I tried to do my due diligence in making sure this is a bug and not just a question/discussion.
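Until one of the options above exists, a possible application-side workaround is to check the channel after each call to `process_data_events`. The sketch below is an assumption about how an app might do this, not pika's own behaviour. `process_events_checked` and `ChannelClosedError` are hypothetical names. It relies only on the public `is_closed` property of the channel, so it reports that the channel closed but not why.

```python
class ChannelClosedError(RuntimeError):
    """Hypothetical application-level error; not part of pika."""


def process_events_checked(connection, channel, time_limit=1):
    """Pump connection I/O, then fail loudly if the channel has closed."""
    connection.process_data_events(time_limit=time_limit)
    # The connection can stay open after the broker closes a channel,
    # so the channel state has to be checked separately.
    if channel.is_closed:
        raise ChannelClosedError("channel closed while processing data events")
```

Calling `process_events_checked(self.connection, self.channel)` in place of the bare `process_data_events` call would make the app fail loudly instead of idling on a dead channel.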