Skip to content

Commit

Permalink
Merge pull request #602 from ChIoT-Tech/master
Browse files Browse the repository at this point in the history
`Client` has an anonymous `sync.RWMutex`. It also has an anonymous `messageIds` which itself has an anonymous `sync.RWMutex`. This complicates refactoring because if you remove `Client.RWMutex` everything appears to continue working but the `mutex` in `messageIds` is being called (unexpected behaviour so better to make things clear).
  • Loading branch information
MattBrittan committed Aug 1, 2022
2 parents bcb58ee + ccb7d1c commit 34dc80e
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 27 deletions.
46 changes: 32 additions & 14 deletions fvt_client_test.go
Expand Up @@ -1147,24 +1147,24 @@ func Test_cleanUpMids(t *testing.T) {
}

token := c.Publish("/test/cleanUP", 2, false, "cleanup test")
c.(*client).messageIds.Lock()
c.(*client).messageIds.mu.Lock()
fmt.Println("Breaking connection", len(c.(*client).messageIds.index))
if len(c.(*client).messageIds.index) == 0 {
t.Fatalf("Should be a token in the messageIDs, none found")
}
c.(*client).messageIds.Unlock()
c.(*client).messageIds.mu.Unlock()
c.(*client).internalConnLost(fmt.Errorf("cleanup test"))

time.Sleep(1 * time.Second)
if !c.IsConnected() {
t.Fail()
}

c.(*client).messageIds.Lock()
c.(*client).messageIds.mu.Lock()
if len(c.(*client).messageIds.index) > 0 {
t.Fatalf("Should have cleaned up messageIDs, have %d left", len(c.(*client).messageIds.index))
}
c.(*client).messageIds.Unlock()
c.(*client).messageIds.mu.Unlock()

