Skip to content

Commit

Permalink
rpcclient: Add retry with backoffs to HTTP POST requests
Browse files Browse the repository at this point in the history
Motivation: When used as a library in LND, current behavior means that a
single request failing will result in the entire  process shutting down.

This adds behavior similar to the retries of persistent RPC connections
to HTTP request.

* Initial backoff: 500ms
* Linear increase
* Max retries: 10

Room for future improvement:
* Make configurable
* Add jitter
* Tests for retry behavior
  • Loading branch information
3nprob committed Aug 21, 2021
1 parent f5a1fb9 commit 5b22bc9
Showing 1 changed file with 60 additions and 61 deletions.
121 changes: 60 additions & 61 deletions rpcclient/infrastructure.go
Expand Up @@ -86,15 +86,11 @@ const (
// connectionRetryInterval is the amount of time to wait in between
// retries when automatically reconnecting to an RPC server.
connectionRetryInterval = time.Second * 5
)

// sendPostDetails houses an HTTP POST request to send to an RPC server as well
// as the original JSON-RPC command and a channel to reply on when the server
// responds with the result.
type sendPostDetails struct {
httpRequest *http.Request
jsonRequest *jsonRequest
}
// requestRetryInterval is the initial amount of time to wait in between
// retries when sending HTTP POST requests.
requestRetryInterval = time.Millisecond * 500
)

