Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SetMaxResumePubInFlight option - limit inflight publish packets on resume #521

Merged
merged 1 commit into from Jul 19, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
39 changes: 38 additions & 1 deletion client.go
Expand Up @@ -19,6 +19,7 @@ package mqtt

import (
"bytes"
"context"
"errors"
"fmt"
"net"
Expand All @@ -27,6 +28,8 @@ import (
"sync/atomic"
"time"

"golang.org/x/sync/semaphore"

"github.com/eclipse/paho.mqtt.golang/packets"
)

Expand Down Expand Up @@ -919,10 +922,42 @@ func (c *client) reserveStoredPublishIDs() {
// Load all stored messages and resend them
// Call this to ensure QOS > 1,2 even after an application crash
// Note: This function will exit if c.stop is closed (this allows the shutdown to proceed avoiding a potential deadlock)
//
// other than that it does not return until all messages in the store have been sent (connect() does not complete its
// token before this completes)
func (c *client) resume(subscription bool, ibound chan packets.ControlPacket) {
DEBUG.Println(STR, "enter Resume")

// Prior to sending a message getSemaphore will be called and once sent releaseSemaphore will be called
// with the token (so semaphore can be released when ACK received if applicable).
// Using a weighted semaphore rather than channels because this retains ordering
getSemaphore := func() {} // Default = do nothing
releaseSemaphore := func(_ *PublishToken) {} // Default = do nothing
var sem *semaphore.Weighted
if c.options.MaxResumePubInFlight > 0 {
sem = semaphore.NewWeighted(int64(c.options.MaxResumePubInFlight))
ctx, cancel := context.WithCancel(context.Background()) // Context needed for semaphore
defer cancel() // ensure context gets cancelled

go func() {
select {
case <-c.stop: // Request to stop (due to comm error etc)
cancel()
case <-ctx.Done(): // resume completed normally
}
}()

getSemaphore = func() { sem.Acquire(ctx, 1) }
releaseSemaphore = func(token *PublishToken) { // Note: If token never completes then resume() may stall (will still exit on ctx.Done())
go func() {
select {
case <-token.Done():
case <-ctx.Done():
}
sem.Release(1)
}()
}
}

storedKeys := c.persist.All()
for _, key := range storedKeys {
packet := c.persist.Get(key)
Expand Down Expand Up @@ -986,12 +1021,14 @@ func (c *client) resume(subscription bool, ibound chan packets.ControlPacket) {
c.claimID(token, details.MessageID)
DEBUG.Println(STR, fmt.Sprintf("loaded pending publish (%d)", details.MessageID))
DEBUG.Println(STR, details)
getSemaphore()
select {
case c.obound <- &PacketAndToken{p: p, t: token}:
case <-c.stop:
DEBUG.Println(STR, "resume exiting due to stop")
return
}
releaseSemaphore(token) // If limiting simultaneous messages then we need to know when message is acknowledged
default:
ERROR.Println(STR, "invalid message type in store (discarded)")
c.persist.Del(key)
Expand Down
1 change: 1 addition & 0 deletions go.mod
Expand Up @@ -5,4 +5,5 @@ go 1.14
require (
github.com/gorilla/websocket v1.4.2
golang.org/x/net v0.0.0-20200425230154-ff2c4b7c35a0
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
)
2 changes: 2 additions & 0 deletions go.sum
Expand Up @@ -3,6 +3,8 @@ github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/ad
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/net v0.0.0-20200425230154-ff2c4b7c35a0 h1:Jcxah/M+oLZ/R4/z5RzfPzGbPXnVDPkEDtf2JnuxN+U=
golang.org/x/net v0.0.0-20200425230154-ff2c4b7c35a0/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c h1:5KslGYwFpkhGh+Q16bwMP3cOontH8FOep7tGV86Y7SQ=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
161 changes: 161 additions & 0 deletions memstore_ordered.go
@@ -0,0 +1,161 @@
/*
* Copyright (c) 2013 IBM Corp.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/legal/epl-v10.html
*
* Contributors:
* Seth Hoenig
* Allan Stockdill-Mander
* Mike Robertson
*/

package mqtt

import (
"sort"
"sync"
"time"

"github.com/eclipse/paho.mqtt.golang/packets"
)

// OrderedMemoryStore uses a map internally so the order in which All() returns packets is
// undefined. OrderedMemoryStore resolves this by storing the time the message is added
// and sorting based upon this.

// storedMessage encapsulates a message and the time it was initially stored
type storedMessage struct {
ts time.Time
msg packets.ControlPacket
}

// OrderedMemoryStore implements the store interface to provide a "persistence"
// mechanism wholly stored in memory. This is only useful for
// as long as the client instance exists.
type OrderedMemoryStore struct {
sync.RWMutex
messages map[string]storedMessage
opened bool
}

// NewOrderedMemoryStore returns a pointer to a new instance of
// OrderedMemoryStore, the instance is not initialized and ready to
// use until Open() has been called on it.
func NewOrderedMemoryStore() *OrderedMemoryStore {
store := &OrderedMemoryStore{
messages: make(map[string]storedMessage),
opened: false,
}
return store
}

// Open initializes a OrderedMemoryStore instance.
func (store *OrderedMemoryStore) Open() {
store.Lock()
defer store.Unlock()
store.opened = true
DEBUG.Println(STR, "OrderedMemoryStore initialized")
}

// Put takes a key and a pointer to a Message and stores the
// message.
func (store *OrderedMemoryStore) Put(key string, message packets.ControlPacket) {
store.Lock()
defer store.Unlock()
if !store.opened {
ERROR.Println(STR, "Trying to use memory store, but not open")
return
}
store.messages[key] = storedMessage{ts: time.Now(), msg: message}
}

// Get takes a key and looks in the store for a matching Message
// returning either the Message pointer or nil.
func (store *OrderedMemoryStore) Get(key string) packets.ControlPacket {
store.RLock()
defer store.RUnlock()
if !store.opened {
ERROR.Println(STR, "Trying to use memory store, but not open")
return nil
}
mid := mIDFromKey(key)
m, ok := store.messages[key]
if !ok || m.msg == nil {
CRITICAL.Println(STR, "OrderedMemoryStore get: message", mid, "not found")
} else {
DEBUG.Println(STR, "OrderedMemoryStore get: message", mid, "found")
}
return m.msg
}

// All returns a slice of strings containing all the keys currently
// in the OrderedMemoryStore.
func (store *OrderedMemoryStore) All() []string {
store.RLock()
defer store.RUnlock()
if !store.opened {
ERROR.Println(STR, "Trying to use memory store, but not open")
return nil
}
type tsAndKey struct {
ts time.Time
key string
}

tsKeys := make([]tsAndKey, 0, len(store.messages))
for k, v := range store.messages {
tsKeys = append(tsKeys, tsAndKey{ts: v.ts, key: k})
}
sort.Slice(tsKeys, func(a int, b int) bool { return tsKeys[a].ts.Before(tsKeys[b].ts) })

keys := make([]string, len(tsKeys))
for i := range tsKeys {
keys[i] = tsKeys[i].key
}
return keys
}

// Del takes a key, searches the OrderedMemoryStore and if the key is found
// deletes the Message pointer associated with it.
func (store *OrderedMemoryStore) Del(key string) {
store.Lock()
defer store.Unlock()
if !store.opened {
ERROR.Println(STR, "Trying to use memory store, but not open")
return
}
mid := mIDFromKey(key)
_, ok := store.messages[key]
if !ok {
WARN.Println(STR, "OrderedMemoryStore del: message", mid, "not found")
} else {
delete(store.messages, key)
DEBUG.Println(STR, "OrderedMemoryStore del: message", mid, "was deleted")
}
}

// Close will disallow modifications to the state of the store.
func (store *OrderedMemoryStore) Close() {
store.Lock()
defer store.Unlock()
if !store.opened {
ERROR.Println(STR, "Trying to close memory store, but not open")
return
}
store.opened = false
DEBUG.Println(STR, "OrderedMemoryStore closed")
}

// Reset eliminates all persisted message data in the store.
func (store *OrderedMemoryStore) Reset() {
store.Lock()
defer store.Unlock()
if !store.opened {
ERROR.Println(STR, "Trying to reset memory store, but not open")
}
store.messages = make(map[string]storedMessage)
WARN.Println(STR, "OrderedMemoryStore wiped")
}
11 changes: 11 additions & 0 deletions options.go
Expand Up @@ -88,6 +88,7 @@ type ClientOptions struct {
ResumeSubs bool
HTTPHeaders http.Header
WebsocketOptions *WebsocketOptions
MaxResumePubInFlight int // // 0 = no limit; otherwise this is the maximum simultaneous messages sent while resuming
}

// NewClientOptions will create a new ClientClientOptions type with some
Expand Down Expand Up @@ -401,3 +402,13 @@ func (o *ClientOptions) SetWebsocketOptions(w *WebsocketOptions) *ClientOptions
o.WebsocketOptions = w
return o
}

// SetMaxResumePubInFlight sets the maximum simultaneous publish messages that will be sent while resuming. Note that
// this only applies to messages coming from the store (so additional sends may push us over the limit)
// Note that the connect token will not be flagged as complete until all messages have been sent from the
// store. If broker does not respond to messages then resume may not complete.
// This option was put in place because resuming after downtime can saturate low capacity links.
func (o *ClientOptions) SetMaxResumePubInFlight(MaxResumePubInFlight int) *ClientOptions {
o.MaxResumePubInFlight = MaxResumePubInFlight
return o
}