Skip to content

Commit

Permalink
privval: increase read/write timeout to 5s and calculate ping interva…
Browse files Browse the repository at this point in the history
…l based on it (#5638)

Partially closes #5550
  • Loading branch information
melekes committed Nov 16, 2020
1 parent 5ba30e6 commit f80a1bc
Show file tree
Hide file tree
Showing 6 changed files with 46 additions and 19 deletions.
1 change: 1 addition & 0 deletions CHANGELOG_PENDING.md
Expand Up @@ -24,3 +24,4 @@ program](https://hackerone.com/tendermint).
### BUG FIXES:

- [consensus] [\#4895](https://github.com/tendermint/tendermint/pull/4895) Cache the address of the validator to reduce querying a remote KMS (@joe-bowman)
- [privval] \#5638 Increase read/write timeout to 5s and calculate ping interval based on it (@JoeKash)
14 changes: 8 additions & 6 deletions privval/signer_dialer_endpoint.go
Expand Up @@ -15,24 +15,26 @@ const (
// SignerServiceEndpointOption sets an optional parameter on the SignerDialerEndpoint.
type SignerServiceEndpointOption func(*SignerDialerEndpoint)

// SignerDialerEndpointTimeoutReadWrite sets the read and write timeout for connections
// from external signing processes.
// SignerDialerEndpointTimeoutReadWrite sets the read and write timeout for
// connections from client processes.
func SignerDialerEndpointTimeoutReadWrite(timeout time.Duration) SignerServiceEndpointOption {
return func(ss *SignerDialerEndpoint) { ss.timeoutReadWrite = timeout }
}

// SignerDialerEndpointConnRetries sets the amount of attempted retries to acceptNewConnection.
// SignerDialerEndpointConnRetries sets the amount of attempted retries to
// acceptNewConnection.
func SignerDialerEndpointConnRetries(retries int) SignerServiceEndpointOption {
return func(ss *SignerDialerEndpoint) { ss.maxConnRetries = retries }
}

// SignerDialerEndpointRetryWaitInterval sets the retry wait interval to a custom value
// SignerDialerEndpointRetryWaitInterval sets the retry wait interval to a
// custom value.
func SignerDialerEndpointRetryWaitInterval(interval time.Duration) SignerServiceEndpointOption {
return func(ss *SignerDialerEndpoint) { ss.retryWait = interval }
}

// SignerDialerEndpoint dials using its dialer and responds to any
// signature requests using its privVal.
// SignerDialerEndpoint dials using its dialer and responds to any signature
// requests using its privVal.
type SignerDialerEndpoint struct {
signerEndpoint

Expand Down
2 changes: 1 addition & 1 deletion privval/signer_endpoint.go
Expand Up @@ -12,7 +12,7 @@ import (
)

const (
defaultTimeoutReadWriteSeconds = 3
defaultTimeoutReadWriteSeconds = 5
)

type signerEndpoint struct {
Expand Down
39 changes: 30 additions & 9 deletions privval/signer_listener_endpoint.go
Expand Up @@ -10,11 +10,22 @@ import (
"github.com/tendermint/tendermint/libs/log"
)

// SignerValidatorEndpointOption sets an optional parameter on the SocketVal.
type SignerValidatorEndpointOption func(*SignerListenerEndpoint)
// SignerListenerEndpointOption sets an optional parameter on the SignerListenerEndpoint.
type SignerListenerEndpointOption func(*SignerListenerEndpoint)

// SignerListenerEndpointTimeoutReadWrite sets the read and write timeout for
// connections from external signing processes.
//
// Default: 5s
func SignerListenerEndpointTimeoutReadWrite(timeout time.Duration) SignerListenerEndpointOption {
return func(sl *SignerListenerEndpoint) { sl.signerEndpoint.timeoutReadWrite = timeout }
}

// SignerListenerEndpoint listens for an external process to dial in
// and keeps the connection alive by dropping and reconnecting
// SignerListenerEndpoint listens for an external process to dial in and keeps
// the connection alive by dropping and reconnecting.
//
// The process will send pings every ~3s (read/write timeout * 2/3) to keep the
// connection alive.
type SignerListenerEndpoint struct {
signerEndpoint

Expand All @@ -24,6 +35,7 @@ type SignerListenerEndpoint struct {

timeoutAccept time.Duration
pingTimer *time.Ticker
pingInterval time.Duration

instanceMtx sync.Mutex // Ensures instance public methods access, i.e. SendRequest
}
Expand All @@ -32,23 +44,31 @@ type SignerListenerEndpoint struct {
func NewSignerListenerEndpoint(
logger log.Logger,
listener net.Listener,
options ...SignerListenerEndpointOption,
) *SignerListenerEndpoint {
sc := &SignerListenerEndpoint{
sl := &SignerListenerEndpoint{
listener: listener,
timeoutAccept: defaultTimeoutAcceptSeconds * time.Second,
}

sc.BaseService = *cmn.NewBaseService(logger, "SignerListenerEndpoint", sc)
sc.signerEndpoint.timeoutReadWrite = defaultTimeoutReadWriteSeconds * time.Second
return sc
sl.BaseService = *cmn.NewBaseService(logger, "SignerListenerEndpoint", sl)
sl.signerEndpoint.timeoutReadWrite = defaultTimeoutReadWriteSeconds * time.Second

for _, optionFunc := range options {
optionFunc(sl)
}

return sl
}

// OnStart implements cmn.Service.
func (sl *SignerListenerEndpoint) OnStart() error {
sl.connectRequestCh = make(chan struct{})
sl.connectionAvailableCh = make(chan net.Conn)

sl.pingTimer = time.NewTicker(defaultPingPeriodMilliseconds * time.Millisecond)
// NOTE: ping timeout must be less than read/write timeout
sl.pingInterval = time.Duration(sl.signerEndpoint.timeoutReadWrite.Milliseconds()*2/3) * time.Millisecond
sl.pingTimer = time.NewTicker(sl.pingInterval)

go sl.serviceLoop()
go sl.pingLoop()
Expand Down Expand Up @@ -116,6 +136,7 @@ func (sl *SignerListenerEndpoint) ensureConnection(maxWait time.Duration) error
}

// block until connected or timeout
sl.Logger.Info("SignerListener: Blocking for connection")
sl.triggerConnect()
err := sl.WaitConnection(sl.connectionAvailableCh, maxWait)
if err != nil {
Expand Down
6 changes: 5 additions & 1 deletion privval/signer_listener_endpoint_test.go
Expand Up @@ -155,7 +155,11 @@ func newSignerListenerEndpoint(logger log.Logger, addr string, timeoutReadWrite
listener = tcpLn
}

return NewSignerListenerEndpoint(logger, listener)
return NewSignerListenerEndpoint(
logger,
listener,
SignerListenerEndpointTimeoutReadWrite(testTimeoutReadWrite),
)
}

func startListenerEndpointAsync(t *testing.T, sle *SignerListenerEndpoint, endpointIsOpenCh chan struct{}) {
Expand Down
3 changes: 1 addition & 2 deletions privval/socket_listeners.go
Expand Up @@ -9,8 +9,7 @@ import (
)

const (
defaultTimeoutAcceptSeconds = 3
defaultPingPeriodMilliseconds = 100
defaultTimeoutAcceptSeconds = 3
)

// timeoutError can be used to check if an error returned from the netp package
Expand Down

0 comments on commit f80a1bc

Please sign in to comment.