Skip to content

Commit

Permalink
GODRIVER-2464 Add timeout for RTT monitor hello operations. (#994)
Browse files Browse the repository at this point in the history
  • Loading branch information
matthewdale committed Jul 12, 2022
1 parent 6f2489e commit d86e0aa
Show file tree
Hide file tree
Showing 8 changed files with 210 additions and 100 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@
"failPoint": {
"configureFailPoint": "failCommand",
"mode": {
"times": 2
"times": 4
},
"data": {
"failCommands": [
Expand Down Expand Up @@ -162,22 +162,6 @@
}
]
}
},
{
"name": "assertEventCount",
"object": "testRunner",
"arguments": {
"event": "ServerMarkedUnknownEvent",
"count": 1
}
},
{
"name": "assertEventCount",
"object": "testRunner",
"arguments": {
"event": "PoolClearedEvent",
"count": 1
}
}
],
"expectations": [
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,15 +84,16 @@ tests:
documents:
- _id: 1
- _id: 2
# Configure the next streaming hello check to fail with a command
# error.
# Use times: 2 so that the RTT hello is blocked as well.
# Configure the next streaming hello check to fail with a command error.
# Use "times: 4" to increase the probability that the Monitor check fails
# since the RTT hello may trigger this failpoint one or many times as
# well.
- name: configureFailPoint
object: testRunner
arguments:
failPoint:
configureFailPoint: failCommand
mode: { times: 2 }
mode: { times: 4 }
data:
failCommands: ["hello", "isMaster"]
appName: commandErrorCheckTest
Expand All @@ -119,17 +120,19 @@ tests:
documents:
- _id: 3
- _id: 4
# Assert the server was marked Unknown and pool was cleared exactly once.
- name: assertEventCount
object: testRunner
arguments:
event: ServerMarkedUnknownEvent
count: 1
- name: assertEventCount
object: testRunner
arguments:
event: PoolClearedEvent
count: 1
# We cannot assert the server was marked Unknown and pool was cleared an
# exact number of times because the RTT hello may have triggered this
# failpoint one or many times as well.
# - name: assertEventCount
# object: testRunner
# arguments:
# event: ServerMarkedUnknownEvent
# count: 1
# - name: assertEventCount
# object: testRunner
# arguments:
# event: PoolClearedEvent
# count: 1

expectations:
- command_started_event:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@
"failPoint": {
"configureFailPoint": "failCommand",
"mode": {
"times": 2
"times": 4
},
"data": {
"failCommands": [
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,14 +84,15 @@ tests:
- _id: 1
- _id: 2
# Configure the next streaming hello check to fail with a non-timeout
# network error. Use times: 2 to ensure that the Monitor check fails
# since the RTT hello may trigger this failpoint as well.
# network error. Use "times: 4" to increase the probability that the
# Monitor check fails since the RTT hello may trigger this failpoint one
# or many times as well.
- name: configureFailPoint
object: testRunner
arguments:
failPoint:
configureFailPoint: failCommand
mode: { times: 2 }
mode: { times: 4 }
data:
failCommands: ["hello", "isMaster"]
appName: networkErrorCheckTest
Expand All @@ -116,8 +117,8 @@ tests:
- _id: 3
- _id: 4
# We cannot assert the server was marked Unknown and pool was cleared an
# exact number of times because the RTT hello may or may not have
# triggered this failpoint as well.
# exact number of times because the RTT hello may have triggered this
# failpoint one or many times as well.
# - name: assertEventCount
# object: testRunner
# arguments:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@
"failPoint": {
"configureFailPoint": "failCommand",
"mode": {
"times": 2
"times": 4
},
"data": {
"failCommands": [
Expand Down Expand Up @@ -160,22 +160,6 @@
}
]
}
},
{
"name": "assertEventCount",
"object": "testRunner",
"arguments": {
"event": "ServerMarkedUnknownEvent",
"count": 1
}
},
{
"name": "assertEventCount",
"object": "testRunner",
"arguments": {
"event": "PoolClearedEvent",
"count": 1
}
}
],
"expectations": [
Expand Down
32 changes: 18 additions & 14 deletions data/server-discovery-and-monitoring/integration/hello-timeout.yml
Original file line number Diff line number Diff line change
Expand Up @@ -84,14 +84,16 @@ tests:
documents:
- _id: 1
- _id: 2
# Configure the next streaming hello check to fail with a timeout.
# Use times: 2 so that the RTT hello is blocked as well.
# Configure the next streaming hello check to fail with a timeout.
# Use "times: 4" to increase the probability that the Monitor check times
# out since the RTT hello may trigger this failpoint one or many times as
# well.
- name: configureFailPoint
object: testRunner
arguments:
failPoint:
configureFailPoint: failCommand
mode: { times: 2 }
mode: { times: 4 }
data:
failCommands: ["hello", "isMaster"]
appName: timeoutMonitorCheckTest
Expand Down Expand Up @@ -119,17 +121,19 @@ tests:
documents:
- _id: 3
- _id: 4
# Assert the server was marked Unknown and pool was cleared exactly once.
- name: assertEventCount
object: testRunner
arguments:
event: ServerMarkedUnknownEvent
count: 1
- name: assertEventCount
object: testRunner
arguments:
event: PoolClearedEvent
count: 1
# We cannot assert the server was marked Unknown and pool was cleared an
# exact number of times because the RTT hello may have triggered this
# failpoint one or many times as well.
# - name: assertEventCount
# object: testRunner
# arguments:
# event: ServerMarkedUnknownEvent
# count: 1
# - name: assertEventCount
# object: testRunner
# arguments:
# event: PoolClearedEvent
# count: 1

expectations:
- command_started_event:
Expand Down
93 changes: 62 additions & 31 deletions x/mongo/driver/topology/rtt_monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,15 @@ const (
)

type rttConfig struct {
interval time.Duration
minRTTWindow time.Duration // Window size to calculate minimum RTT over.
// The minimum interval between RTT measurements. The actual interval may be greater if running
// the operation takes longer than the interval.
interval time.Duration

// The timeout applied to running the "hello" operation. If the timeout is reached while running
// the operation, the RTT sample is discarded. The default is 1 minute.
timeout time.Duration

minRTTWindow time.Duration
createConnectionFn func() *connection
createOperationFn func(driver.Connection) *operation.Hello
}
Expand Down Expand Up @@ -77,8 +84,6 @@ func (r *rttMonitor) disconnect() {

func (r *rttMonitor) start() {
defer r.closeWg.Done()
ticker := time.NewTicker(r.cfg.interval)
defer ticker.Stop()

var conn *connection
defer func() {
Expand All @@ -93,9 +98,27 @@ func (r *rttMonitor) start() {
}
}()

ticker := time.NewTicker(r.cfg.interval)
defer ticker.Stop()

for {
conn = r.runHello(conn)
conn := r.cfg.createConnectionFn()
err := conn.connect(r.ctx)

// Add an RTT sample from the new connection handshake and start a runHellos() loop if we
// successfully established the new connection. Otherwise, close the connection and try to
// create another new connection.
if err == nil {
r.addSample(conn.helloRTT)
r.runHellos(conn)
}

// Close any connection here because we're either about to try to create another new
// connection or we're about to exit the loop.
_ = conn.close()

// If a connection error happens quickly, always wait for the monitoring interval to try
// to create a new connection to prevent creating connections too quickly.
select {
case <-ticker.C:
case <-r.ctx.Done():
Expand All @@ -104,37 +127,45 @@ func (r *rttMonitor) start() {
}
}

// runHello runs a "hello" operation using the provided connection, measures the duration, and adds
// the duration as an RTT sample and returns the connection used. If the provided connection is nil
// or closed, runHello tries to establish a new connection. If the "hello" operation returns an
// error, runHello closes the connection.
func (r *rttMonitor) runHello(conn *connection) *connection {
if conn == nil || conn.closed() {
conn := r.cfg.createConnectionFn()
// runHellos runs "hello" operations in a loop using the provided connection, measuring and
// recording the operation durations as RTT samples. If it encounters any errors, it returns.
func (r *rttMonitor) runHellos(conn *connection) {
ticker := time.NewTicker(r.cfg.interval)
defer ticker.Stop()

err := conn.connect(r.ctx)
if err != nil {
return nil
for {
// Assume that the connection establishment recorded the first RTT sample, so wait for the
// first tick before trying to record another RTT sample.
select {
case <-ticker.C:
case <-r.ctx.Done():
return
}

// If we just created a new connection, record the "hello" RTT from the new connection and
// return the new connection. Don't run another "hello" command this interval because it's
// now unnecessary.
r.addSample(conn.helloRTT)
return conn
}
// Create a Context with the operation timeout specified in the RTT monitor config. If a
// timeout is not set in the RTT monitor config, default to the connection's
// "connectTimeoutMS". The purpose of the timeout is to allow the RTT monitor to continue
// monitoring server RTTs after an operation gets stuck. An operation can get stuck if the
// server or a proxy stops responding to requests on the RTT connection but does not close
// the TCP socket, effectively creating an operation that will never complete. We expect
// that "connectTimeoutMS" provides at least enough time for a single round trip.
timeout := r.cfg.timeout
if timeout <= 0 {
timeout = conn.config.connectTimeout
}
ctx, cancel := context.WithTimeout(r.ctx, timeout)

start := time.Now()
err := r.cfg.createOperationFn(initConnection{conn}).Execute(r.ctx)
if err != nil {
// Errors from the RTT monitor do not reset the RTTs or update the topology, so we close the
// existing connection and recreate it on the next check.
_ = conn.close()
return nil
start := time.Now()
err := r.cfg.createOperationFn(initConnection{conn}).Execute(ctx)
cancel()
if err != nil {
return
}
// Only record a sample if the "hello" operation was successful. If it was not successful,
// the operation may not have actually performed a complete round trip, so the duration may
// be artificially short.
r.addSample(time.Since(start))
}
r.addSample(time.Since(start))

return conn
}

// reset sets the average and min RTT to 0. This should only be called from the server monitor when an error
Expand Down

0 comments on commit d86e0aa

Please sign in to comment.