Skip to content

Commit

Permalink
Rearchitect status handling as per issue #605
Browse files Browse the repository at this point in the history
This is a major change that moves handling of the status (i.e. disconnected, connecting etc) out to status.go. It also introduces the new status disconnecting.

All existing tests (including 10000 runs through Test_DisconnectWhileProcessingIncomingPublish) pass and I have added new ones to specifically test the connectionStatus functionality. However this is a major change so bugs may have been introduced!

The major aim of this change is to simplify future work, It has been difficult to implement changes/refactor because the status handling was fragile (and not fully thread safe in some instances).
  • Loading branch information
MattBrittan committed Aug 10, 2022
2 parents bc0e78b + 2b1657a commit 079a117
Show file tree
Hide file tree
Showing 8 changed files with 952 additions and 182 deletions.
336 changes: 190 additions & 146 deletions client.go

Large diffs are not rendered by default.

62 changes: 40 additions & 22 deletions fvt_client_test.go
Expand Up @@ -126,7 +126,7 @@ func Test_Disconnect(t *testing.T) {
go func() {
c.Disconnect(250)
cli := c.(*client)
cli.status = connected
cli.status.forceConnectionStatus(connected)
c.Disconnect(250)
close(disconnectC)
}()
Expand Down Expand Up @@ -1191,29 +1191,36 @@ func Test_cleanUpMids_2(t *testing.T) {
ops.SetKeepAlive(10 * time.Second)

c := NewClient(ops)
cl := c.(*client)

if token := c.Connect(); token.Wait() && token.Error() != nil {
t.Fatalf("Error on Client.Connect(): %v", token.Error())
}

token := c.Publish("/test/cleanUP", 2, false, "cleanup test 2")
if len(c.(*client).messageIds.index) == 0 {
cl.messageIds.mu.Lock()
mq := len(c.(*client).messageIds.index)
cl.messageIds.mu.Unlock()
if mq == 0 {
t.Fatalf("Should be a token in the messageIDs, none found")
}
fmt.Println("Disconnecting", len(c.(*client).messageIds.index))
// fmt.Println("Disconnecting", len(cl.messageIds.index))
c.Disconnect(0)

fmt.Println("Wait on Token")
// We should be able to wait on this token without any issue
token.Wait()

if len(c.(*client).messageIds.index) > 0 {
cl.messageIds.mu.Lock()
mq = len(c.(*client).messageIds.index)
cl.messageIds.mu.Unlock()
if mq > 0 {
t.Fatalf("Should have cleaned up messageIDs, have %d left", len(c.(*client).messageIds.index))
}
if token.Error() == nil {
t.Fatal("token should have received an error on connection loss")
}
fmt.Println(token.Error())
// fmt.Println(token.Error())
}

func Test_ConnectRetry(t *testing.T) {
Expand Down Expand Up @@ -1339,7 +1346,6 @@ func Test_ResumeSubs(t *testing.T) {
t.Fatalf("Expected 1 packet to be in store")
}
packet := subMemStore.Get(ids[0])
fmt.Println("packet", packet)
if packet == nil {
t.Fatal("Failed to retrieve packet from store")
}
Expand Down Expand Up @@ -1471,11 +1477,12 @@ func Test_ResumeSubsWithReconnect(t *testing.T) {
c.Disconnect(250)
}

// Issue 209 - occasional deadlock when connections are lost unexpectedly
// Issue 509 - occasional deadlock when connections are lost unexpectedly
// This was quite a nasty deadlock which occurred in very rare circumstances; I could not come up with a reliable way of
// replicating this but the below would cause it to happen fairly consistently (when the test was run a decent number
// of times). Following the fix it ran 10,000 times without issue.
// go test -count 10000 -run DisconnectWhileProcessingIncomingPublish
//
// go test -count 10000 -run DisconnectWhileProcessingIncomingPublish
func Test_DisconnectWhileProcessingIncomingPublish(t *testing.T) {
topic := "/test/DisconnectWhileProcessingIncomingPublish"

Expand All @@ -1487,11 +1494,11 @@ func Test_DisconnectWhileProcessingIncomingPublish(t *testing.T) {

sops := NewClientOptions()
sops.AddBroker(FVTTCP)
sops.SetAutoReconnect(false) // We dont want the connection to be re-established
sops.SetAutoReconnect(false) // We don't want the connection to be re-established
sops.SetWriteTimeout(500 * time.Millisecond) // We will be sending a lot of publish messages and want go routines to clear...
// sops.SetOrderMatters(false)
sops.SetClientID("dwpip-sub")
// We need to know when the subscriber has lost its connection (this indicates that the deadlock has not occured)
// We need to know when the subscriber has lost its connection (this indicates that the deadlock has not occurred)
sDisconnected := make(chan struct{})
sops.SetConnectionLostHandler(func(Client, error) { close(sDisconnected) })

Expand Down Expand Up @@ -1523,20 +1530,23 @@ func Test_DisconnectWhileProcessingIncomingPublish(t *testing.T) {
i := 0
for {
p.Publish(topic, 1, false, fmt.Sprintf("test message: %d", i))
// After the connection goes down s.Publish will start blocking (this is not ideal but fixing its a problem for another time)
go func() { s.Publish(topic+"IGNORE", 1, false, fmt.Sprintf("test message: %d", i)) }()
// After the connection goes down s.Publish will start blocking (this is not ideal but fixing it's a problem for another time)
go func(i int) { s.Publish(topic+"IGNORE", 1, false, fmt.Sprintf("test message: %d", i)) }(i)
i++

if ctx.Err() != nil {
return
}
}
}()

// Wait until we have received a message (ensuring that the stream of messages has started)
delay := time.NewTimer(time.Second) // Be careful with timers as this will be run in a tight loop!
select {
case <-msgReceived: // All good
case <-time.After(time.Second):
if !delay.Stop() { // Cleanly close timer as this may be run in a tight loop!
<-delay.C
}
case <-delay.C:
t.Errorf("no messages received")
}

Expand All @@ -1545,34 +1555,42 @@ func Test_DisconnectWhileProcessingIncomingPublish(t *testing.T) {
dm := packets.NewControlPacket(packets.Disconnect).(*packets.DisconnectPacket)
err := dm.Write(s.conn)
if err != nil {
t.Fatalf("error dending disconnect packet: %s", err)
t.Fatalf("error sending disconnect packet: %s", err)
}

// Lets give the library up to a second to shutdown (indicated by the status changing)
delay = time.NewTimer(time.Second) // Be careful with timers as this will be run in a tight loop!
select {
case <-sDisconnected: // All good
case <-time.After(time.Second):
cancel() // no point leaving publisher running
time.Sleep(time.Second) // Allow publish calls to timeout (otherwise there will be tons of go routines running!)
if !delay.Stop() {
<-delay.C
}
case <-delay.C:
cancel() // no point leaving publisher running
time.Sleep(10 * time.Second) // Allow publish calls to timeout (otherwise there will be tons of go routines running!)
buf := make([]byte, 1<<20)
stacklen := runtime.Stack(buf, true)
t.Fatalf("connection was not lost as expected - probable deadlock. Stacktrace follows: %s", buf[:stacklen])
}

cancel() // no point leaving publisher running

delay = time.NewTimer(time.Second) // Be careful with timers as this will be run in a tight loop!
select {
case <-pubDone:
case <-time.After(time.Second):
t.Errorf("pubdone not closed within a second")
if !delay.Stop() {
<-delay.C
}
case <-delay.C:
t.Errorf("pubdone not closed within two seconds (probably due to load on system but may be an issue)")
}
p.Disconnect(250) // Close publisher
}

// Test_ResumeSubsMaxInflight - Check the MaxResumePubInFlight option.
// This is difficult to test without control of the broker (because we will be communicating via the broker not
// directly. However due to the way resume works when there is no limit to inflight messages message ordering is not
// guaranteed. However with SetMaxResumePubInFlight(1) it is guaranteed so we use that to test.
// directly. However, due to the way resume works when there is no limit to inflight messages message ordering is not
// guaranteed. However, with SetMaxResumePubInFlight(1) it is guaranteed so we use that to test.
// On my PC (using mosquitto under docker) running this without SetMaxResumePubInFlight(1) will fail with 1000 messages
// (generally passes if only 100 are sent). With the option set it always passes.
func Test_ResumeSubsMaxInflight(t *testing.T) {
Expand Down
11 changes: 6 additions & 5 deletions net.go
Expand Up @@ -150,7 +150,7 @@ type incomingComms struct {

// startIncomingComms initiates incoming communications; this includes starting a goroutine to process incoming
// messages.
// Accepts a channel of inbound messages from the store (persisted messages); note this must be closed as soon as the
// Accepts a channel of inbound messages from the store (persisted messages); note this must be closed as soon as
// everything in the store has been sent.
// Returns a channel that will be passed any received packets; this will be closed on a network error (and inboundFromStore closed)
func startIncomingComms(conn io.Reader,
Expand Down Expand Up @@ -332,7 +332,7 @@ func startOutgoingComms(conn net.Conn,
DEBUG.Println(NET, "outbound wrote disconnect, closing connection")
// As per the MQTT spec "After sending a DISCONNECT Packet the Client MUST close the Network Connection"
// Closing the connection will cause the goroutines to end in sequence (starting with incoming comms)
conn.Close()
_ = conn.Close()
}
case msg, ok := <-oboundFromIncoming: // message triggered by an inbound message (PubrecPacket or PubrelPacket)
if !ok {
Expand Down Expand Up @@ -370,9 +370,10 @@ type commsFns interface {
// startComms initiates goroutines that handles communications over the network connection
// Messages will be stored (via commsFns) and deleted from the store as necessary
// It returns two channels:
// packets.PublishPacket - Will receive publish packets received over the network.
// Closed when incoming comms routines exit (on shutdown or if network link closed)
// error - Any errors will be sent on this channel. The channel is closed when all comms routines have shut down
//
// packets.PublishPacket - Will receive publish packets received over the network.
// Closed when incoming comms routines exit (on shutdown or if network link closed)
// error - Any errors will be sent on this channel. The channel is closed when all comms routines have shut down
//
// Note: The comms routines monitoring oboundp and obound will not shutdown until those channels are both closed. Any messages received between the
// connection being closed and those channels being closed will generate errors (and nothing will be sent). That way the chance of a deadlock is
Expand Down
4 changes: 2 additions & 2 deletions ping.go
Expand Up @@ -58,8 +58,8 @@ func keepalive(c *client, conn io.Writer) {
if atomic.LoadInt32(&c.pingOutstanding) == 0 {
DEBUG.Println(PNG, "keepalive sending ping")
ping := packets.NewControlPacket(packets.Pingreq).(*packets.PingreqPacket)
// We don't want to wait behind large messages being sent, the Write call
// will block until it it able to send the packet.
// We don't want to wait behind large messages being sent, the `Write` call
// will block until it is able to send the packet.
atomic.StoreInt32(&c.pingOutstanding, 1)
if err := ping.Write(conn); err != nil {
ERROR.Println(PNG, err)
Expand Down

0 comments on commit 079a117

Please sign in to comment.