publish() does not always call the callback #1372

Closed
vishnureddy17 opened this issue Dec 15, 2021 · 6 comments
@vishnureddy17
Member

In the publish() function on MqttClient, an inner function, publishProc(), does the real work of publish(). publishProc() appears to return a boolean indicating whether the operation succeeded. However, when publishProc() returns false (i.e., when the message ID provider runs out of IDs, causing _nextId() to return null), the callback passed to publish() is never called. The offending code is below:

MQTT.js/lib/client.js

Lines 619 to 626 in 2679952

var messageId = 0
if (opts.qos === 1 || opts.qos === 2) {
  messageId = that._nextId()
  if (messageId === null) {
    debug('No messageId left')
    return false
  }
}

This proc() pattern is seen elsewhere in the client code, so other cases like this should be investigated if and when this issue gets worked on.
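The failure mode can be reproduced outside the library with a small model. The following is only a sketch, not the MQTT.js source: `makeIdProvider`, `publish`, `countCallbacks`, and the `reportExhaustion` flag are hypothetical names, and passing an error to the callback is just one possible way to handle exhaustion.

```javascript
// Simplified model of the pattern described above; NOT the MQTT.js source.
// All names here are hypothetical and exist only for illustration.
function makeIdProvider (max) {
  let next = 1
  return {
    // Returns null once every ID in 1..max has been handed out.
    allocate () { return next <= max ? next++ : null }
  }
}

function publish (provider, qos, callback, reportExhaustion) {
  const publishProc = function () {
    let messageId = 0
    if (qos === 1 || qos === 2) {
      messageId = provider.allocate()
      if (messageId === null) {
        // Reported behavior: return false and never call the callback.
        // Assumed alternative: surface the failure to the caller.
        if (reportExhaustion) callback(new Error('No messageId left'))
        return false
      }
    }
    callback(null, messageId)
    return true
  }
  return publishProc()
}

// Publishes 3 QoS 1 messages against a provider that only has 2 IDs
// and counts how many callbacks fire.
function countCallbacks (reportExhaustion) {
  const provider = makeIdProvider(2)
  let calls = 0
  for (let i = 0; i < 3; i++) {
    publish(provider, 1, () => { calls++ }, reportExhaustion)
  }
  return calls
}
```

With `reportExhaustion` off, the third publish silently disappears from the caller's point of view, which is the behavior this issue reports.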

@YoDaMa
Contributor

YoDaMa commented Dec 30, 2021

@vishnureddy17 do you think this is still an issue?

@vishnureddy17
Member Author

@YoDaMa yes, but it'd only be an issue if there are 2^16 inflight messages

@redboltz
Contributor

redboltz commented Jan 4, 2022

@YoDaMa yes, but it'd only be an issue if there are 2^16 inflight messages

I added UniqueMessageIdProvider() to solve the issue and it has been merged.
By default, the first messageId (MQTT's Packet Identifier) is chosen randomly.
See:

this.nextId = Math.max(1, Math.floor(Math.random() * 65535))

After that, the value is incremented for each new messageId:
const id = this.nextId++
// Ensure 16 bit unsigned int (max 65535, nextId got one higher)
if (this.nextId === 65536) {
  this.nextId = 1
}
return id

Because the counter wraps around, a new messageId can conflict with one that is still in flight. If a conflict happens, the behavior is undefined.
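To make the wraparound concrete, here is a minimal sketch built only from the two snippets quoted above. `SketchDefaultIdProvider` and `allocationsBeforeRepeat` are hypothetical names, and the class is a stand-in, not the library's actual provider.

```javascript
// Stand-in assembled from the two quoted snippets; the class and function
// names are hypothetical and not part of MQTT.js.
class SketchDefaultIdProvider {
  constructor () {
    // Random starting point in 1..65534.
    this.nextId = Math.max(1, Math.floor(Math.random() * 65535))
  }

  allocate () {
    const id = this.nextId++
    // Keep the value in the 16 bit range 1..65535.
    if (this.nextId === 65536) {
      this.nextId = 1
    }
    return id
  }
}

// Counts how many allocations are handed out before the very first ID
// is returned a second time. Nothing tracks which IDs are still in flight.
function allocationsBeforeRepeat (provider) {
  const first = provider.allocate()
  let count = 1
  while (provider.allocate() !== first) count++
  return count
}
```

Whatever the random start, the first ID comes back after every other value in the 16 bit range has been used once, regardless of whether the earlier message has been acknowledged.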

So I introduced UniqueMessageIdProvider().
https://github.com/mqttjs/MQTT.js/blob/f04c24bd082aa9f6db2da6da65e40b2e0b692bcc/README.md

messageIdProvider: custom messageId provider. when new UniqueMessageIdProvider() is set, then non conflict messageId is provided.

When you set it as the option, the messageId never conflicts. The allocation algorithm returns the smallest vacant value.
See https://www.npmjs.com/package/number-allocator
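As an illustration of "smallest vacant value" allocation, here is a deliberately naive sketch. `SmallestVacantAllocator` is a hypothetical class written for this example; it is not the number-allocator package, which may use a more efficient data structure than the linear scan shown here.

```javascript
// Naive illustration of smallest-vacant allocation. Hypothetical class,
// not the number-allocator package.
class SmallestVacantAllocator {
  constructor (min, max) {
    this.min = min
    this.max = max
    this.inUse = new Set()
  }

  // Returns the smallest number in min..max that is not in use,
  // or null when every number is taken.
  alloc () {
    for (let n = this.min; n <= this.max; n++) {
      if (!this.inUse.has(n)) {
        this.inUse.add(n)
        return n
      }
    }
    return null
  }

  // Makes a number available again.
  free (n) {
    this.inUse.delete(n)
  }
}

const sketchAllocator = new SmallestVacantAllocator(1, 3)
const allocated = [sketchAllocator.alloc(), sketchAllocator.alloc(), sketchAllocator.alloc()]
const whenFull = sketchAllocator.alloc() // no vacant value left
sketchAllocator.free(2)
const afterFree = sketchAllocator.alloc() // the freed value is handed out again
```

Because an ID is only handed out when it is vacant, two in-flight messages cannot share a messageId under this scheme.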

If the messageIds run out, the message is enqueued, so the callback is not called immediately. Once a messageId becomes reusable on receipt of a PUBACK, PUBCOMP, SUBACK, or UNSUBACK, the enqueued message is processed and the callback is called.

UniqueMessageIdProvider() isn't used by default. You need to set it as the option.

@YoDaMa YoDaMa self-assigned this Jan 4, 2022
@vishnureddy17
Member Author

vishnureddy17 commented Jan 4, 2022

If the messageIds run out, the message is enqueued, so the callback is not called immediately. Once a messageId becomes reusable on receipt of a PUBACK, PUBCOMP, SUBACK, or UNSUBACK, the enqueued message is processed and the callback is called.

From my testing, the callback will NOT be called if using UniqueMessageIdProvider and the message IDs run out. Here's a quick experiment I ran to test this:

  1. Modify UniqueMessageIdProvider code to change 65535 to some low number like 5. This will make it easier to see what happens when message IDs run out.
    this.numberAllocator = new NumberAllocator(1, 65535)
  2. Create new client (tls protocol) with messageIdProvider option set to new UniqueMessageIdProvider()
  3. Publish 10 messages with QoS 1 quickly

Result: only 5 of the publishes result in the callback being called.
Expected behavior: the callback gets called 10 times. 5 calls succeed and the other 5 report an error because there are not enough message IDs left.

redboltz added a commit to redboltz/MQTT.js that referenced this issue Jan 5, 2022
Process is enqueued if messageId allocation is failure.
@redboltz
Contributor

redboltz commented Jan 5, 2022

@vishnureddy17 thank you for the additional information.
I reproduced the issue and fixed it in #1393. Please test with it.

The expected behavior is a little different from yours.

After this fix, the expected behavior is as follows:

If the messageIds run out, the message is enqueued, so the callback is not called immediately. Once a messageId becomes reusable on receipt of a PUBACK, PUBCOMP, SUBACK, or UNSUBACK, the enqueued message is processed and the callback is called.

So in your test case, the callbacks of the first 5 publishes are called when each packet is sent. The callbacks of the remaining 5 publishes are also called when each packet is sent, but the timing is different. Sending the sixth publish requires a messageId, so that publish is enqueued until a messageId becomes reusable. When a PUBACK is received, its messageId becomes reusable; the enqueued sixth publish is then processed and its callback is called.

In the end, all callbacks are called.
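The enqueue-until-acknowledged behavior described above can be modeled without a broker. This is a synchronous sketch under stated assumptions, not the code from the fix: `SketchClient`, `handlePuback`, and `runExperiment` are hypothetical names, "sending" is reduced to calling the callback, and acknowledgements are fed in by hand.

```javascript
// Synchronous model of "enqueue when no messageId is left". Hypothetical
// names throughout; this is not the MQTT.js implementation.
class SketchClient {
  constructor (maxIds) {
    this.maxIds = maxIds
    this.inUse = new Set()
    this.queue = []
  }

  // Smallest vacant ID in 1..maxIds, or null when all are in flight.
  allocate () {
    for (let id = 1; id <= this.maxIds; id++) {
      if (!this.inUse.has(id)) {
        this.inUse.add(id)
        return id
      }
    }
    return null
  }

  publish (callback) {
    const id = this.allocate()
    if (id === null) {
      // No ID available: hold the publish until an ID is released.
      this.queue.push(callback)
      return
    }
    // Stand-in for "the packet is sent".
    callback(null, id)
  }

  // Simulates receiving a PUBACK: release the ID, then retry one queued publish.
  handlePuback (id) {
    this.inUse.delete(id)
    const next = this.queue.shift()
    if (next) this.publish(next)
  }
}

// Mirrors the experiment in this thread: 5 IDs, 10 QoS 1 publishes.
function runExperiment () {
  const client = new SketchClient(5)
  const sentIds = []
  for (let i = 0; i < 10; i++) {
    client.publish((err, id) => { sentIds.push(id) })
  }
  const beforeAcks = sentIds.length
  for (let i = 0; i < 5; i++) {
    client.handlePuback(sentIds[i])
  }
  return { beforeAcks, afterAcks: sentIds.length }
}
```

In this model the first five callbacks fire immediately and the remaining five fire one by one as each simulated PUBACK releases an ID.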

@vishnureddy17
Member Author

Yep, just tested and seems good to me. Thanks! :)
