Fix resubscribe message id allocate twice #1337
Please don't merge this PR.
Resubscribe is outside the MQTT spec; it is an MQTT.js extension. On the connect sequence, the MQTT spec defines the following three steps:
1. The client sends CONNECT to the broker with CleanStart:false.
2. The broker sends CONNACK to the client with SessionPresent:true if a session exists.
3. The client re-sends in-flight PUBLISH and PUBREL messages.
Resubscribe was processed between step 2 and step 3. That is too early: resubscribe might allocate a messageId that is the same as that of an in-flight PUBLISH or PUBREL packet. So I moved the resubscribe process to after step 3.
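The ordering problem described above can be sketched in isolation. This is a minimal illustration, not the MQTT.js code: `SimpleIdProvider` and `reconnect` are hypothetical stand-ins, and the provider is assumed to always hand out the lowest free ID.

```javascript
// Hypothetical stand-in for a unique message ID provider; not the MQTT.js class.
class SimpleIdProvider {
  constructor () {
    this.used = new Set()
  }

  // Return the lowest free ID (1..65535) and mark it as used.
  allocate () {
    for (let id = 1; id <= 65535; id++) {
      if (!this.used.has(id)) {
        this.used.add(id)
        return id
      }
    }
    return null
  }

  // Mark a specific ID as used; returns false if it was already taken.
  register (id) {
    if (this.used.has(id)) return false
    this.used.add(id)
    return true
  }
}

// Simulate a reconnect with the given in-flight packet IDs.
// resubscribeFirst = true models the old order (resubscribe before step 3).
function reconnect (inflightIds, resubscribeFirst) {
  const provider = new SimpleIdProvider()
  let subscribeId = null
  if (resubscribeFirst) subscribeId = provider.allocate()
  // Step 3: re-send in-flight packets, which must keep their original IDs.
  const collisions = inflightIds.filter((id) => !provider.register(id))
  if (!resubscribeFirst) subscribeId = provider.allocate()
  return { subscribeId, collisions }
}

const oldOrder = reconnect([1, 2], true)
const newOrder = reconnect([1, 2], false)
console.log(oldOrder) // SUBSCRIBE takes ID 1, so in-flight ID 1 collides
console.log(newOrder) // SUBSCRIBE takes the first ID not used by in-flight packets
```

With the old order the SUBSCRIBE shares ID 1 with an in-flight packet; with the new order the in-flight IDs are registered first and the SUBSCRIBE gets a free one.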
1a9518f
to
5212022
Compare
I updated the PR. I think that now the PR is ready to merge.
@@ -335,6 +338,7 @@ function MqttClient (streamBuilder, options) {
var packet = null

if (!entry) {
that._resubscribe()
@redboltz I'm curious what the reason is for checking that there is no entry. Is it so that, once no queued packets are left, it will resubscribe?
Yes, exactly.
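A rough sketch of that drain-then-resubscribe pattern, assuming stored packets are replayed one at a time. `replayThenResubscribe` and its parameters are hypothetical names used for illustration only, not the MQTT.js internals.

```javascript
// Hypothetical helper: replay stored packets in order, then call onDrained once.
function replayThenResubscribe (storedPackets, send, onDrained) {
  const pending = storedPackets.slice()

  function deliver () {
    const entry = pending.shift()
    if (!entry) {
      // Nothing left to re-send, so it is now safe to resubscribe.
      onDrained()
      return
    }
    // Send the packet, then continue with the next stored entry.
    send(entry, deliver)
  }

  deliver()
}

const log = []
replayThenResubscribe(
  [{ cmd: 'publish', messageId: 1 }, { cmd: 'pubrel', messageId: 2 }],
  (packet, done) => { log.push(packet.cmd + ':' + packet.messageId); done() },
  () => { log.push('resubscribe') }
)
console.log(log)
```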
lib/client.js
Outdated
@@ -343,7 +347,7 @@ function MqttClient (streamBuilder, options) {
  var send = true
  if (packet.messageId && packet.messageId !== 0) {
  if (!that.messageIdProvider.register(packet.messageId)) {
- packet.messageeId = that.messageIdProvider.allocate()
+ packet.messageId = that.messageIdProvider.allocate()
So why did this get through the tests? Is it because there was a double typo, here and in the test?
No, there was no double typo. It is fallback code for a registration failure.
The failed test case was a combination of UniqueMessageIdProvider and resubscribe.
The error was that when the suback was received, no matching messageId was found in outgoing.
Before the typo was fixed, the packet had already been allocated a messageId in resubscribe(). The messageId was 1, and UniqueMessageIdProvider marked 1 as used.
The packet was pushed into the queue because connected was false.
And in subscribeProc, that.outgoing[packet.messageId] was added. The message id was 1.
See
Call resubscribe
Lines 1709 to 1721 in c92b877
MqttClient.prototype._onConnect = function (packet) {
if (this.disconnected) {
this.emit('connect', packet)
return
}
var that = this
this.messageIdProvider.clear()
this._setupPingTimer()
this._resubscribe(packet)
this.connected = true
resubscribe
Line 1677 in c92b877
MqttClient.prototype._resubscribe = function (connack) {
Line 1690 in c92b877
this.subscribe(resubscribeTopic, {properties: resubscribeTopic[_resubscribeTopicsKeys[topicI]].properties})
subscribe
Line 693 in c92b877
MqttClient.prototype.subscribe = function () {
Line 853 in c92b877
subscribeProc()
subscribeProc
Line 787 in c92b877
var subscribeProc = function () {
Line 827 in c92b877
that.outgoing[packet.messageId] = {
Line 841 in c92b877
that._sendPacket(packet)
_sendPacket
Line 1203 in c92b877
MqttClient.prototype._sendPacket = function (packet, cb, cbStorePut) {
Lines 1214 to 1218 in c92b877
if (!this.connected) {
debug('_sendPacket :: client not connected. Storing packet offline.')
this._storePacket(packet, cb, cbStorePut)
return
}
_storePacket
Line 1260 in c92b877
MqttClient.prototype._storePacket = function (packet, cb, cbStorePut) {
Line 1278 in c92b877
this.queue.push({ packet: storePacket, cb: cb })
_handleAck
Lines 1535 to 1548 in c92b877
MqttClient.prototype._handleAck = function (packet) {
/* eslint no-fallthrough: "off" */
var messageId = packet.messageId
var type = packet.cmd
var response = null
var cb = this.outgoing[messageId] ? this.outgoing[messageId].cb : null
var that = this
var err
if (!cb) {
debug('_handleAck :: Server sent an ack in error. Ignoring.')
// Server sent an ack in error, ignore it.
return
}
At the following point, it tried to register messageId 1 but failed because UniqueMessageIdProvider had marked 1 as used.
Lines 344 to 346 in c92b877
if (packet.messageId && packet.messageId !== 0) {
if (!that.messageIdProvider.register(packet.messageId)) {
packet.messageeId = that.messageIdProvider.allocate()
It then tried to allocate a new messageId. This is fallback code. I just noticed that the fallback code should be removed, because there is no case in which registration fails. In addition, allocating a new messageId doesn't help if the failure is caused by a bug. I will update the PR.
Anyway, the newly allocated messageId was 2, but it was written to messageeId.
So the packet fields were as follows:
{
messageId:1
messageeId:2
}
The packet was sent with messageId 1 and the suback was received with messageId 1. That matched outgoing, so the test passed.
After the typo was fixed, the packet fields became as follows:
{
messageId:2
}
messageId was overwritten, but outgoing still had messageId 1, so it didn't match.
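The effect of the typo can be reproduced in a few lines. This is a simplified sketch under the assumptions described in this comment (the ID registered in outgoing is 1 and the fallback allocates 2); `reassignId` is a hypothetical helper, not part of MQTT.js.

```javascript
// Hypothetical reproduction of the field-name typo; not the MQTT.js source.
function reassignId (packet, newId, withTypo) {
  if (withTypo) {
    packet.messageeId = newId // typo: adds a new, unused property
  } else {
    packet.messageId = newId // fix: overwrites the ID that goes on the wire
  }
  return packet
}

// outgoing is keyed by the ID chosen at subscribe time (1 here).
const outgoing = { 1: { cb: () => 'suback handled' } }

const typoPacket = reassignId({ cmd: 'subscribe', messageId: 1 }, 2, true)
const fixedPacket = reassignId({ cmd: 'subscribe', messageId: 1 }, 2, false)

// The broker echoes the messageId it received in the SUBACK,
// so the lookup in outgoing uses the ID that was actually sent.
const typoMatch = Boolean(outgoing[typoPacket.messageId])
const fixedMatch = Boolean(outgoing[fixedPacket.messageId])
console.log(typoMatch, fixedMatch)
```

With the typo, the packet still goes out with ID 1 and the lookup succeeds by accident; with the corrected field name, the packet goes out with ID 2 and no entry is found in outgoing.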
lib/client.js
Outdated
  debug('_resubscribe')
  var _resubscribeTopicsKeys = Object.keys(this._resubscribeTopics)
  if (!this._firstConnection &&
- (this.options.clean || (this.options.protocolVersion === 5 && !connack.sessionPresent)) &&
+ (this.options.clean || (this.options.protocolVersion === 5 && !this.sessionPresent)) &&
so resubscribe no longer uses CONNACK. this makes sense...
lib/client.js
Outdated
@@ -1714,9 +1718,9 @@ MqttClient.prototype._onConnect = function (packet) {

var that = this

this.sessionPresent = packet.sessionPresent
But why do this rather than just pass the packet with .sessionPresent to the reconnect? This introduces the possibility of less well-defined behavior if the this.sessionPresent field is manipulated elsewhere.
Do you mean using this.connackPacket = packet here?
And at #1337 (review):
(this.options.clean || (this.options.protocolVersion === 5 && !this.connackPacket.sessionPresent)) &&
There is still a possibility that someone uses an undefined this.connackPacket. However, I think that possibility is lower than with this.sessionPresent.
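For comparison, the option of passing the CONNACK explicitly could look roughly like the sketch below. It covers only the clause quoted in this thread, not the full resubscribe condition, and `cleanOrNoSession` is a hypothetical name introduced for illustration.

```javascript
// Hypothetical pure helper: evaluates only the clause quoted in this thread.
// Taking connack as an argument avoids reading mutable state from `this`.
function cleanOrNoSession (options, connack) {
  return Boolean(
    options.clean ||
    (options.protocolVersion === 5 && !connack.sessionPresent)
  )
}

const results = [
  cleanOrNoSession({ clean: true, protocolVersion: 4 }, { sessionPresent: false }),
  cleanOrNoSession({ clean: false, protocolVersion: 5 }, { sessionPresent: true }),
  cleanOrNoSession({ clean: false, protocolVersion: 5 }, { sessionPresent: false }),
  cleanOrNoSession({ clean: false, protocolVersion: 4 }, { sessionPresent: false })
]
console.log(results)
```

Because the helper receives the packet as a parameter, a caller cannot observe a stale or undefined field on the client, at the cost of having to thread the CONNACK through to wherever the check runs.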