Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor MessageID Mutex #602

Merged
merged 2 commits into from Aug 1, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
46 changes: 32 additions & 14 deletions fvt_client_test.go
Expand Up @@ -1147,24 +1147,24 @@ func Test_cleanUpMids(t *testing.T) {
}

token := c.Publish("/test/cleanUP", 2, false, "cleanup test")
c.(*client).messageIds.Lock()
c.(*client).messageIds.mu.Lock()
fmt.Println("Breaking connection", len(c.(*client).messageIds.index))
if len(c.(*client).messageIds.index) == 0 {
t.Fatalf("Should be a token in the messageIDs, none found")
}
c.(*client).messageIds.Unlock()
c.(*client).messageIds.mu.Unlock()
c.(*client).internalConnLost(fmt.Errorf("cleanup test"))

time.Sleep(1 * time.Second)
if !c.IsConnected() {
t.Fail()
}

c.(*client).messageIds.Lock()
c.(*client).messageIds.mu.Lock()
if len(c.(*client).messageIds.index) > 0 {
t.Fatalf("Should have cleaned up messageIDs, have %d left", len(c.(*client).messageIds.index))
}
c.(*client).messageIds.Unlock()
c.(*client).messageIds.mu.Unlock()

// This test used to check that token.Error() was not nil. However this is not something that can
// be done reliably - it is likely to work with a remote broker but less so with a local one.
Expand Down Expand Up @@ -1320,8 +1320,6 @@ func Test_ConnectRetryPublish(t *testing.T) {
func Test_ResumeSubs(t *testing.T) {
topic := "/test/ResumeSubs"
var qos byte = 1
payload := "sample Payload"
choke := make(chan bool)

// subscribe to topic before establishing a connection, and publish a message after the publish client has connected successfully
subMemStore := NewMemoryStore()
Expand All @@ -1331,17 +1329,17 @@ func Test_ResumeSubs(t *testing.T) {

s := NewClient(sops)
sConnToken := s.Connect()
subToken := s.Subscribe(topic, qos, nil) // Message should be stored before this returns

subToken := s.Subscribe(topic, qos, nil)

// Verify the subscribe packet exists in the memorystore
// Verify subscribe packet exists in the memory store
ids := subMemStore.All()
if len(ids) == 0 {
t.Fatalf("Expected subscribe packet to be in store")
} else if len(ids) != 1 {
t.Fatalf("Expected 1 packet to be in store")
}
packet := subMemStore.Get(ids[0])
fmt.Println("packet", packet)
if packet == nil {
t.Fatal("Failed to retrieve packet from store")
}
Expand Down Expand Up @@ -1373,11 +1371,9 @@ func Test_ResumeSubs(t *testing.T) {
SetStore(subMemStore2).SetResumeSubs(true).SetCleanSession(false).SetConnectRetry(true).
SetConnectRetryInterval(time.Second / 2)

msgChan := make(chan Message)
var f MessageHandler = func(client Client, msg Message) {
if msg.Topic() != topic || string(msg.Payload()) != payload {
t.Fatalf("Received unexpected message: %v, %v", msg.Topic(), msg.Payload())
}
choke <- true
msgChan <- msg
}
sops.SetDefaultPublishHandler(f)
s = NewClient(sops).(*client)
Expand All @@ -1393,11 +1389,33 @@ func Test_ResumeSubs(t *testing.T) {
t.Fatalf("Error on valid Publish.Connect(): %v", pConnToken.Error())
}

payload := "sample Payload"
if pubToken := p.Publish(topic, 1, false, payload); pubToken.Wait() && pubToken.Error() != nil {
t.Fatalf("Error on valid Client.Publish(): %v", pubToken.Error())
}

wait(choke)
timer := time.NewTicker(time.Second) // We wait a second to ensure message is only received once
var gotMsg bool
resultLoop:
for {
select {
case msg := <-msgChan:
if msg.Topic() == topic && string(msg.Payload()) == payload {
if gotMsg {
t.Fatalf("Received message 1 twice")
}
gotMsg = true
} else {
t.Fatalf("Received unexpected message: %v, %v", msg.Topic(), msg.Payload())
}
case <-timer.C:
break resultLoop
}

}
if !gotMsg {
t.Error("did not receive message 1")
}

s.Disconnect(250)
p.Disconnect(250)
Expand Down
26 changes: 13 additions & 13 deletions messageids.go
Expand Up @@ -31,7 +31,7 @@ import (
type MId uint16

type messageIds struct {
sync.RWMutex
mu sync.RWMutex // Named to prevent Mu from being accessible directly via client
index map[uint16]tokenCompletor

lastIssuedID uint16 // The most recently issued ID. Used so we cycle through ids rather than immediately reusing them (can make debugging easier)
Expand All @@ -44,7 +44,7 @@ const (

// cleanup clears the message ID map; completes all token types and sets error on PUB, SUB and UNSUB tokens.
func (mids *messageIds) cleanUp() {
mids.Lock()
mids.mu.Lock()
for _, token := range mids.index {
switch token.(type) {
case *PublishToken:
Expand All @@ -59,14 +59,14 @@ func (mids *messageIds) cleanUp() {
token.flowComplete()
}
mids.index = make(map[uint16]tokenCompletor)
mids.Unlock()
mids.mu.Unlock()
DEBUG.Println(MID, "cleaned up")
}

// cleanUpSubscribe removes all SUBSCRIBE and UNSUBSCRIBE tokens (setting error)
// This may be called when the connection is lost, and we will not be resending SUB/UNSUB packets
func (mids *messageIds) cleanUpSubscribe() {
mids.Lock()
mids.mu.Lock()
for mid, token := range mids.index {
switch token.(type) {
case *SubscribeToken:
Expand All @@ -77,19 +77,19 @@ func (mids *messageIds) cleanUpSubscribe() {
delete(mids.index, mid)
}
}
mids.Unlock()
mids.mu.Unlock()
DEBUG.Println(MID, "cleaned up subs")
}

func (mids *messageIds) freeID(id uint16) {
mids.Lock()
mids.mu.Lock()
delete(mids.index, id)
mids.Unlock()
mids.mu.Unlock()
}

func (mids *messageIds) claimID(token tokenCompletor, id uint16) {
mids.Lock()
defer mids.Unlock()
mids.mu.Lock()
defer mids.mu.Unlock()
if _, ok := mids.index[id]; !ok {
mids.index[id] = token
} else {
Expand All @@ -105,8 +105,8 @@ func (mids *messageIds) claimID(token tokenCompletor, id uint16) {
// getID will return an available id or 0 if none available
// The id will generally be the previous id + 1 (because this makes tracing messages a bit simpler)
func (mids *messageIds) getID(t tokenCompletor) uint16 {
mids.Lock()
defer mids.Unlock()
mids.mu.Lock()
defer mids.mu.Unlock()
i := mids.lastIssuedID // note: the only situation where lastIssuedID is 0 the map will be empty
looped := false // uint16 will loop from 65535->0
for {
Expand All @@ -127,8 +127,8 @@ func (mids *messageIds) getID(t tokenCompletor) uint16 {
}

func (mids *messageIds) getToken(id uint16) tokenCompletor {
mids.RLock()
defer mids.RUnlock()
mids.mu.RLock()
defer mids.mu.RUnlock()
if token, ok := mids.index[id]; ok {
return token
}
Expand Down