[ADDED] Exponential backoff for server reconnects #1224

Open. Wants to merge 1 commit into base: main
68 changes: 52 additions & 16 deletions nats.go
@@ -50,8 +50,7 @@ const (
Version = "1.24.0"
DefaultURL = "nats://127.0.0.1:4222"
DefaultPort = 4222
- DefaultMaxReconnect = 60
- DefaultReconnectWait = 2 * time.Second
+ DefaultMaxReconnect = -1
Member

It makes sense to change this into a span of time, but I'm not sure about changing this var's default, since it has been like that from the start. Maybe add it as an option to test out the new behavior, like MaxTotalReconnectTime(2*time.Minute)? We also need to be able to set max reconnect to -1, meaning that we want the client to reconnect forever and never give up.

Member

On the WebSocket client under Firefox we get a huge issue: aside from the fact that it doesn't honor socket requests to cancel, etc., if the client doesn't somewhat line up, all sorts of crazy stuff happens. Firefox is the only one to implement that, but at any rate here's the description:

https://www.rfc-editor.org/rfc/rfc6455#section-7.2.3

Member

@piotrpio's change is right in line with this.

Member

I am OK with the values as we have them, possibly adding a new option "ExponentialBackoff" which would set the starting backoff, with the understanding that it doubles on every try and resets after 10 increases. The trick is that with a 2s start, that means a 17m interval on the 10th try, which is possibly way too long. It may be right to make the first retry something like 25ms. That would set the upper bound near 1m, if I have my maths right.

DefaultReconnectJitter = 100 * time.Millisecond
DefaultReconnectJitterTLS = time.Second
DefaultTimeout = 2 * time.Second
@@ -62,6 +61,10 @@ const (
RequestChanLen = 8
DefaultDrainTimeout = 30 * time.Second
LangString = "go"

+ // DEPRECATED: Client now uses [nats.DefaultReconnectBackoffHandler] to
+ // handle default reconnect wait time.
+ DefaultReconnectWait = 2 * time.Second
)

const (
@@ -143,17 +146,17 @@ var (
// GetDefaultOptions returns default configuration options for the client.
func GetDefaultOptions() Options {
return Options{
- AllowReconnect: true,
- MaxReconnect: DefaultMaxReconnect,
- ReconnectWait: DefaultReconnectWait,
- ReconnectJitter: DefaultReconnectJitter,
- ReconnectJitterTLS: DefaultReconnectJitterTLS,
- Timeout: DefaultTimeout,
- PingInterval: DefaultPingInterval,
- MaxPingsOut: DefaultMaxPingOut,
- SubChanLen: DefaultMaxChanLen,
- ReconnectBufSize: DefaultReconnectBufSize,
- DrainTimeout: DefaultDrainTimeout,
+ AllowReconnect: true,
+ MaxReconnect: DefaultMaxReconnect,
+ ReconnectJitter: DefaultReconnectJitter,
+ ReconnectJitterTLS: DefaultReconnectJitterTLS,
+ Timeout: DefaultTimeout,
+ PingInterval: DefaultPingInterval,
+ MaxPingsOut: DefaultMaxPingOut,
+ SubChanLen: DefaultMaxChanLen,
+ ReconnectBufSize: DefaultReconnectBufSize,
+ DrainTimeout: DefaultDrainTimeout,
+ IgnoreAuthErrorAbort: true,
}
}

@@ -470,6 +473,7 @@ type Options struct {

// IgnoreAuthErrorAbort - if set to true, client opts out of the default connect behavior of aborting
// subsequent reconnect attempts if server returns the same auth error twice (regardless of reconnect policy).
+ // DEPRECATED: This option will be removed in future releases.
IgnoreAuthErrorAbort bool

// SkipHostLookup skips the DNS lookup for the server hostname.
@@ -1260,13 +1264,22 @@ func CustomInboxPrefix(p string) Option {

// IgnoreAuthErrorAbort opts out of the default connect behavior of aborting
// subsequent reconnect attempts if server returns the same auth error twice.
+ // DEPRECATED: This option is now set to 'true' by default, therefore this option will be removed in future releases.
func IgnoreAuthErrorAbort() Option {
return func(o *Options) error {
o.IgnoreAuthErrorAbort = true
return nil
}
}

+ // AbortOnAuthErrors causes the client to bail out after 2 subsequent auth connection errors.
+ func AbortOnAuthErrors() Option {
+ return func(o *Options) error {
+ o.IgnoreAuthErrorAbort = false
+ return nil
+ }
+ }

// SkipHostLookup is an Option to skip the host lookup when connecting to a server.
func SkipHostLookup() Option {
return func(o *Options) error {
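
With this hunk the default flips: `IgnoreAuthErrorAbort` starts out `true`, and `AbortOnAuthErrors` is the way to opt back into aborting. A self-contained sketch of that interaction, using a local stand-in for the client's functional-options pattern; the lowercase `options`, `option`, `defaults`, and `build` names are hypothetical and exist only for this illustration.

```go
package main

import "fmt"

// options mirrors only the field this hunk touches.
type options struct {
	IgnoreAuthErrorAbort bool
}

type option func(*options) error

// defaults follows the PR: auth-error abort is ignored unless requested.
func defaults() options { return options{IgnoreAuthErrorAbort: true} }

// abortOnAuthErrors restores the abort-after-two-auth-errors behavior.
func abortOnAuthErrors() option {
	return func(o *options) error {
		o.IgnoreAuthErrorAbort = false
		return nil
	}
}

// build applies each option to a copy of the defaults, in order.
func build(opts ...option) (options, error) {
	o := defaults()
	for _, apply := range opts {
		if err := apply(&o); err != nil {
			return options{}, err
		}
	}
	return o, nil
}

func main() {
	o, _ := build()
	fmt.Println(o.IgnoreAuthErrorAbort) // true
	o, _ = build(abortOnAuthErrors())
	fmt.Println(o.IgnoreAuthErrorAbort) // false
}
```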
@@ -2559,6 +2572,28 @@ func (nc *Conn) stopPingTimer() {
}
}

+ // DefaultReconnectBackoffHandler returns a default reconnect exponential backoff interval.
+ // Base reconnect wait is 10ms, with x2 multiplier. Max wait time is 2 minutes.
+ // 10ms, 20ms, 40ms, 80ms...2m
+ // A random jitter is added to the result.
+ func DefaultReconnectBackoffHandler(jitter time.Duration) ReconnectDelayHandler {
Member

With this feature, I am wondering if some of the other options we have on the clients should be deprecated or go away (at the very least, this should be the default)

Collaborator Author

It is the default with this PR (I removed ReconnectWait as the default). I'm not sure we need to deprecate ReconnectWait; it's still a valid solution if a user wants a constant wait time between reconnects.

+ return func(attempts int) time.Duration {
+ // base interval is 10ms
+ backoff := 10 * time.Millisecond
+ for i := 0; i < attempts-1; i++ {
+ backoff *= 2
+ if backoff > 2*time.Minute {
+ backoff = 2 * time.Minute
+ break
+ }
+ }
+ if jitter > 0 {
+ jitter = time.Duration(rand.Int63n(int64(jitter)))
+ }
+ return backoff + jitter
+ }
+ }

// Try to reconnect using the option parameters.
// This function assumes we are allowed to reconnect.
func (nc *Conn) doReconnect(err error) {
@@ -2596,18 +2631,19 @@ func (nc *Conn) doReconnect(err error) {
var wlf int

var jitter time.Duration
- var rw time.Duration
// If a custom reconnect delay handler is set, this takes precedence.
crd := nc.Opts.CustomReconnectDelayCB
- if crd == nil {
- rw = nc.Opts.ReconnectWait
+ rw := nc.Opts.ReconnectWait
+ if crd == nil && rw == 0 {
// TODO: since we sleep only after the whole list has been tried, we can't
// rely on individual *srv to know if it is a TLS or non-TLS url.
// We have to pick which type of jitter to use, for now, we use these hints:
jitter = nc.Opts.ReconnectJitter
if nc.Opts.Secure || nc.Opts.TLSConfig != nil {
jitter = nc.Opts.ReconnectJitterTLS
}

+ crd = DefaultReconnectBackoffHandler(jitter)
}

for i := 0; len(nc.srvPool) > 0; {
61 changes: 53 additions & 8 deletions nats_test.go
@@ -511,6 +511,7 @@ func TestSelectNextServer(t *testing.T) {
opts := GetDefaultOptions()
opts.Servers = testServers
opts.NoRandomize = true
+ opts.MaxReconnect = 60
nc := &Conn{Opts: opts}
if err := nc.setupServerPool(); err != nil {
t.Fatalf("Problem setting up Server Pool: %v\n", err)
@@ -1609,14 +1610,14 @@ func TestExpiredAuthentication(t *testing.T) {
name string
expectedProto string
expectedErr error
ignoreAbort bool
withAuthAbort bool
}{
{"expired users credentials", AUTHENTICATION_EXPIRED_ERR, ErrAuthExpired, false},
{"revoked users credentials", AUTHENTICATION_REVOKED_ERR, ErrAuthRevoked, false},
{"expired account", ACCOUNT_AUTHENTICATION_EXPIRED_ERR, ErrAccountAuthExpired, false},
{"expired users credentials", AUTHENTICATION_EXPIRED_ERR, ErrAuthExpired, true},
{"revoked users credentials", AUTHENTICATION_REVOKED_ERR, ErrAuthRevoked, true},
{"expired account", ACCOUNT_AUTHENTICATION_EXPIRED_ERR, ErrAccountAuthExpired, true},
{"expired users credentials, abort connection", AUTHENTICATION_EXPIRED_ERR, ErrAuthExpired, false},
{"revoked users credentials, abort connection", AUTHENTICATION_REVOKED_ERR, ErrAuthRevoked, false},
{"expired account, abort connection", ACCOUNT_AUTHENTICATION_EXPIRED_ERR, ErrAccountAuthExpired, false},
} {
t.Run(test.name, func(t *testing.T) {
l, e := net.Listen("tcp", "127.0.0.1:0")
@@ -1678,16 +1679,16 @@ func TestExpiredAuthentication(t *testing.T) {
ch <- true
}),
}
- if test.ignoreAbort {
- opts = append(opts, IgnoreAuthErrorAbort())
+ if test.withAuthAbort {
+ opts = append(opts, AbortOnAuthErrors())
}
nc, err := Connect(url, opts...)
if err != nil {
t.Fatalf("Expected to connect, got %v", err)
}
defer nc.Close()

- if test.ignoreAbort {
+ if !test.withAuthAbort {
// We expect more than 3 errors, as the connect attempt should not be aborted after 2 failed attempts.
for i := 0; i < 4; i++ {
select {
@@ -2171,7 +2172,7 @@ func BenchmarkNextMsgNoTimeout(b *testing.B) {
}
}

- func TestAuthErrorOnReconnect(t *testing.T) {
+ func TestAuthErrorOnReconnectWithAuthErrorAbort(t *testing.T) {
// This is a bit of an artificial test, but it is to demonstrate
// that if the client is disconnected from a server (not due to an auth error),
// it will still correctly stop the reconnection logic if it gets twice an
@@ -2199,6 +2200,7 @@ func TestAuthErrorOnReconnect(t *testing.T) {
MaxReconnects(-1),
DontRandomize(),
ErrorHandler(func(_ *Conn, _ *Subscription, _ error) {}),
+ AbortOnAuthErrors(),
DisconnectErrHandler(func(_ *Conn, e error) {
dch <- true
}),
@@ -2948,6 +2950,49 @@ func TestInProcessConn(t *testing.T) {
}
}

+ func TestDefaultReconnectBackoffHandler(t *testing.T) {
+ tests := []struct {
+ name string
+ attempts int
+ jitter time.Duration
+ expectedRange []time.Duration
+ }{
+ {
+ name: "4 attempts, no jitter",
+ attempts: 4,
+ expectedRange: []time.Duration{80 * time.Millisecond},
+ },
+ {
+ name: "1 attempt, no jitter, return base value",
+ attempts: 1,
+ expectedRange: []time.Duration{10 * time.Millisecond},
+ },
+ {
+ name: "100 attempts, no jitter, return max",
+ attempts: 100,
+ expectedRange: []time.Duration{2 * time.Minute},
+ },
+ {
+ name: "4 attempts, with jitter",
+ attempts: 4,
+ jitter: 20 * time.Millisecond,
+ expectedRange: []time.Duration{80 * time.Millisecond, 99 * time.Millisecond},
+ },
+ }

+ for _, test := range tests {
+ t.Run(test.name, func(t *testing.T) {
+ cb := DefaultReconnectBackoffHandler(test.jitter)
+ res := cb(test.attempts)
+ if test.jitter == 0 {
+ if res != test.expectedRange[0] {
+ t.Fatalf("Invalid result; want: %s; got: %s", test.expectedRange[0], res)
+ }
+ }
+ })
+ }
+ }

func TestServerListWithTrailingComma(t *testing.T) {
s := RunServerOnPort(-1)
defer s.Shutdown()
1 change: 1 addition & 0 deletions test/conn_test.go
@@ -2564,6 +2564,7 @@ func TestRetryOnFailedConnect(t *testing.T) {
nats.RetryOnFailedConnect(true),
nats.MaxReconnects(-1),
nats.ReconnectWait(15*time.Millisecond),
+ nats.AbortOnAuthErrors(),
nats.ReconnectHandler(func(_ *nats.Conn) {
ch <- true
}),
2 changes: 1 addition & 1 deletion test/js_test.go
@@ -8629,7 +8629,7 @@ func TestJetStreamOrderedConsumerRecreateAfterReconnect(t *testing.T) {
}
hbMissed <- struct{}{}
}
- nc, js := jsClient(t, s, nats.ErrorHandler(errHandler))
+ nc, js := jsClient(t, s, nats.ErrorHandler(errHandler), nats.ReconnectWait(500*time.Millisecond))
defer nc.Close()

if _, err := js.AddStream(&nats.StreamConfig{Name: "foo", Subjects: []string{"FOO.*"}}); err != nil {
10 changes: 0 additions & 10 deletions test/reconnect_test.go
@@ -30,15 +30,6 @@ func startReconnectServer(t *testing.T) *server.Server {
return RunServerOnPort(22222)
}

- func TestReconnectTotalTime(t *testing.T) {
- opts := nats.GetDefaultOptions()
- totalReconnectTime := time.Duration(opts.MaxReconnect) * opts.ReconnectWait
- if totalReconnectTime < (2 * time.Minute) {
- t.Fatalf("Total reconnect time should be at least 2 mins: Currently %v\n",
- totalReconnectTime)
- }
- }

func TestDefaultReconnectJitter(t *testing.T) {
opts := nats.GetDefaultOptions()
if opts.ReconnectJitter != nats.DefaultReconnectJitter {
@@ -123,7 +114,6 @@ var reconnectOpts = nats.Options{
Url: "nats://127.0.0.1:22222",
AllowReconnect: true,
MaxReconnect: 10,
- ReconnectWait: 100 * time.Millisecond,
Timeout: nats.DefaultTimeout,
}
