Auto-ack of messages in router.go renders API call mqtt.Message.Ack() useless #459
Comments
This bug is similar to #396, but the description of the problem is different. I'm consuming my messages, then passing them off to worker goroutines on a Go channel, and expecting to be able to Ack() them there. The author of #396 is requesting custom consumers or error-based execution of auto-ack. I suspect that the automatic acks in router.go are the defect, not the solution. |
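To make the intended usage concrete, here is a minimal sketch of the hand-off pattern described above: the message goes to a worker over a channel and is acked only if processing succeeds. It does not use the client library; the `acker` interface, `fakeMsg`, `worker` and `run` are hypothetical names that stand in for the real message type, and the sketch assumes the library does not ack on its own.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// acker is a hypothetical, minimal view of a received message.
type acker interface {
	Payload() []byte
	Ack()
}

// fakeMsg is a test double that records whether Ack was called.
type fakeMsg struct {
	payload []byte
	acked   bool
}

func (m *fakeMsg) Payload() []byte { return m.payload }
func (m *fakeMsg) Ack()            { m.acked = true }

// worker drains jobs and acks only the messages that process handled
// without error. Failed messages are left unacknowledged.
func worker(jobs <-chan acker, process func([]byte) error, wg *sync.WaitGroup) {
	defer wg.Done()
	for m := range jobs {
		if err := process(m.Payload()); err != nil {
			continue // no ack: the message stays outstanding
		}
		m.Ack()
	}
}

// run sends msgs to a single worker and waits for it to finish.
func run(msgs []*fakeMsg) {
	jobs := make(chan acker)
	var wg sync.WaitGroup
	wg.Add(1)
	go worker(jobs, func(p []byte) error {
		if len(p) == 0 {
			return errors.New("empty payload")
		}
		return nil
	}, &wg)
	for _, m := range msgs {
		jobs <- m
	}
	close(jobs)
	wg.Wait()
}

func main() {
	msgs := []*fakeMsg{{payload: []byte("ok")}, {payload: nil}}
	run(msgs)
	fmt.Println(msgs[0].acked, msgs[1].acked)
}
```

With automatic acks in the router, both messages would already be acknowledged before the worker ever sees them, which is the behaviour this issue is about.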
I think the two issues are essentially the same. Unfortunately the automatic call to
|
This is an interesting one, because by the spec the original behaviour is correct. Once the client library has successfully received the message, it should acknowledge it. The resend mechanism isn't designed as a feature to be used by applications; it's there to ensure that messages successfully pass the network. |
@alsm I think you're correct. Receiver initiates the onward delivery of the application message, discards the packet identifier, and issues PUBCOMP. I suppose that means mqtt.Message.Ack() should not be exposed to the public interface? |
Manual Ack is not about correctness but about the efficiency of message processing. Most message processing involves persisting the results into a database or another message queuing system, where high throughput can only be achieved with batches/transactions of a reasonable size. Processing messages one by one is correct but very inefficient. With automatic ack always enabled, the only workaround seems to be to send the messages to a fast in-memory queue like Redis, let them be acked automatically, and start the real processing from there. |
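As an illustration of the batching argument, the following sketch buffers messages, commits them in one bulk write, and acks them only after the commit succeeds. Everything here is an assumption made for the example: `acker`, `fakeMsg`, `batcher` and the `commit` callback are hypothetical and are not part of the client library.

```go
package main

import "fmt"

// acker is a hypothetical, minimal view of a received message.
type acker interface {
	Payload() []byte
	Ack()
}

// fakeMsg is a test double that records whether Ack was called.
type fakeMsg struct {
	payload []byte
	acked   bool
}

func (m *fakeMsg) Payload() []byte { return m.payload }
func (m *fakeMsg) Ack()            { m.acked = true }

// batcher buffers messages and acks them only after commit succeeds.
type batcher struct {
	size    int
	pending []acker
	commit  func(payloads [][]byte) error // hypothetical bulk write, e.g. one DB transaction
}

// add buffers m and flushes once the batch is full.
func (b *batcher) add(m acker) error {
	b.pending = append(b.pending, m)
	if len(b.pending) < b.size {
		return nil
	}
	return b.flush()
}

// flush commits the pending batch, then acks every message in it.
// On a commit error nothing is acked and the batch is kept for a retry.
func (b *batcher) flush() error {
	if len(b.pending) == 0 {
		return nil
	}
	payloads := make([][]byte, len(b.pending))
	for i, m := range b.pending {
		payloads[i] = m.Payload()
	}
	if err := b.commit(payloads); err != nil {
		return err
	}
	for _, m := range b.pending {
		m.Ack()
	}
	b.pending = b.pending[:0]
	return nil
}

// demo feeds five messages through a batcher of size two and reports
// how many commits ran and how many messages ended up acked.
func demo() (commits, acks int) {
	b := &batcher{size: 2, commit: func(p [][]byte) error { commits++; return nil }}
	msgs := make([]*fakeMsg, 5)
	for i := range msgs {
		msgs[i] = &fakeMsg{payload: []byte{byte(i)}}
		if err := b.add(msgs[i]); err != nil {
			panic(err)
		}
	}
	if err := b.flush(); err != nil { // flush the final partial batch
		panic(err)
	}
	for _, m := range msgs {
		if m.acked {
			acks++
		}
	}
	return commits, acks
}

func main() {
	commits, acks := demo()
	fmt.Println("commits:", commits, "acks:", acks)
}
```

A real consumer would probably also flush on a timer so that a partial batch is not held indefinitely; that is left out to keep the sketch short.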
Based on the comments on the v5 repo I'm happy to accept a PR for this. Any implementation will need to allow for the possibility of the connection dropping and reconnecting before the ACK is requested (in that case the broker should resend the packet when the connection comes up again, so the ACK should probably just be dropped). Note that the proposed changes to the V5 client ensure that the ACK packets are sent in the correct order; while this is a requirement of the MQTT spec, it's not something I'd insist on (it's not currently guaranteed and could cause issues when one ACK is delayed and there is a limit to the number of messages in flight). |
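One possible way to handle the reconnect case is sketched below: each received message remembers which connection it arrived on, and a late ACK is silently dropped if the connection has been replaced since. This is only an assumption about how it could be done, not how the library or the PR does it; `conn`, `generation`, `received` and `reconnect` are hypothetical names, and ACK ordering is not addressed.

```go
package main

import (
	"fmt"
	"sync"
)

// conn is a hypothetical stand-in for the client's connection state.
type conn struct {
	mu         sync.Mutex
	generation int      // incremented on every reconnect
	sent       []uint16 // packet IDs for which an ACK went out on the wire
}

// reconnect marks every ack function handed out so far as stale.
func (c *conn) reconnect() {
	c.mu.Lock()
	c.generation++
	c.mu.Unlock()
}

// received returns an ack function bound to the connection generation
// on which the packet arrived. The function acts at most once.
func (c *conn) received(packetID uint16) func() {
	c.mu.Lock()
	gen := c.generation
	c.mu.Unlock()

	var once sync.Once
	return func() {
		once.Do(func() {
			c.mu.Lock()
			defer c.mu.Unlock()
			if gen != c.generation {
				return // stale: drop the ACK and rely on the broker resending
			}
			c.sent = append(c.sent, packetID)
		})
	}
}

// demo acks one packet before a reconnect and one after it, and
// returns the packet IDs that were actually acknowledged.
func demo() []uint16 {
	c := &conn{}
	ackFirst := c.received(1)
	ackSecond := c.received(2)

	ackFirst()    // connection still up: ACK is sent
	c.reconnect() // connection drops and comes back
	ackSecond()   // stale: ACK is dropped
	ackFirst()    // repeated call: no-op
	return c.sent
}

func main() {
	fmt.Println(demo())
}
```

In the demo only packet 1 is acknowledged; packet 2 would be redelivered by the broker on the new connection, assuming the session persists across the reconnect.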
@MattBrittan PR for the same is opened for review. Thanks! |
Thanks for the PR (#578) @shivamkm07. I added the below comments to that PR and thought I'd copy them here as I suspect they will reach more people and I'm really keen to get input on this. These notes/issues may seem fussy but I think it's important that we get this right (once live it is very difficult to change the API and I want to minimise future issues caused by users not understanding how this all works). It's probably possible to address these concerns by documenting the My concerns (with #578) are:
Adding this feature is likely to result in longer running handlers so I feel that it's important to note:
Given these concerns I did consider whether we are going about this in the right way; alternatives would include:
If the consensus is that the approach in the PR works for everyone then I'm happy to move forward with it (subject to the addition of some comments covering the above issues). |
Updated the associated PR with adding an option in |
I attempted to verify that I could consume a message and pass it to a worker for processing. If the processing successfully completes, only then do I want to ack the message. However, I notice that the messages are acked during consumption by router.go in three places inside the matchAndDispatch function.
I believe this is a bug. If the intended purpose is to ack the message only after the message has been completely processed, then these internal acks are bypassing that functionality. If the intended purpose is to ack the message immediately after consumption but before processing, then exposing mqtt.Message.Ack() serves no purpose and executes no code (because it wraps a once.Do({}) clause).
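The effect of the once.Do wrapper can be shown with a small standalone sketch. The `message` type below is hypothetical and only mimics the behaviour described above; it is not the library's implementation.

```go
package main

import (
	"fmt"
	"sync"
)

// message is a hypothetical stand-in for the client's message type.
type message struct {
	once sync.Once
	ack  func()
}

// Ack runs the acknowledgement callback at most once, however often it is called.
func (m *message) Ack() { m.once.Do(m.ack) }

// countAcks simulates the router acking during dispatch and the
// application acking later, and returns how many ACKs were sent.
func countAcks() int {
	sent := 0
	m := &message{ack: func() { sent++ }}
	m.Ack() // automatic ack issued while dispatching the message
	m.Ack() // later call from the application: does nothing
	return sent
}

func main() {
	fmt.Println("acks sent:", countAcks())
}
```

Only the first call has any effect, so if the router always makes that first call, an Ack() issued later by the application executes no code.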