// jsonRequest holds information about a json request that is used to properly
// detect, interpret, and deliver a reply to it.
Expand Down Expand Up @@ -183,7 +179,7 @@ type Client struct {

// Networking infrastructure.
sendChan chan []byte
sendPostChan chan *sendPostDetails
sendPostChan chan *jsonRequest
connEstablished chan struct{}
disconnect chan struct{}
shutdown chan struct{}
Expand Down Expand Up @@ -765,10 +761,50 @@ out:
// handleSendPostMessage handles performing the passed HTTP request, reading the
// result, unmarshalling it, and delivering the unmarshalled result to the
// provided response channel.
func (c *Client) handleSendPostMessage(details *sendPostDetails) {
jReq := details.jsonRequest
log.Tracef("Sending command [%s] with id %d", jReq.method, jReq.id)
httpResponse, err := c.httpClient.Do(details.httpRequest)
func (c *Client) handleSendPostMessage(jReq *jsonRequest) {
protocol := "http"
if !c.config.DisableTLS {
protocol = "https"
}
url := protocol + "://" + c.config.Host

var err error
var backoff time.Duration
var httpResponse *http.Response
tries := 10
for i := 0; tries == 0 || i < tries; i++ {
bodyReader := bytes.NewReader(jReq.marshalledJSON)
httpReq, err := http.NewRequest("POST", url, bodyReader)
if err != nil {
jReq.responseChan <- &response{result: nil, err: err}
return
}
httpReq.Close = true
httpReq.Header.Set("Content-Type", "application/json")
for key, value := range c.config.ExtraHeaders {
httpReq.Header.Set(key, value)
}

// Configure basic access authorization.
user, pass, err := c.config.getAuth()
if err != nil {
jReq.responseChan <- &response{result: nil, err: err}
return
}
httpReq.SetBasicAuth(user, pass)

httpResponse, err = c.httpClient.Do(httpReq)
if err != nil {
backoff = requestRetryInterval * time.Duration(i+1)
if backoff > time.Minute {
backoff = time.Minute
}
log.Debugf("Failed command [%s] with id %d attempt %d. Retrying in %v... \n", jReq.method, jReq.id, i, backoff)
time.Sleep(backoff)
continue
}
break
}
if err != nil {
jReq.responseChan <- &response{err: err}
return
Expand Down Expand Up @@ -821,8 +857,8 @@ out:
// Send any messages ready for send until the shutdown channel
// is closed.
select {
case details := <-c.sendPostChan:
c.handleSendPostMessage(details)
case jReq := <-c.sendPostChan:
c.handleSendPostMessage(jReq)

case <-c.shutdown:
break out
Expand All @@ -834,8 +870,8 @@ out:
cleanup:
for {
select {
case details := <-c.sendPostChan:
details.jsonRequest.responseChan <- &response{
case jReq := <-c.sendPostChan:
jReq.responseChan <- &response{
result: nil,
err: ErrClientShutdown,
}
Expand All @@ -852,18 +888,17 @@ cleanup:
// sendPostRequest sends the passed HTTP request to the RPC server using the
// HTTP client associated with the client. It is backed by a buffered channel,
// so it will not block until the send channel is full.
func (c *Client) sendPostRequest(httpReq *http.Request, jReq *jsonRequest) {
func (c *Client) sendPostRequest(jReq *jsonRequest) {
// Don't send the message if shutting down.
select {
case <-c.shutdown:
jReq.responseChan <- &response{result: nil, err: ErrClientShutdown}
default:
}

c.sendPostChan <- &sendPostDetails{
jsonRequest: jReq,
httpRequest: httpReq,
}
log.Tracef("Sending command [%s] with id %d", jReq.method, jReq.id)

c.sendPostChan <- jReq
}

// newFutureError returns a new future result channel that already has the
Expand All @@ -885,42 +920,6 @@ func receiveFuture(f chan *response) ([]byte, error) {
return r.result, r.err
}

// sendPost sends the passed request to the server by issuing an HTTP POST
// request using the provided response channel for the reply. Typically a new
// connection is opened and closed for each command when using this method,
// however, the underlying HTTP client might coalesce multiple commands
// depending on several factors including the remote server configuration.
func (c *Client) sendPost(jReq *jsonRequest) {
// Generate a request to the configured RPC server.
protocol := "http"
if !c.config.DisableTLS {
protocol = "https"
}
url := protocol + "://" + c.config.Host
bodyReader := bytes.NewReader(jReq.marshalledJSON)
httpReq, err := http.NewRequest("POST", url, bodyReader)
if err != nil {
jReq.responseChan <- &response{result: nil, err: err}
return
}
httpReq.Close = true
httpReq.Header.Set("Content-Type", "application/json")
for key, value := range c.config.ExtraHeaders {
httpReq.Header.Set(key, value)
}

// Configure basic access authorization.
user, pass, err := c.config.getAuth()
if err != nil {
jReq.responseChan <- &response{result: nil, err: err}
return
}
httpReq.SetBasicAuth(user, pass)

log.Tracef("Sending command [%s] with id %d", jReq.method, jReq.id)
c.sendPostRequest(httpReq, jReq)
}

// sendRequest sends the passed json request to the associated server using the
// provided response channel for the reply. It handles both websocket and HTTP
// POST mode depending on the configuration of the client.
Expand All @@ -935,7 +934,7 @@ func (c *Client) sendRequest(jReq *jsonRequest) {
log.Warn(err)
}
} else {
c.sendPost(jReq)
c.sendPostRequest(jReq)
}
return
}
Expand Down Expand Up @@ -1428,7 +1427,7 @@ func New(config *ConnConfig, ntfnHandlers *NotificationHandlers) (*Client, error
ntfnHandlers: ntfnHandlers,
ntfnState: newNotificationState(),
sendChan: make(chan []byte, sendBufferSize),
sendPostChan: make(chan *sendPostDetails, sendPostBufferSize),
sendPostChan: make(chan *jsonRequest, sendPostBufferSize),
connEstablished: connEstablished,
disconnect: make(chan struct{}),
shutdown: make(chan struct{}),
Expand Down Expand Up @@ -1642,7 +1641,7 @@ func (c *Client) sendAsync() FutureGetBulkResult {
marshalledJSON: marshalledRequest,
responseChan: responseChan,
}
c.sendPost(&request)
c.sendPostRequest(&request)
return responseChan
}

Expand Down

0 comments on commit 5b22bc9

Please sign in to comment.