Skip to content

Commit

Permalink
Add back-off controller for sleep time of reconnection when connectio…
Browse files Browse the repository at this point in the history
…n lost is detected immediately after connecting. eclipse#589

Signed-off-by: Daichi Tomaru <banaoa7543@gmail.com>
  • Loading branch information
tomatod committed Dec 27, 2022
1 parent 4b066a0 commit 2749ad4
Show file tree
Hide file tree
Showing 3 changed files with 141 additions and 8 deletions.
75 changes: 75 additions & 0 deletions backoff.go
@@ -0,0 +1,75 @@
/*
* Copyright (c) 2021 IBM Corp and others.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v2.0
* and Eclipse Distribution License v1.0 which accompany this distribution.
*
* The Eclipse Public License is available at
* https://www.eclipse.org/legal/epl-2.0/
* and the Eclipse Distribution License is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* Contributors:
* Matt Brittan
* Daichi Tomaru
*/

package mqtt

import (
"sync"
"time"
)

// Controller for sleep period with backoff when some reconnection attempting or connection lost occure.
// It has statuses for each situation caused retry.
type backoffController struct {
sync.RWMutex
statusMap map[string]*backoffStatus
}

type backoffStatus struct {
lastSleepPeriod time.Duration
lastErrorTime time.Time
}

func newBackoffController() *backoffController {
return &backoffController{
statusMap: map[string]*backoffStatus{},
}
}

// Calculate next sleep period from initial and max one and elapsed time since last sleeping.
// Returned values are next sleep period and whether the error situation is continual.
// If connection errors continuouslly occurs, its sleep period is exponentially increased.
// Also if there is a lot of time between last and this error, sleep period is initialized.
func (b *backoffController) getBackoffSleepTime(
initSleepPeriod time.Duration, maxSleepPeriod time.Duration, situation string, processTime time.Duration,
) (time.Duration, bool) {
b.Lock()
defer b.Unlock()

status, exist := b.statusMap[situation]
if !exist {
b.statusMap[situation] = &backoffStatus{initSleepPeriod, time.Now()}
return initSleepPeriod, false
}

oldTime := status.lastErrorTime
status.lastErrorTime = time.Now()

// When there is a lot of time between last and this error, sleep period is initialized.
if status.lastErrorTime.Sub(oldTime) > (processTime * 2 + status.lastSleepPeriod) {
status.lastSleepPeriod = initSleepPeriod
return initSleepPeriod, false
}

if nextSleepPeriod := status.lastSleepPeriod * 2; nextSleepPeriod <= maxSleepPeriod {
status.lastSleepPeriod = nextSleepPeriod
} else {
status.lastSleepPeriod = maxSleepPeriod
}

return status.lastSleepPeriod, true
}
54 changes: 54 additions & 0 deletions backoff_test.go
@@ -0,0 +1,54 @@
/*
* Copyright (c) 2021 IBM Corp and others.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v2.0
* and Eclipse Distribution License v1.0 which accompany this distribution.
*
* The Eclipse Public License is available at
* https://www.eclipse.org/legal/epl-2.0/
* and the Eclipse Distribution License is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* Contributors:
* Matt Brittan
* Daichi Tomaru
*/

package mqtt

import (
"testing"
"time"
)

