New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add unconflict message id allocation #1243
Add unconflict message id allocation #1243
Conversation
This PR adds unique messageId provider option. I implemented number-allocator. And I implemented UniqueMessageIdProvider using number-allocator. I have added tests.
So I think it is not a problem. I don't add the document about the option yet. I'm ready to refine the pull request. Feedbacks are very welcome. |
1d10f8b
to
777b175
Compare
I updated the PR. Type script support is refined. The PR is ready to review. |
NOTE: |
In order to avoid messageId allocating and registering conflict, during store processing, publish, subscribe, and unsubscribe functions are enqueued. The enqueued functions are invoked when the store processing will be finished. During the invocation, messageId is allocated. messageIds could be run out. In this case, stop invocation. The rest of functions in the queue are remained. When puback, pubcomp, suback, or unsuback is received, the messageId is deallocated and become reusable, so the queue invocation process is continued.
It caused Uncaught Error: write after end as follows. It had happened very subtle timing. ``` 1) Websocket Client auto reconnect should resubscribe when reconnecting: Uncaught Error: write after end at writeAfterEnd (node_modules/duplexify/node_modules/readable-stream/lib/_stream_writable.js:288:12) at Connection.Writable.write (node_modules/duplexify/node_modules/readable-stream/lib/_stream_writable.js:332:20) at Connection.<computed> [as pingresp] (node_modules/mqtt-connection/connection.js:95:10) at Connection.<anonymous> (test/server_helpers_for_client_tests.js:96:20) at Connection.emitPacket (node_modules/mqtt-connection/connection.js:10:8) at addChunk (node_modules/duplexify/node_modules/readable-stream/lib/_stream_readable.js:291:12) at readableAddChunk (node_modules/duplexify/node_modules/readable-stream/lib/_stream_readable.js:278:11) at Connection.Readable.push (node_modules/duplexify/node_modules/readable-stream/lib/_stream_readable.js:245:10) at Connection.Duplexify._forward (node_modules/duplexify/index.js:170:26) at DestroyableTransform.onreadable (node_modules/duplexify/index.js:134:10) at emitReadable_ (node_modules/through2/node_modules/readable-stream/lib/_stream_readable.js:504:10) at emitReadable (node_modules/through2/node_modules/readable-stream/lib/_stream_readable.js:498:62) at addChunk (node_modules/through2/node_modules/readable-stream/lib/_stream_readable.js:298:29) at readableAddChunk (node_modules/through2/node_modules/readable-stream/lib/_stream_readable.js:278:11) at DestroyableTransform.Readable.push (node_modules/through2/node_modules/readable-stream/lib/_stream_readable.js:245:10) at DestroyableTransform.Transform.push (node_modules/through2/node_modules/readable-stream/lib/_stream_transform.js:148:32) at Parser.push (node_modules/mqtt-connection/lib/parseStream.js:19:12) at Parser._newPacket (node_modules/mqtt-packet/parser.js:672:12) at Parser.parse (node_modules/mqtt-packet/parser.js:43:45) at DestroyableTransform.process [as _transform] (node_modules/mqtt-connection/lib/parseStream.js:14:17) at DestroyableTransform.Transform._read (node_modules/through2/node_modules/readable-stream/lib/_stream_transform.js:184:10) at DestroyableTransform.Transform._write (node_modules/through2/node_modules/readable-stream/lib/_stream_transform.js:172:83) at doWrite (node_modules/through2/node_modules/readable-stream/lib/_stream_writable.js:428:64) at writeOrBuffer (node_modules/through2/node_modules/readable-stream/lib/_stream_writable.js:417:5) at DestroyableTransform.Writable.write (node_modules/through2/node_modules/readable-stream/lib/_stream_writable.js:334:11) at Socket.ondata (internal/streams/readable.js:719:22) at addChunk (internal/streams/readable.js:309:12) at readableAddChunk (internal/streams/readable.js:284:9) at Socket.Readable.push (internal/streams/readable.js:223:10) at TCP.onStreamRead (internal/stream_base_commons.js:188:23) at TCP.callbackTrampoline (internal/async_hooks.js:131:14) ```
777b175
to
7c1368f
Compare
var messageId = 0 | ||
if (opts.qos === 1 || opts.qos === 2) { | ||
messageId = that._nextId() | ||
if (messageId === null) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see here you're checking that it is null, would packet.messageId be null or would it be undefined?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is null.
Let me explain.
messageId is allocated using messageIdProvider.
See https://github.com/redboltz/MQTT.js/blob/add_unconflict_message_id_allocation/lib/client.js#L188
Note that the original implementation using _nextId()
. I remained _nextId()
but it uses messageIdProvider.allocate()
See https://github.com/redboltz/MQTT.js/blob/add_unconflict_message_id_allocation/lib/client.js#L1503-L1505
There are two types of messageIdProvider.
One is simply incremental. The initial value is random. It is the original behavior. The messageId could conflict. It always return Number. So it never return null, and it never return undefined.
See https://github.com/redboltz/MQTT.js/blob/add_unconflict_message_id_allocation/lib/default-message-id-provider.js#L25-L33
The other is allocating an unique number.
See https://github.com/redboltz/MQTT.js/blob/add_unconflict_message_id_allocation/lib/unique-message-id-provider.js#L23-L26
If the all available ids are used, then return null, not undefined.
Implementation is here;
https://github.com/redboltz/number-allocator/blob/1.0.7/lib/number-allocator.js#L62-L84
See alloc() https://www.npmjs.com/package/number-allocator
alloc()
Allocate the first vacant number. The number become occupied status.
Time Complexity O(1)
- return: Number
- The first vacant number. If all numbers are occupied, return null.
So the messageId in the client.js never be undefined.
I noticed that the comment is misleading. I will update it.
https://github.com/redboltz/MQTT.js/blob/add_unconflict_message_id_allocation/lib/unique-message-id-provider.js#L21
I'm going to hold off on merging this rn, so I can include it in the next release. Once this current release is out and my comments are resolved we can merge. |
Thank you for the comment. I answered your comment (null or undefined). |
No description provided.