Skip to content

Commit

Permalink
Improve handling of SUBSCRIBE/UNSUBSCRIBE over disconnect/reconnect.
Browse files Browse the repository at this point in the history
If the ResumeSubs option is not set then SUBSCRIBE/UNSUBSCRIBE packets should not be stored and any tokens should be completed when the connection is lost.
  • Loading branch information
MattBrittan committed Nov 5, 2021
1 parent e0f17f5 commit a43fab9
Showing 1 changed file with 3 additions and 11 deletions.
14 changes: 3 additions & 11 deletions messageids.go
Expand Up @@ -42,7 +42,7 @@ const (
midMax uint16 = 65535
)

// cleanup clears the message ID map; it sets an error and completes PUB, SUB and UNSUB tokens.
// cleanup clears the message ID map; completes all token types and sets error on PUB, SUB and UNSUB tokens.
func (mids *messageIds) cleanUp() {
mids.Lock()
for _, token := range mids.index {
Expand All @@ -53,12 +53,7 @@ func (mids *messageIds) cleanUp() {
token.setError(fmt.Errorf("connection lost before Subscribe completed"))
case *UnsubscribeToken:
token.setError(fmt.Errorf("connection lost before Unsubscribe completed"))
case *DisconnectToken:
token.setError(fmt.Errorf("connection lost before Disconnect completed"))
case *ConnectToken:
// Connect token will be closed in the connect function
continue
case nil:
case nil: // should not be any nil entries
continue
}
token.flowComplete()
Expand All @@ -69,7 +64,7 @@ func (mids *messageIds) cleanUp() {
}

// cleanUpSubscribe removes all SUBSCRIBE and UNSUBSCRIBE tokens (setting error)
// This may be called when the connection is lost and we will not be resending SUB/UNSUB packets
// This may be called when the connection is lost, and we will not be resending SUB/UNSUB packets
func (mids *messageIds) cleanUpSubscribe() {
mids.Lock()
for mid, token := range mids.index {
Expand All @@ -80,10 +75,7 @@ func (mids *messageIds) cleanUpSubscribe() {
case *UnsubscribeToken:
token.setError(fmt.Errorf("connection lost before Unsubscribe completed"))
delete(mids.index, mid)
case nil:
continue
}
token.flowComplete()
}
mids.Unlock()
DEBUG.Println(MID, "cleaned up subs")
Expand Down

0 comments on commit a43fab9

Please sign in to comment.