Rearchitect status handling as per issue #605 (#607)

Merged: 1 commit, Aug 10, 2022

336 changes: 190 additions & 146 deletions client.go

Large diffs are not rendered by default.
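Since the client.go diff is collapsed, here is a rough Go sketch, for orientation only, of the kind of mutex-guarded status type the rearchitected handling implies. The state values and every name except `forceConnectionStatus` (which the updated test below calls) are assumptions, not the PR's actual implementation.

```go
package main

import (
	"fmt"
	"sync"
)

// Hypothetical connection states; the real client defines its own set.
type status uint32

const (
	disconnected status = iota
	connecting
	connected
	disconnecting
)

// connectionStatus guards the client's state behind a mutex so that the
// keepalive, router and user-facing goroutines all observe a consistent value.
type connectionStatus struct {
	mu     sync.Mutex
	status status
}

// ConnectionStatus returns the current state under the lock.
func (c *connectionStatus) ConnectionStatus() status {
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.status
}

// forceConnectionStatus sets the state unconditionally; the test below calls a
// helper with this name to push the client back into the connected state.
func (c *connectionStatus) forceConnectionStatus(s status) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.status = s
}

func main() {
	var cs connectionStatus
	cs.forceConnectionStatus(connected)
	fmt.Println("status:", cs.ConnectionStatus()) // status: 2
}
```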

62 changes: 40 additions & 22 deletions fvt_client_test.go
@@ -126,7 +126,7 @@ func Test_Disconnect(t *testing.T) {
go func() {
c.Disconnect(250)
cli := c.(*client)
cli.status = connected
cli.status.forceConnectionStatus(connected)
c.Disconnect(250)
close(disconnectC)
}()
@@ -1191,29 +1191,36 @@ func Test_cleanUpMids_2(t *testing.T) {
ops.SetKeepAlive(10 * time.Second)

c := NewClient(ops)
cl := c.(*client)

if token := c.Connect(); token.Wait() && token.Error() != nil {
t.Fatalf("Error on Client.Connect(): %v", token.Error())
}

token := c.Publish("/test/cleanUP", 2, false, "cleanup test 2")
if len(c.(*client).messageIds.index) == 0 {
cl.messageIds.mu.Lock()
mq := len(c.(*client).messageIds.index)
cl.messageIds.mu.Unlock()
if mq == 0 {
t.Fatalf("Should be a token in the messageIDs, none found")
}
fmt.Println("Disconnecting", len(c.(*client).messageIds.index))
// fmt.Println("Disconnecting", len(cl.messageIds.index))
c.Disconnect(0)

fmt.Println("Wait on Token")
// We should be able to wait on this token without any issue
token.Wait()

if len(c.(*client).messageIds.index) > 0 {
cl.messageIds.mu.Lock()
mq = len(c.(*client).messageIds.index)
cl.messageIds.mu.Unlock()
if mq > 0 {
t.Fatalf("Should have cleaned up messageIDs, have %d left", len(c.(*client).messageIds.index))
}
if token.Error() == nil {
t.Fatal("token should have received an error on connection loss")
}
fmt.Println(token.Error())
// fmt.Println(token.Error())
}
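For readers skimming the hunk above: the test now takes `messageIds.mu` before touching `messageIds.index`. A self-contained sketch of that locking pattern follows; only the `mu` and `index` field names come from the diff, the element type and everything else is assumed for illustration.

```go
package main

import (
	"fmt"
	"sync"
)

// messageIds mirrors the field names visible in the test (mu, index) only.
type messageIds struct {
	mu    sync.Mutex
	index map[uint16]struct{}
}

// pending reads len(index) under the same mutex that guards writes; even a
// plain length check on a shared map is a data race without it.
func (m *messageIds) pending() int {
	m.mu.Lock()
	defer m.mu.Unlock()
	return len(m.index)
}

func main() {
	m := &messageIds{index: map[uint16]struct{}{1: {}}}
	fmt.Println("in flight:", m.pending()) // in flight: 1
}
```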

func Test_ConnectRetry(t *testing.T) {
@@ -1339,7 +1346,6 @@ func Test_ResumeSubs(t *testing.T) {
t.Fatalf("Expected 1 packet to be in store")
}
packet := subMemStore.Get(ids[0])
fmt.Println("packet", packet)
if packet == nil {
t.Fatal("Failed to retrieve packet from store")
}
@@ -1471,11 +1477,12 @@ func Test_ResumeSubsWithReconnect(t *testing.T) {
c.Disconnect(250)
}

// Issue 209 - occasional deadlock when connections are lost unexpectedly
// Issue 509 - occasional deadlock when connections are lost unexpectedly
// This was quite a nasty deadlock which occurred in very rare circumstances; I could not come up with a reliable way of
// replicating this but the below would cause it to happen fairly consistently (when the test was run a decent number
// of times). Following the fix it ran 10,000 times without issue.
// go test -count 10000 -run DisconnectWhileProcessingIncomingPublish
//
// go test -count 10000 -run DisconnectWhileProcessingIncomingPublish
func Test_DisconnectWhileProcessingIncomingPublish(t *testing.T) {
topic := "/test/DisconnectWhileProcessingIncomingPublish"

@@ -1487,11 +1494,11 @@ func Test_DisconnectWhileProcessingIncomingPublish(t *testing.T) {

sops := NewClientOptions()
sops.AddBroker(FVTTCP)
sops.SetAutoReconnect(false) // We dont want the connection to be re-established
sops.SetAutoReconnect(false) // We don't want the connection to be re-established
sops.SetWriteTimeout(500 * time.Millisecond) // We will be sending a lot of publish messages and want go routines to clear...
// sops.SetOrderMatters(false)
sops.SetClientID("dwpip-sub")
// We need to know when the subscriber has lost its connection (this indicates that the deadlock has not occured)
// We need to know when the subscriber has lost its connection (this indicates that the deadlock has not occurred)
sDisconnected := make(chan struct{})
sops.SetConnectionLostHandler(func(Client, error) { close(sDisconnected) })

@@ -1523,20 +1530,23 @@ func Test_DisconnectWhileProcessingIncomingPublish(t *testing.T) {
i := 0
for {
p.Publish(topic, 1, false, fmt.Sprintf("test message: %d", i))
// After the connection goes down s.Publish will start blocking (this is not ideal but fixing its a problem for another time)
go func() { s.Publish(topic+"IGNORE", 1, false, fmt.Sprintf("test message: %d", i)) }()
// After the connection goes down s.Publish will start blocking (this is not ideal but fixing it's a problem for another time)
go func(i int) { s.Publish(topic+"IGNORE", 1, false, fmt.Sprintf("test message: %d", i)) }(i)
i++

if ctx.Err() != nil {
return
}
}
}()

// Wait until we have received a message (ensuring that the stream of messages has started)
delay := time.NewTimer(time.Second) // Be careful with timers as this will be run in a tight loop!
select {
case <-msgReceived: // All good
case <-time.After(time.Second):
if !delay.Stop() { // Cleanly close timer as this may be run in a tight loop!
<-delay.C
}
case <-delay.C:
t.Errorf("no messages received")
}

@@ -1545,34 +1555,42 @@ func Test_DisconnectWhileProcessingIncomingPublish(t *testing.T) {
dm := packets.NewControlPacket(packets.Disconnect).(*packets.DisconnectPacket)
err := dm.Write(s.conn)
if err != nil {
t.Fatalf("error dending disconnect packet: %s", err)
t.Fatalf("error sending disconnect packet: %s", err)
}

// Lets give the library up to a second to shutdown (indicated by the status changing)
delay = time.NewTimer(time.Second) // Be careful with timers as this will be run in a tight loop!
select {
case <-sDisconnected: // All good
case <-time.After(time.Second):
cancel() // no point leaving publisher running
time.Sleep(time.Second) // Allow publish calls to timeout (otherwise there will be tons of go routines running!)
if !delay.Stop() {
<-delay.C
}
case <-delay.C:
cancel() // no point leaving publisher running
time.Sleep(10 * time.Second) // Allow publish calls to timeout (otherwise there will be tons of go routines running!)
buf := make([]byte, 1<<20)
stacklen := runtime.Stack(buf, true)
t.Fatalf("connection was not lost as expected - probable deadlock. Stacktrace follows: %s", buf[:stacklen])
}

cancel() // no point leaving publisher running

delay = time.NewTimer(time.Second) // Be careful with timers as this will be run in a tight loop!
select {
case <-pubDone:
case <-time.After(time.Second):
t.Errorf("pubdone not closed within a second")
if !delay.Stop() {
<-delay.C
}
case <-delay.C:
t.Errorf("pubdone not closed within two seconds (probably due to load on system but may be an issue)")
}
p.Disconnect(250) // Close publisher
}

// Test_ResumeSubsMaxInflight - Check the MaxResumePubInFlight option.
// This is difficult to test without control of the broker (because we will be communicating via the broker not
// directly. However due to the way resume works when there is no limit to inflight messages message ordering is not
// guaranteed. However with SetMaxResumePubInFlight(1) it is guaranteed so we use that to test.
// directly. However, due to the way resume works when there is no limit to inflight messages message ordering is not
// guaranteed. However, with SetMaxResumePubInFlight(1) it is guaranteed so we use that to test.
// On my PC (using mosquitto under docker) running this without SetMaxResumePubInFlight(1) will fail with 1000 messages
// (generally passes if only 100 are sent). With the option set it always passes.
func Test_ResumeSubsMaxInflight(t *testing.T) {
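A minimal usage sketch of the option the comment above describes; the broker URL and client ID are placeholders and error handling is reduced to a print:

```go
package main

import (
	"fmt"
	"time"

	mqtt "github.com/eclipse/paho.mqtt.golang"
)

func main() {
	opts := mqtt.NewClientOptions()
	opts.AddBroker("tcp://localhost:1883") // placeholder broker
	opts.SetClientID("resume-order-demo")  // placeholder client ID
	opts.SetCleanSession(false)            // resuming publishes needs session state
	opts.SetMaxResumePubInFlight(1)        // resend stored publishes one at a time

	c := mqtt.NewClient(opts)
	if token := c.Connect(); token.Wait() && token.Error() != nil {
		fmt.Println("connect error:", token.Error())
		return
	}
	defer c.Disconnect(250)
	time.Sleep(time.Second) // give resumed publishes time to flow, in order
}
```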
11 changes: 6 additions & 5 deletions net.go
@@ -150,7 +150,7 @@ type incomingComms struct {

// startIncomingComms initiates incoming communications; this includes starting a goroutine to process incoming
// messages.
// Accepts a channel of inbound messages from the store (persisted messages); note this must be closed as soon as the
// Accepts a channel of inbound messages from the store (persisted messages); note this must be closed as soon as
// everything in the store has been sent.
// Returns a channel that will be passed any received packets; this will be closed on a network error (and inboundFromStore closed)
func startIncomingComms(conn io.Reader,
@@ -332,7 +332,7 @@ func startOutgoingComms(conn net.Conn,
DEBUG.Println(NET, "outbound wrote disconnect, closing connection")
// As per the MQTT spec "After sending a DISCONNECT Packet the Client MUST close the Network Connection"
// Closing the connection will cause the goroutines to end in sequence (starting with incoming comms)
conn.Close()
_ = conn.Close()
}
case msg, ok := <-oboundFromIncoming: // message triggered by an inbound message (PubrecPacket or PubrelPacket)
if !ok {
@@ -370,9 +370,10 @@ type commsFns interface {
// startComms initiates goroutines that handles communications over the network connection
// Messages will be stored (via commsFns) and deleted from the store as necessary
// It returns two channels:
// packets.PublishPacket - Will receive publish packets received over the network.
// Closed when incoming comms routines exit (on shutdown or if network link closed)
// error - Any errors will be sent on this channel. The channel is closed when all comms routines have shut down
//
// packets.PublishPacket - Will receive publish packets received over the network.
// Closed when incoming comms routines exit (on shutdown or if network link closed)
// error - Any errors will be sent on this channel. The channel is closed when all comms routines have shut down
//
// Note: The comms routines monitoring oboundp and obound will not shutdown until those channels are both closed. Any messages received between the
// connection being closed and those channels being closed will generate errors (and nothing will be sent). That way the chance of a deadlock is
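The (truncated) doc comment above describes two returned channels that must both be consumed until closed. A simplified, hypothetical consumer illustrating that contract, with stand-in element types:

```go
package main

import (
	"errors"
	"fmt"
)

// drain consumes both channels until each is closed, matching the shutdown
// contract described in the startComms comment; string stands in for the real
// publish packet type.
func drain(packets <-chan string, errs <-chan error) {
	for packets != nil || errs != nil {
		select {
		case p, ok := <-packets:
			if !ok {
				packets = nil // closed when the incoming comms routine exits
				continue
			}
			fmt.Println("publish received:", p)
		case err, ok := <-errs:
			if !ok {
				errs = nil // closed when all comms routines have shut down
				continue
			}
			fmt.Println("comms error:", err)
		}
	}
}

func main() {
	pc := make(chan string, 1)
	ec := make(chan error, 1)
	pc <- "hello"
	ec <- errors.New("link lost")
	close(pc)
	close(ec)
	drain(pc, ec)
}
```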
4 changes: 2 additions & 2 deletions ping.go
@@ -58,8 +58,8 @@ func keepalive(c *client, conn io.Writer) {
if atomic.LoadInt32(&c.pingOutstanding) == 0 {
DEBUG.Println(PNG, "keepalive sending ping")
ping := packets.NewControlPacket(packets.Pingreq).(*packets.PingreqPacket)
// We don't want to wait behind large messages being sent, the Write call
// will block until it it able to send the packet.
// We don't want to wait behind large messages being sent, the `Write` call
// will block until it is able to send the packet.
atomic.StoreInt32(&c.pingOutstanding, 1)
if err := ping.Write(conn); err != nil {
ERROR.Println(PNG, err)
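As a standalone illustration of the guard described in the keepalive comment, here is a minimal sketch using the same atomic-flag idea; `pinger` and `maybePing` are invented names and `send` stands in for the blocking packet write.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

type pinger struct {
	pingOutstanding int32
}

// maybePing sends a ping only when none is outstanding. The flag is set before
// the (potentially blocking) write so a second ping is not queued while the
// first is still awaiting its PINGRESP.
func (p *pinger) maybePing(send func() error) {
	if atomic.LoadInt32(&p.pingOutstanding) == 0 {
		atomic.StoreInt32(&p.pingOutstanding, 1) // mark outstanding before the blocking write
		if err := send(); err != nil {
			fmt.Println("ping failed:", err)
		}
	}
}

func main() {
	p := &pinger{}
	p.maybePing(func() error { return nil })
	fmt.Println("outstanding:", atomic.LoadInt32(&p.pingOutstanding)) // outstanding: 1
}
```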