Skip to content

Commit

Permalink
GODRIVER-2464 Add timeout for RTT monitor hello operations.
Browse files Browse the repository at this point in the history
  • Loading branch information
matthewdale committed Jun 21, 2022
1 parent 95de0fb commit e1d920e
Show file tree
Hide file tree
Showing 2 changed files with 161 additions and 36 deletions.
94 changes: 58 additions & 36 deletions x/mongo/driver/topology/rtt_monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,28 @@ const (
rttAlphaValue = 0.2
minSamples = 10
maxSamples = 500

// The default RTT monitor operation timeout is 1 minute. The purpose of the timeout is to
// allow the RTT monitor to continue monitoring server RTTs after an operation gets stuck. An
// operation can get stuck if the server or a proxy stops responding to requests on the RTT
// connection but does not close the TCP socket, effectively creating an operation that will
// never complete. We pick 1 minute because it is longer than the longest expected round-trip
// latency and short enough to allow the RTT monitor to recover in a reasonable period of time.
defaultTimeout = 1 * time.Minute
)

type rttConfig struct {
interval time.Duration
minRTTWindow time.Duration // Window size to calculate minimum RTT over.
// The minimum interval between RTT measurements. The actual interval may be greater if running
// the operation takes longer than the interval.
interval time.Duration

// Timeout applied to running the "hello" operation. If the timeout is reached while running the
// operation, the RTT sample is considered invalid and discarded. The default is 1 minute.
timeout time.Duration

// Window size to calculate minimum RTT over.
minRTTWindow time.Duration

createConnectionFn func() *connection
createOperationFn func(driver.Connection) *operation.Hello
}
Expand All @@ -50,6 +67,9 @@ func newRTTMonitor(cfg *rttConfig) *rttMonitor {
if cfg.interval <= 0 {
panic("RTT monitor interval must be greater than 0")
}
if cfg.timeout <= 0 {
cfg.timeout = defaultTimeout
}

ctx, cancel := context.WithCancel(context.Background())
// Determine the number of samples we need to keep to store the minWindow of RTT durations. The
Expand Down Expand Up @@ -77,8 +97,6 @@ func (r *rttMonitor) disconnect() {

func (r *rttMonitor) start() {
defer r.closeWg.Done()
ticker := time.NewTicker(r.cfg.interval)
defer ticker.Stop()

var conn *connection
defer func() {
Expand All @@ -93,48 +111,52 @@ func (r *rttMonitor) start() {
}
}()

for {
conn = r.runHello(conn)

select {
case <-ticker.C:
case <-r.ctx.Done():
return
}
}
}

// runHello runs a "hello" operation using the provided connection, measures the duration, and adds
// the duration as an RTT sample and returns the connection used. If the provided connection is nil
// or closed, runHello tries to establish a new connection. If the "hello" operation returns an
// error, runHello closes the connection.
func (r *rttMonitor) runHello(conn *connection) *connection {
if conn == nil || conn.closed() {
for r.ctx.Err() == nil {
conn := r.cfg.createConnectionFn()

err := conn.connect(r.ctx)
if err != nil {
return nil
_ = conn.close()
continue
}

// If we just created a new connection, record the "hello" RTT from the new connection and
// return the new connection. Don't run another "hello" command this interval because it's
// now unnecessary.
// We just created a new connection, so record the "hello" RTT from the new connection.
r.addSample(conn.helloRTT)
return conn
}

start := time.Now()
err := r.cfg.createOperationFn(initConnection{conn}).Execute(r.ctx)
if err != nil {
// Errors from the RTT monitor do not reset the RTTs or update the topology, so we close the
// existing connection and recreate it on the next check.
r.runHellos(conn)

// runHellos only returns when it encounters an error or is cancelled. In either case, we
// want to close the connection.
_ = conn.close()
return nil
}
r.addSample(time.Since(start))
}

// runHellos runs "hello" operations in a loop using the provided connection, measuring and
// recording the operation durations as RTT samples. It waits one interval tick before each
// operation and returns when an operation fails (including when it exceeds the configured
// timeout) or when the monitor's context is cancelled. It does not close conn; the caller is
// responsible for that.
func (r *rttMonitor) runHellos(conn *connection) {
	ticker := time.NewTicker(r.cfg.interval)
	defer ticker.Stop()

	for {
		// Assume that the connection establishment recorded the first RTT sample, so wait for the
		// first tick before trying to record another RTT sample.
		select {
		case <-ticker.C:
		case <-r.ctx.Done():
			return
		}

		start := time.Now()
		// Bound each operation with the configured timeout so that a connection that stops
		// responding without closing cannot block the monitor forever.
		ctx, cancel := context.WithTimeout(r.ctx, r.cfg.timeout)
		err := r.cfg.createOperationFn(initConnection{conn}).Execute(ctx)
		// Release the timeout's resources right away instead of deferring, since this loop may
		// run for the lifetime of the connection.
		cancel()
		if err != nil {
			return
		}
		// Only record a sample if the "hello" operation was successful. If it was not successful,
		// the operation may not have actually performed a complete round trip, so the duration may
		// be artificially short.
		r.addSample(time.Since(start))
	}
}

// reset sets the average and min RTT to 0. This should only be called from the server monitor when an error
Expand Down
103 changes: 103 additions & 0 deletions x/mongo/driver/topology/rtt_monitor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,14 @@ import (
"io"
"math"
"net"
"sync"
"sync/atomic"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.mongodb.org/mongo-driver/mongo/address"
"go.mongodb.org/mongo-driver/x/bsonx/bsoncore"
"go.mongodb.org/mongo-driver/x/mongo/driver"
"go.mongodb.org/mongo-driver/x/mongo/driver/drivertest"
Expand Down Expand Up @@ -219,6 +222,106 @@ func TestRTTMonitor(t *testing.T) {
rtt.reset()
}
})

// GODRIVER-2464
// Test that the RTT monitor can continue monitoring server RTTs after an operation gets stuck.
// An operation can get stuck if the server or a proxy stops responding to requests on the RTT
// connection but does not close the TCP socket, effectively creating an operation that will
// never complete.
t.Run("stuck operations time out", func(t *testing.T) {
t.Parallel()

// Start a goroutine that listens for and accepts TCP connections, reads requests, and
// responds with {"ok": 1}. The first 2 connections simulate "stuck" connections and never
// respond or close.
l, err := net.Listen("tcp", "localhost:0")
require.NoError(t, err)
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()

for i := 0; ; i++ {
conn, err := l.Accept()
if err != nil {
// The listen loop is cancelled by closing the listener, so there will always be
// an error here. Log the error to make debugging easier in case of unexpected
// errors.
t.Logf("Accept error: %v", err)
return
}

// Only close connections when the listener loop returns to prevent closing "stuck"
// connections while the test is running.
defer conn.Close()

wg.Add(1)
go func(i int) {
defer wg.Done()

buf := make([]byte, 256)
for {
if _, err := conn.Read(buf); err != nil {
// The connection read/write loop is cancelled by closing the connection,
// so an error here may be expected. Log the error to make debugging
// easier in case of unexpected errors.
t.Logf("Read error: %v", err)
return
}

// For the first 2 connections, read the request but never respond and don't
// close the connection. That simulates the behavior of a "stuck" connection.
if i < 2 {
return
}

if _, err := conn.Write(makeHelloReply()); err != nil {
// The connection read/write loop is cancelled by closing the connection,
// so an error here may be expected. Log the error to make debugging
// easier in case of unexpected errors.
t.Logf("Write error: %v", err)
return
}
}
}(i)
}
}()

rtt := newRTTMonitor(&rttConfig{
interval: 10 * time.Millisecond,
timeout: 100 * time.Millisecond,
createConnectionFn: func() *connection {
return newConnection(address.Address(l.Addr().String()))
},
createOperationFn: func(conn driver.Connection) *operation.Hello {
return operation.NewHello().Deployment(driver.SingleConnectionDeployment{C: conn})
},
})
rtt.connect()

assert.Eventuallyf(
t,
func() bool { return rtt.getRTT() > 0 },
1*time.Second,
10*time.Millisecond,
"expected getRTT() to return a positive duration within 1 second")
assert.Eventuallyf(
t,
func() bool { return rtt.getMinRTT() > 0 },
1*time.Second,
10*time.Millisecond,
"expected getMinRTT() to return a positive duration within 1 second")
assert.Eventuallyf(
t,
func() bool { return rtt.getRTT90() > 0 },
1*time.Second,
10*time.Millisecond,
"expected getRTT90() to return a positive duration within 1 second")

rtt.disconnect()
l.Close()
wg.Wait()
})
}

func TestMin(t *testing.T) {
Expand Down

0 comments on commit e1d920e

Please sign in to comment.