GODRIVER-2464 Add timeout for RTT monitor hello operations. #994

Merged
@@ -117,7 +117,7 @@
"failPoint": {
"configureFailPoint": "failCommand",
"mode": {
"times": 2
"times": 4
},
"data": {
"failCommands": [
@@ -162,22 +162,6 @@
}
]
}
},
{
"name": "assertEventCount",
"object": "testRunner",
"arguments": {
"event": "ServerMarkedUnknownEvent",
"count": 1
}
},
{
"name": "assertEventCount",
"object": "testRunner",
"arguments": {
"event": "PoolClearedEvent",
"count": 1
}
}
],
"expectations": [
@@ -84,15 +84,16 @@ tests:
documents:
- _id: 1
- _id: 2
# Configure the next streaming hello check to fail with a command
# error.
# Use times: 2 so that the RTT hello is blocked as well.
# Configure the next streaming hello check to fail with a command error.
# Use "times: 4" to increase the probability that the Monitor check fails
# since the RTT hello may trigger this failpoint one or many times as
# well.
- name: configureFailPoint
object: testRunner
arguments:
failPoint:
configureFailPoint: failCommand
mode: { times: 2 }
mode: { times: 4 }
data:
failCommands: ["hello", "isMaster"]
appName: commandErrorCheckTest
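
For reference, the failCommand failpoint configured above can also be enabled directly against a test deployment. The following is a minimal Go sketch, assuming driver v1 import paths and a locally reachable mongod; the connection URI and errorCode are illustrative only, since the actual test files define their own "data" documents.

package main

import (
	"context"
	"log"
	"time"

	"go.mongodb.org/mongo-driver/bson"
	"go.mongodb.org/mongo-driver/mongo"
	"go.mongodb.org/mongo-driver/mongo/options"
)

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	// Connect to a test deployment; the URI is an assumption for this sketch.
	client, err := mongo.Connect(ctx, options.Client().ApplyURI("mongodb://localhost:27017"))
	if err != nil {
		log.Fatal(err)
	}
	defer func() { _ = client.Disconnect(context.Background()) }()

	// Fail the next 4 "hello"/"isMaster" commands sent by the named application,
	// mirroring the "times: 4" mode used in the test above. The errorCode value
	// is illustrative.
	failPoint := bson.D{
		{Key: "configureFailPoint", Value: "failCommand"},
		{Key: "mode", Value: bson.D{{Key: "times", Value: 4}}},
		{Key: "data", Value: bson.D{
			{Key: "failCommands", Value: bson.A{"hello", "isMaster"}},
			{Key: "appName", Value: "commandErrorCheckTest"},
			{Key: "errorCode", Value: 91},
		}},
	}
	if err := client.Database("admin").RunCommand(ctx, failPoint).Err(); err != nil {
		log.Fatal(err)
	}
}

Because the RTT monitor shares the same appName, any of the four failures may be consumed by either the streaming Monitor check or an RTT hello, which is exactly why the tests raise the count from 2 to 4.
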
@@ -119,17 +120,19 @@ tests:
documents:
- _id: 3
- _id: 4
# Assert the server was marked Unknown and pool was cleared exactly once.
- name: assertEventCount
object: testRunner
arguments:
event: ServerMarkedUnknownEvent
count: 1
- name: assertEventCount
object: testRunner
arguments:
event: PoolClearedEvent
count: 1
# We cannot assert the server was marked Unknown and pool was cleared an
# exact number of times because the RTT hello may have triggered this
# failpoint one or many times as well.
Contributor comment: Can leave this up to the spec PR, but if this is not an assertion we plan on uncommenting eventually, I'm not sure I see why we don't just remove it entirely.

# - name: assertEventCount
# object: testRunner
# arguments:
# event: ServerMarkedUnknownEvent
# count: 1
# - name: assertEventCount
# object: testRunner
# arguments:
# event: PoolClearedEvent
# count: 1

expectations:
- command_started_event:
@@ -116,7 +116,7 @@
"failPoint": {
"configureFailPoint": "failCommand",
"mode": {
"times": 2
"times": 4
},
"data": {
"failCommands": [
@@ -84,14 +84,15 @@ tests:
- _id: 1
- _id: 2
# Configure the next streaming hello check to fail with a non-timeout
# network error. Use times: 2 to ensure that the the Monitor check fails
# since the RTT hello may trigger this failpoint as well.
# network error. Use "times: 4" to increase the probability that the
# Monitor check fails since the RTT hello may trigger this failpoint one
# or many times as well.
- name: configureFailPoint
object: testRunner
arguments:
failPoint:
configureFailPoint: failCommand
mode: { times: 2 }
mode: { times: 4 }
data:
failCommands: ["hello", "isMaster"]
appName: networkErrorCheckTest
@@ -116,8 +117,8 @@ tests:
- _id: 3
- _id: 4
# We cannot assert the server was marked Unknown and pool was cleared an
# exact number of times because the RTT hello may or may not have
# triggered this failpoint as well.
# exact number of times because the RTT hello may have triggered this
# failpoint one or many times as well.
# - name: assertEventCount
# object: testRunner
# arguments:
@@ -117,7 +117,7 @@
"failPoint": {
"configureFailPoint": "failCommand",
"mode": {
"times": 2
"times": 4
},
"data": {
"failCommands": [
@@ -160,22 +160,6 @@
}
]
}
},
{
"name": "assertEventCount",
"object": "testRunner",
"arguments": {
"event": "ServerMarkedUnknownEvent",
"count": 1
}
},
{
"name": "assertEventCount",
"object": "testRunner",
"arguments": {
"event": "PoolClearedEvent",
"count": 1
}
}
],
"expectations": [
32 changes: 18 additions & 14 deletions data/server-discovery-and-monitoring/integration/hello-timeout.yml
@@ -84,14 +84,16 @@ tests:
documents:
- _id: 1
- _id: 2
# Configure the next streaming hello check to fail with a timeout
# Use times: 2 so that the RTT hello is blocked as well.
# Configure the next streaming hello check to fail with a timeout.
# Use "times: 4" to increase the probability that the Monitor check times
# out since the RTT hello may trigger this failpoint one or many times as
# well.
- name: configureFailPoint
object: testRunner
arguments:
failPoint:
configureFailPoint: failCommand
mode: { times: 2 }
mode: { times: 4 }
data:
failCommands: ["hello", "isMaster"]
appName: timeoutMonitorCheckTest
@@ -119,17 +121,19 @@ tests:
documents:
- _id: 3
- _id: 4
# Assert the server was marked Unknown and pool was cleared exactly once.
- name: assertEventCount
object: testRunner
arguments:
event: ServerMarkedUnknownEvent
count: 1
- name: assertEventCount
object: testRunner
arguments:
event: PoolClearedEvent
count: 1
# We cannot assert the server was marked Unknown and pool was cleared an
# exact number of times because the RTT hello may have triggered this
# failpoint one or many times as well.
# - name: assertEventCount
# object: testRunner
# arguments:
# event: ServerMarkedUnknownEvent
# count: 1
# - name: assertEventCount
# object: testRunner
# arguments:
# event: PoolClearedEvent
# count: 1

expectations:
- command_started_event:
93 changes: 62 additions & 31 deletions x/mongo/driver/topology/rtt_monitor.go
@@ -25,8 +25,15 @@ const (
)

type rttConfig struct {
interval time.Duration
minRTTWindow time.Duration // Window size to calculate minimum RTT over.
// The minimum interval between RTT measurements. The actual interval may be greater if running
// the operation takes longer than the interval.
interval time.Duration

// The timeout applied to running the "hello" operation. If the timeout is reached while running
// the operation, the RTT sample is discarded. The default is 1 minute.
timeout time.Duration

minRTTWindow time.Duration
createConnectionFn func() *connection
createOperationFn func(driver.Connection) *operation.Hello
}
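
To make the new option concrete, here is a rough sketch of how an rttConfig with the timeout field might be assembled inside the topology package. The duration values are illustrative assumptions, the two constructor fields are stubbed rather than the driver's real implementations, and the newRTTMonitor constructor name is assumed from the package's existing API (disconnect appears in the diff below).

// newIllustrativeRTTMonitor is a hypothetical helper, not part of the driver,
// showing how the new timeout option plugs into rttConfig.
func newIllustrativeRTTMonitor() *rttMonitor {
	cfg := &rttConfig{
		// Minimum time between RTT samples; the actual interval may be longer
		// if running the "hello" operation takes more time than this.
		interval: 10 * time.Second,

		// Per-"hello" timeout added by this change. A non-positive value falls
		// back to the connection's connectTimeoutMS when the operation runs.
		timeout: 1 * time.Minute,

		// Window over which the minimum RTT is tracked.
		minRTTWindow: 5 * time.Minute,

		// Stubbed for this sketch; the driver supplies real constructors for
		// the monitoring connection and the "hello" operation.
		createConnectionFn: func() *connection { return nil },
		createOperationFn:  func(driver.Connection) *operation.Hello { return nil },
	}
	return newRTTMonitor(cfg)
}

Falling back to the connection's connectTimeoutMS keeps the RTT monitor from hanging forever on a dead socket while still allowing at least one full round trip.
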
@@ -77,8 +84,6 @@ func (r *rttMonitor) disconnect() {

func (r *rttMonitor) start() {
defer r.closeWg.Done()
ticker := time.NewTicker(r.cfg.interval)
defer ticker.Stop()

var conn *connection
defer func() {
@@ -93,9 +98,27 @@
}
}()

ticker := time.NewTicker(r.cfg.interval)
defer ticker.Stop()

for {
conn = r.runHello(conn)
conn := r.cfg.createConnectionFn()
err := conn.connect(r.ctx)

// Add an RTT sample from the new connection handshake and start a runHellos() loop if we
// successfully established the new connection. Otherwise, close the connection and try to
// create another new connection.
if err == nil {
r.addSample(conn.helloRTT)
r.runHellos(conn)
}

// Close any connection here because we're either about to try to create another new
// connection or we're about to exit the loop.
_ = conn.close()

// If a connection error happens quickly, always wait for the monitoring interval to try
// to create a new connection to prevent creating connections too quickly.
select {
case <-ticker.C:
case <-r.ctx.Done():
@@ -104,37 +127,45 @@
}
}

// runHello runs a "hello" operation using the provided connection, measures the duration, and adds
// the duration as an RTT sample and returns the connection used. If the provided connection is nil
// or closed, runHello tries to establish a new connection. If the "hello" operation returns an
// error, runHello closes the connection.
func (r *rttMonitor) runHello(conn *connection) *connection {
if conn == nil || conn.closed() {
conn := r.cfg.createConnectionFn()
// runHellos runs "hello" operations in a loop using the provided connection, measuring and
// recording the operation durations as RTT samples. If it encounters any errors, it returns.
func (r *rttMonitor) runHellos(conn *connection) {
ticker := time.NewTicker(r.cfg.interval)
defer ticker.Stop()

err := conn.connect(r.ctx)
if err != nil {
return nil
for {
// Assume that the connection establishment recorded the first RTT sample, so wait for the
// first tick before trying to record another RTT sample.
select {
case <-ticker.C:
case <-r.ctx.Done():
return
}

// If we just created a new connection, record the "hello" RTT from the new connection and
// return the new connection. Don't run another "hello" command this interval because it's
// now unnecessary.
r.addSample(conn.helloRTT)
return conn
}
// Create a Context with the operation timeout specified in the RTT monitor config. If a
// timeout is not set in the RTT monitor config, default to the connection's
// "connectTimeoutMS". The purpose of the timeout is to allow the RTT monitor to continue
// monitoring server RTTs after an operation gets stuck. An operation can get stuck if the
// server or a proxy stops responding to requests on the RTT connection but does not close
// the TCP socket, effectively creating an operation that will never complete. We expect
// that "connectTimeoutMS" provides at least enough time for a single round trip.
timeout := r.cfg.timeout
if timeout <= 0 {
timeout = conn.config.connectTimeout
}
ctx, cancel := context.WithTimeout(r.ctx, timeout)

start := time.Now()
err := r.cfg.createOperationFn(initConnection{conn}).Execute(r.ctx)
if err != nil {
// Errors from the RTT monitor do not reset the RTTs or update the topology, so we close the
// existing connection and recreate it on the next check.
_ = conn.close()
return nil
start := time.Now()
err := r.cfg.createOperationFn(initConnection{conn}).Execute(ctx)
cancel()
if err != nil {
return
}
// Only record a sample if the "hello" operation was successful. If it was not successful,
// the operation may not have actually performed a complete round trip, so the duration may
// be artificially short.
r.addSample(time.Since(start))
}
r.addSample(time.Since(start))

return conn
}

// reset sets the average and min RTT to 0. This should only be called from the server monitor when an error