func TestGetBackoffSleepTime(t *testing.T) {
// Test for adding new situation
controller := newBackoffController()
if s, c := controller.getBackoffSleepTime(1 * time.Second, 5 * time.Second, "not-exist", 1 * time.Second); !((s == 1 * time.Second) && !c) {
t.Errorf("When new situation is added, period should be initSleepPeriod and naturally it shouldn't be continual error. s:%d c%t", s, c)
}

// Test for the continual error in the same situation and suppression of sleep period by maxSleepPeriod
controller.getBackoffSleepTime(10 * time.Second, 30 * time.Second, "multi", 1 * time.Second)
if s, c := controller.getBackoffSleepTime(10 * time.Second, 30 * time.Second, "multi", 1 * time.Second); !((s == 20 * time.Second) && c) {
t.Errorf("When same situation is called again, period should be increased and it should be regarded as a continual error. s:%d c%t", s, c)
}
if s, c := controller.getBackoffSleepTime(10 * time.Second, 30 * time.Second, "multi", 1 * time.Second); !((s == 30 * time.Second) && c) {
t.Errorf("A same situation is called three times. 10 * 2 * 2 = 40 but maxSleepPeriod is 30. So the next period should be 30. s:%d c%t", s, c)
}

// Test for initialization by elapsed time.
controller.getBackoffSleepTime(1 * time.Second, 128 * time.Second, "elapsed", 1 * time.Second)
controller.getBackoffSleepTime(1 * time.Second, 128 * time.Second, "elapsed", 1 * time.Second)
time.Sleep((1 * 2 + 1 * 2 + 1) * time.Second)
if s, c := controller.getBackoffSleepTime(1 * time.Second, 128 * time.Second, "elapsed", 1 * time.Second); !((s == 1 * time.Second) && !c) {
t.Errorf("Initialization should be triggered by elapsed time. s:%d c%t", s, c)
}

// Test when initial and max period is same.
controller.getBackoffSleepTime(1 * time.Second, 1 * time.Second, "same", 1 * time.Second)
if s, c := controller.getBackoffSleepTime(1 * time.Second, 1 * time.Second, "same", 1 * time.Second); !((s == 1 * time.Second) && c) {
t.Errorf("Sleep time should be always 1. s:%d c%t", s, c)
}
}
20 changes: 12 additions & 8 deletions client.go
Expand Up @@ -141,6 +141,8 @@ type client struct {
stop chan struct{} // Closed to request that workers stop
workers sync.WaitGroup // used to wait for workers to complete (ping, keepalive, errwatch, resume)
commsStopped chan struct{} // closed when the comms routines have stopped (kept running until after workers have closed to avoid deadlocks)

backoff *backoffController
}

// NewClient will create an MQTT v3.1.1 client with all of the options specified
Expand Down Expand Up @@ -169,6 +171,7 @@ func NewClient(o *ClientOptions) Client {
c.msgRouter.setDefaultHandler(c.options.DefaultPublishHandler)
c.obound = make(chan *PacketAndToken)
c.oboundP = make(chan *PacketAndToken)
c.backoff = newBackoffController()
return c
}

Expand Down Expand Up @@ -302,10 +305,17 @@ func (c *client) Connect() Token {
func (c *client) reconnect(connectionUp connCompletedFn) {
DEBUG.Println(CLI, "enter reconnect")
var (
sleep = 1 * time.Second
initSleep = 1 * time.Second
conn net.Conn
)

// If the reason of connection lost is same as the before one, sleep timer is set before attempting connection is started.
// Sleep time is exponentially increased as the same situation continues
if slp, isSameErr := c.backoff.getBackoffSleepTime(initSleep, c.options.MaxReconnectInterval, "connectionLost", 5 * time.Second); isSameErr {
DEBUG.Println(CLI, "Detect continual connection lost after reconnect, sleeping for", int(slp.Seconds()), "seconds")
time.Sleep(slp)
}

for {
if nil != c.options.OnReconnecting {
c.options.OnReconnecting(c, &c.options)
Expand All @@ -315,15 +325,9 @@ func (c *client) reconnect(connectionUp connCompletedFn) {
if err == nil {
break
}
sleep, _ := c.backoff.getBackoffSleepTime(initSleep, c.options.MaxReconnectInterval, "attemptReconnection", c.options.ConnectTimeout)
DEBUG.Println(CLI, "Reconnect failed, sleeping for", int(sleep.Seconds()), "seconds:", err)
time.Sleep(sleep)
if sleep < c.options.MaxReconnectInterval {
sleep *= 2
}

if sleep > c.options.MaxReconnectInterval {
sleep = c.options.MaxReconnectInterval
}

if c.status.ConnectionStatus() != reconnecting { // Disconnect may have been called
if err := connectionUp(false); err != nil { // Should always return an error
Expand Down

0 comments on commit 2749ad4

Please sign in to comment.