Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve handling of SUBSCRIBE/UNSUBSCRIBE over disconnect/reconnect. #557

Merged
merged 2 commits into from Nov 5, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
16 changes: 12 additions & 4 deletions client.go
Expand Up @@ -514,7 +514,9 @@ func (c *client) internalConnLost(err error) {
reconnect := c.options.AutoReconnect && c.connectionStatus() > connecting

if c.options.CleanSession && !reconnect {
c.messageIds.cleanUp()
c.messageIds.cleanUp() // completes PUB/SUB/UNSUB tokens
} else if !c.options.ResumeSubs {
c.messageIds.cleanUpSubscribe() // completes SUB/UNSUB tokens
}
if reconnect {
c.setConnected(reconnecting)
Expand Down Expand Up @@ -811,7 +813,9 @@ func (c *client) Subscribe(topic string, qos byte, callback MessageHandler) Toke
}
DEBUG.Println(CLI, sub.String())

persistOutbound(c.persist, sub)
if c.options.ResumeSubs { // Only persist if we need this to resume subs after a disconnection
persistOutbound(c.persist, sub)
}
switch c.connectionStatus() {
case connecting:
DEBUG.Println(CLI, "storing subscribe message (connecting), topic:", topic)
Expand Down Expand Up @@ -883,7 +887,9 @@ func (c *client) SubscribeMultiple(filters map[string]byte, callback MessageHand
sub.MessageID = mID
token.messageID = mID
}
persistOutbound(c.persist, sub)
if c.options.ResumeSubs { // Only persist if we need this to resume subs after a disconnection
persistOutbound(c.persist, sub)
}
switch c.connectionStatus() {
case connecting:
DEBUG.Println(CLI, "storing subscribe message (connecting), topics:", sub.Topics)
Expand Down Expand Up @@ -1096,7 +1102,9 @@ func (c *client) Unsubscribe(topics ...string) Token {
token.messageID = mID
}

persistOutbound(c.persist, unsub)
if c.options.ResumeSubs { // Only persist if we need this to resume subs after a disconnection
persistOutbound(c.persist, unsub)
}

switch c.connectionStatus() {
case connecting:
Expand Down
21 changes: 20 additions & 1 deletion messageids.go
Expand Up @@ -42,6 +42,7 @@ const (
midMax uint16 = 65535
)

// cleanup clears the message ID map; completes all token types and sets error on PUB, SUB and UNSUB tokens.
func (mids *messageIds) cleanUp() {
mids.Lock()
for _, token := range mids.index {
Expand All @@ -52,7 +53,7 @@ func (mids *messageIds) cleanUp() {
token.setError(fmt.Errorf("connection lost before Subscribe completed"))
case *UnsubscribeToken:
token.setError(fmt.Errorf("connection lost before Unsubscribe completed"))
case nil:
case nil: // should not be any nil entries
continue
}
token.flowComplete()
Expand All @@ -62,6 +63,24 @@ func (mids *messageIds) cleanUp() {
DEBUG.Println(MID, "cleaned up")
}

// cleanUpSubscribe removes all SUBSCRIBE and UNSUBSCRIBE tokens (setting error)
// This may be called when the connection is lost, and we will not be resending SUB/UNSUB packets
func (mids *messageIds) cleanUpSubscribe() {
mids.Lock()
for mid, token := range mids.index {
switch token.(type) {
case *SubscribeToken:
token.setError(fmt.Errorf("connection lost before Subscribe completed"))
delete(mids.index, mid)
case *UnsubscribeToken:
token.setError(fmt.Errorf("connection lost before Unsubscribe completed"))
delete(mids.index, mid)
}
}
mids.Unlock()
DEBUG.Println(MID, "cleaned up subs")
}

func (mids *messageIds) freeID(id uint16) {
mids.Lock()
delete(mids.index, id)
Expand Down