diff --git a/fvt_client_test.go b/fvt_client_test.go index bfb3760..cf57901 100644 --- a/fvt_client_test.go +++ b/fvt_client_test.go @@ -19,6 +19,7 @@ import ( "context" "fmt" "runtime" + "strconv" "sync" "testing" "time" @@ -1545,3 +1546,74 @@ func Test_DisconnectWhileProcessingIncomingPublish(t *testing.T) { } p.Disconnect(250) // Close publisher } + +// Test_ResumeSubsMaxInflight - Check the MaxResumePubInFlight option. +// This is difficult to test without control of the broker (because we will be communicating via the broker not +// directly. However due to the way resume works when there is no limit to inflight messages message ordering is not +// guaranteed. However with SetMaxResumePubInFlight(1) it is guaranteed so we use that to test. +// On my PC (using mosquitto under docker) running this without SetMaxResumePubInFlight(1) will fail with 1000 messages +// (generally passes if only 100 are sent). With the option set it always passes. +func Test_ResumeSubsMaxInflight(t *testing.T) { + topic := "/test/ResumeSubsMaxInflight" + var qos byte = 1 + + // When a connection is made with messages in the store normally it would be expected that many messages will be + // transmitted simultaneously; using MaxResumePubInFlight we can limit this to 1. + // subscribe to topic before establishing a connection, and publish a message after the publish client has connected successfully + sops := NewClientOptions().SetClientID("rsmif-Sub").AddBroker(FVTTCP).SetOrderMatters(true) + s := NewClient(sops) // s = subscriber + if sToken := s.Connect(); sToken.Wait() && sToken.Error() != nil { + t.Fatalf("Error on subscriber Client.Connect(): %v", sToken.Error()) + } + + incommingMsg := make(chan int, 1000) + var f MessageHandler = func(client Client, msg Message) { + num, _ := strconv.Atoi(string(msg.Payload())) + incommingMsg <- num + } + + if sToken := s.Subscribe(topic, qos, f); sToken.Wait() && sToken.Error() != nil { + t.Fatalf("Error on subscriber Client.Subscribe(): %v", sToken.Error()) + } + + // Now we preload an ordered memory store with 100 messages and connect... + memStore := NewOrderedMemoryStore() + memStore.Open() + + for i := 0; i < 1000; i++ { + pub := packets.NewControlPacket(packets.Publish).(*packets.PublishPacket) + pub.Qos = qos + pub.TopicName = topic + pub.Payload = []byte(strconv.Itoa(i)) + pub.MessageID = uint16(i + 1) + memStore.Put(outboundKeyFromMID(pub.Details().MessageID), pub) + time.Sleep(time.Nanosecond) + } + + pops := NewClientOptions().AddBroker(FVTTCP).SetClientID("rsmif-Pub").SetOrderMatters(false). + SetCleanSession(false).SetStore(memStore).SetMaxResumePubInFlight(1) + p := NewClient(pops) + if pToken := p.Connect(); pToken.Wait() && pToken.Error() != nil { // Note: messages will be received before this completes + t.Fatalf("Error on publisher Client.Connect(): %v", pToken.Error()) + } + // We should receive 100 * 1's + timeOut := time.NewTimer(30 * time.Second) + defer timeOut.Stop() +getLoop: + for i := 0; i < 1000; i++ { + select { + case <-timeOut.C: + t.Errorf("timed out waiting for messages (after receiving %d)", i) + break getLoop + case s := <-incommingMsg: + if s != i { + t.Errorf("received message out of order (expected %d, got %d)", i, s) + break getLoop + } + continue + } + } + + p.Disconnect(250) + s.Disconnect(250) +}