using add_callback_threadsafe lost acknowledgement when stopping program #1402
I'm using pika to process RabbitMQ messages in small batches, with one thread per batch. At the end of the thread function, I acknowledge the messages by handing the ack to the channel through add_callback_threadsafe. In parallel, I catch SIGINT so the program can stop cleanly: it waits with thread.join() for all threads to finish before it stops consuming on the channel and closes the connection. But once Ctrl-C is pressed to generate the SIGINT, even though the program waits for all threads to finish, the acknowledgements are not processed. ==> Is there a way to force the channel/connection to process the pending add_callback_threadsafe callbacks before closing the connection? (initial post here) (note: you need to change the RabbitMQ connection info, i.e. host, port, login and password, on lines 107 to 111) Versions:
Replies: 2 comments 14 replies
I have forked your test project: https://github.com/lukebakken/pika-1402
There seem to be some errors. I'll fix them in my fork: https://github.com/lukebakken/pika-1402/tree/lukebakken/pika-1402
@Stephane-Thales please see the latest version of my code here:
https://github.com/lukebakken/pika-1402/blob/lukebakken/pika-1402/test_pika_blockthread.py

Note that I did the following:

… for loop which allows for checking if exiting is requested. This could also be accomplished via SelectConnection and a timer.

The code works correctly when CTRL-C is pressed. My guess is that the original issue stems from the fact that BlockingConnection is built on top of SelectConnection …

I'll file a bug but fixing it will be low priority. Thanks for reporting this and let me know if my solution works in your final product.
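
For readers who cannot open the linked file, here is a minimal sketch of the general pattern discussed above: a consume loop that regularly checks an exit flag, worker threads that hand their acks back through add_callback_threadsafe, and one last round of connection servicing before shutdown. It is not the code from the fork. The names process_and_ack, consume_until_stopped and stop_event are made up for illustration, it starts one thread per message rather than per batch to stay short, and it assumes that a final process_data_events() call is enough to dispatch the queued callbacks.

```python
import functools
import threading


def process_and_ack(connection, channel, delivery_tag, body):
    """Worker-thread body: do the work, then hand the ack to the connection's thread."""
    # ... process `body` here (placeholder for the real batch work) ...
    ack = functools.partial(channel.basic_ack, delivery_tag=delivery_tag)
    # add_callback_threadsafe only queues the call; it runs later, on the
    # thread that owns the connection, when that thread services the connection.
    connection.add_callback_threadsafe(ack)


def consume_until_stopped(connection, channel, queue, stop_event, timeout=1.0):
    """Consume until stop_event is set, then wait for workers and drain their acks."""
    workers = []
    # With inactivity_timeout set, consume() yields (None, None, None) when no
    # message arrives in time, so the loop gets a chance to check stop_event.
    for method, _properties, body in channel.consume(queue, inactivity_timeout=timeout):
        if stop_event.is_set():
            # A message received at this point is left unacked on purpose.
            break
        if method is None:
            continue  # inactivity timeout, nothing to process
        worker = threading.Thread(
            target=process_and_ack,
            args=(connection, channel, method.delivery_tag, body),
        )
        worker.start()
        workers.append(worker)

    for worker in workers:
        worker.join()
    # Assumption: servicing the connection once more lets it run the callbacks
    # the workers queued, before the consumer is cancelled and the connection closed.
    connection.process_data_events(time_limit=timeout)
    channel.cancel()
    return len(workers)
```

With a real broker, connection would be a pika.BlockingConnection, channel would come from connection.channel(), and stop_event would be a threading.Event set from the SIGINT handler, so that the handler only flips a flag and all channel operations stay on the thread that owns the connection.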