Skip to content

Commit

Permalink
Merge pull request #557 from ChIoT-Tech/master
Browse files Browse the repository at this point in the history
Improve handling of SUBSCRIBE/UNSUBSCRIBE over disconnect/reconnect.
If the ResumeSubs option is not set then SUBSCRIBE/UNSUBSCRIBE packets should not be stored and any tokens should be completed when the connection is lost. This should resolve #555 but I cannot think of a way of testing this.
  • Loading branch information
MattBrittan committed Nov 5, 2021
2 parents 04f5644 + a43fab9 commit d978c56
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 5 deletions.
16 changes: 12 additions & 4 deletions client.go
Expand Up @@ -514,7 +514,9 @@ func (c *client) internalConnLost(err error) {
reconnect := c.options.AutoReconnect && c.connectionStatus() > connecting

if c.options.CleanSession && !reconnect {
c.messageIds.cleanUp()
c.messageIds.cleanUp() // completes PUB/SUB/UNSUB tokens
} else if !c.options.ResumeSubs {
c.messageIds.cleanUpSubscribe() // completes SUB/UNSUB tokens
}
if reconnect {
c.setConnected(reconnecting)
Expand Down Expand Up @@ -811,7 +813,9 @@ func (c *client) Subscribe(topic string, qos byte, callback MessageHandler) Toke
}
DEBUG.Println(CLI, sub.String())

persistOutbound(c.persist, sub)
if c.options.ResumeSubs { // Only persist if we need this to resume subs after a disconnection
persistOutbound(c.persist, sub)
}
switch c.connectionStatus() {
case connecting:
DEBUG.Println(CLI, "storing subscribe message (connecting), topic:", topic)
Expand Down Expand Up @@ -883,7 +887,9 @@ func (c *client) SubscribeMultiple(filters map[string]byte, callback MessageHand
sub.MessageID = mID
token.messageID = mID
}
persistOutbound(c.persist, sub)
if c.options.ResumeSubs { // Only persist if we need this to resume subs after a disconnection
persistOutbound(c.persist, sub)
}
switch c.connectionStatus() {
case connecting:
DEBUG.Println(CLI, "storing subscribe message (connecting), topics:", sub.Topics)
Expand Down Expand Up @@ -1096,7 +1102,9 @@ func (c *client) Unsubscribe(topics ...string) Token {
token.messageID = mID
}

persistOutbound(c.persist, unsub)
if c.options.ResumeSubs { // Only persist if we need this to resume subs after a disconnection
persistOutbound(c.persist, unsub)
}

switch c.connectionStatus() {
case connecting:
Expand Down
21 changes: 20 additions & 1 deletion messageids.go
Expand Up @@ -42,6 +42,7 @@ const (
midMax uint16 = 65535
)

// cleanup clears the message ID map; completes all token types and sets error on PUB, SUB and UNSUB tokens.
func (mids *messageIds) cleanUp() {
mids.Lock()
for _, token := range mids.index {
Expand All @@ -52,7 +53,7 @@ func (mids *messageIds) cleanUp() {
token.setError(fmt.Errorf("connection lost before Subscribe completed"))
case *UnsubscribeToken:
token.setError(fmt.Errorf("connection lost before Unsubscribe completed"))
case nil:
case nil: // should not be any nil entries
continue
}
token.flowComplete()
Expand All @@ -62,6 +63,24 @@ func (mids *messageIds) cleanUp() {
DEBUG.Println(MID, "cleaned up")
}

// cleanUpSubscribe removes all SUBSCRIBE and UNSUBSCRIBE tokens (setting error)
// This may be called when the connection is lost, and we will not be resending SUB/UNSUB packets
func (mids *messageIds) cleanUpSubscribe() {
mids.Lock()
for mid, token := range mids.index {
switch token.(type) {
case *SubscribeToken:
token.setError(fmt.Errorf("connection lost before Subscribe completed"))
delete(mids.index, mid)
case *UnsubscribeToken:
token.setError(fmt.Errorf("connection lost before Unsubscribe completed"))
delete(mids.index, mid)
}
}
mids.Unlock()
DEBUG.Println(MID, "cleaned up subs")
}

func (mids *messageIds) freeID(id uint16) {
mids.Lock()
delete(mids.index, id)
Expand Down

0 comments on commit d978c56

Please sign in to comment.