// This test used to check that token.Error() was not nil. However this is not something that can
// be done reliably - it is likely to work with a remote broker but less so with a local one.
Expand Down Expand Up @@ -1320,8 +1320,6 @@ func Test_ConnectRetryPublish(t *testing.T) {
func Test_ResumeSubs(t *testing.T) {
topic := "/test/ResumeSubs"
var qos byte = 1
payload := "sample Payload"
choke := make(chan bool)

// subscribe to topic before establishing a connection, and publish a message after the publish client has connected successfully
subMemStore := NewMemoryStore()
Expand All @@ -1331,17 +1329,17 @@ func Test_ResumeSubs(t *testing.T) {

s := NewClient(sops)
sConnToken := s.Connect()
subToken := s.Subscribe(topic, qos, nil) // Message should be stored before this returns

subToken := s.Subscribe(topic, qos, nil)

// Verify the subscribe packet exists in the memorystore
// Verify subscribe packet exists in the memory store
ids := subMemStore.All()
if len(ids) == 0 {
t.Fatalf("Expected subscribe packet to be in store")
} else if len(ids) != 1 {
t.Fatalf("Expected 1 packet to be in store")
}
packet := subMemStore.Get(ids[0])
fmt.Println("packet", packet)
if packet == nil {
t.Fatal("Failed to retrieve packet from store")
}
Expand Down Expand Up @@ -1373,11 +1371,9 @@ func Test_ResumeSubs(t *testing.T) {
SetStore(subMemStore2).SetResumeSubs(true).SetCleanSession(false).SetConnectRetry(true).
SetConnectRetryInterval(time.Second / 2)

msgChan := make(chan Message)
var f MessageHandler = func(client Client, msg Message) {
if msg.Topic() != topic || string(msg.Payload()) != payload {
t.Fatalf("Received unexpected message: %v, %v", msg.Topic(), msg.Payload())
}
choke <- true
msgChan <- msg
}
sops.SetDefaultPublishHandler(f)
s = NewClient(sops).(*client)
Expand All @@ -1393,11 +1389,33 @@ func Test_ResumeSubs(t *testing.T) {
t.Fatalf("Error on valid Publish.Connect(): %v", pConnToken.Error())
}

payload := "sample Payload"
if pubToken := p.Publish(topic, 1, false, payload); pubToken.Wait() && pubToken.Error() != nil {
t.Fatalf("Error on valid Client.Publish(): %v", pubToken.Error())
}

wait(choke)
timer := time.NewTicker(time.Second) // We wait a second to ensure message is only received once
var gotMsg bool
resultLoop:
for {
select {
case msg := <-msgChan:
if msg.Topic() == topic && string(msg.Payload()) == payload {
if gotMsg {
t.Fatalf("Received message 1 twice")
}
gotMsg = true
} else {
t.Fatalf("Received unexpected message: %v, %v", msg.Topic(), msg.Payload())
}
case <-timer.C:
break resultLoop
}

}
if !gotMsg {
t.Error("did not receive message 1")
}

s.Disconnect(250)
p.Disconnect(250)
Expand Down
26 changes: 13 additions & 13 deletions messageids.go
Expand Up @@ -31,7 +31,7 @@ import (
type MId uint16

type messageIds struct {
sync.RWMutex
mu sync.RWMutex // Named to prevent Mu from being accessible directly via client
index map[uint16]tokenCompletor

lastIssuedID uint16 // The most recently issued ID. Used so we cycle through ids rather than immediately reusing them (can make debugging easier)
Expand All @@ -44,7 +44,7 @@ const (

// cleanup clears the message ID map; completes all token types and sets error on PUB, SUB and UNSUB tokens.
func (mids *messageIds) cleanUp() {
mids.Lock()
mids.mu.Lock()
for _, token := range mids.index {
switch token.(type) {
case *PublishToken:
Expand All @@ -59,14 +59,14 @@ func (mids *messageIds) cleanUp() {
token.flowComplete()
}
mids.index = make(map[uint16]tokenCompletor)
mids.Unlock()
mids.mu.Unlock()
DEBUG.Println(MID, "cleaned up")
}

// cleanUpSubscribe removes all SUBSCRIBE and UNSUBSCRIBE tokens (setting error)
// This may be called when the connection is lost, and we will not be resending SUB/UNSUB packets
func (mids *messageIds) cleanUpSubscribe() {
mids.Lock()
mids.mu.Lock()
for mid, token := range mids.index {
switch token.(type) {
case *SubscribeToken:
Expand All @@ -77,19 +77,19 @@ func (mids *messageIds) cleanUpSubscribe() {
delete(mids.index, mid)
}
}
mids.Unlock()
mids.mu.Unlock()
DEBUG.Println(MID, "cleaned up subs")
}

func (mids *messageIds) freeID(id uint16) {
mids.Lock()
mids.mu.Lock()
delete(mids.index, id)
mids.Unlock()
mids.mu.Unlock()
}

func (mids *messageIds) claimID(token tokenCompletor, id uint16) {
mids.Lock()
defer mids.Unlock()
mids.mu.Lock()
defer mids.mu.Unlock()
if _, ok := mids.index[id]; !ok {
mids.index[id] = token
} else {
Expand All @@ -105,8 +105,8 @@ func (mids *messageIds) claimID(token tokenCompletor, id uint16) {
// getID will return an available id or 0 if none available
// The id will generally be the previous id + 1 (because this makes tracing messages a bit simpler)
func (mids *messageIds) getID(t tokenCompletor) uint16 {
mids.Lock()
defer mids.Unlock()
mids.mu.Lock()
defer mids.mu.Unlock()
i := mids.lastIssuedID // note: the only situation where lastIssuedID is 0 the map will be empty
looped := false // uint16 will loop from 65535->0
for {
Expand All @@ -127,8 +127,8 @@ func (mids *messageIds) getID(t tokenCompletor) uint16 {
}

func (mids *messageIds) getToken(id uint16) tokenCompletor {
mids.RLock()
defer mids.RUnlock()
mids.mu.RLock()
defer mids.mu.RUnlock()
if token, ok := mids.index[id]; ok {
return token
}
Expand Down

0 comments on commit 34dc80e

Please sign in to comment.