Skip to content

Commit

Permalink
rpcclient: Add retry with backoffs to HTTP POST requests
Browse files Browse the repository at this point in the history
Adds behavior similar to the retries of persistent RPC connections
to HTTP request.

* Initial backoff: 500ms
* Linear increase
* Max retries: 10

Room for future improvement:
* Make configurable
* Add jitter
* Tests for retry behavior
  • Loading branch information
3nprob committed Nov 14, 2021
1 parent 65e9868 commit 1181268
Showing 1 changed file with 60 additions and 61 deletions.
121 changes: 60 additions & 61 deletions rpcclient/infrastructure.go
Expand Up @@ -86,15 +86,11 @@ const (
// connectionRetryInterval is the amount of time to wait in between
// retries when automatically reconnecting to an RPC server.
connectionRetryInterval = time.Second * 5
)

// sendPostDetails houses an HTTP POST request to send to an RPC server as well
// as the original JSON-RPC command and a channel to reply on when the server
// responds with the result.
type sendPostDetails struct {
httpRequest *http.Request
jsonRequest *jsonRequest
}
// requestRetryInterval is the initial amount of time to wait in between
// retries when sending HTTP POST requests.
requestRetryInterval = time.Millisecond * 500
)

// jsonRequest holds information about a json request that is used to properly
// detect, interpret, and deliver a reply to it.
Expand Down Expand Up @@ -183,7 +179,7 @@ type Client struct {

// Networking infrastructure.
sendChan chan []byte
sendPostChan chan *sendPostDetails
sendPostChan chan *jsonRequest
connEstablished chan struct{}
disconnect chan struct{}
shutdown chan struct{}
Expand Down Expand Up @@ -765,10 +761,50 @@ out:
// handleSendPostMessage handles performing the passed HTTP request, reading the
// result, unmarshalling it, and delivering the unmarshalled result to the
// provided response channel.
func (c *Client) handleSendPostMessage(details *sendPostDetails) {
jReq := details.jsonRequest
log.Tracef("Sending command [%s] with id %d", jReq.method, jReq.id)
httpResponse, err := c.httpClient.Do(details.httpRequest)
func (c *Client) handleSendPostMessage(jReq *jsonRequest) {
protocol := "http"
if !c.config.DisableTLS {
protocol = "https"
}
url := protocol + "://" + c.config.Host

var err error
var backoff time.Duration
var httpResponse *http.Response
tries := 10
for i := 0; tries == 0 || i < tries; i++ {
bodyReader := bytes.NewReader(jReq.marshalledJSON)
httpReq, err := http.NewRequest("POST", url, bodyReader)
if err != nil {
jReq.responseChan <- &Response{result: nil, err: err}
return
}
httpReq.Close = true
httpReq.Header.Set("Content-Type", "application/json")
for key, value := range c.config.ExtraHeaders {
httpReq.Header.Set(key, value)
}

// Configure basic access authorization.
user, pass, err := c.config.getAuth()
if err != nil {
jReq.responseChan <- &Response{result: nil, err: err}
return
}
httpReq.SetBasicAuth(user, pass)

httpResponse, err = c.httpClient.Do(httpReq)
if err != nil {
backoff = requestRetryInterval * time.Duration(i+1)
if backoff > time.Minute {
backoff = time.Minute
}
log.Debugf("Failed command [%s] with id %d attempt %d. Retrying in %v... \n", jReq.method, jReq.id, i, backoff)
time.Sleep(backoff)
continue
}
break
}
if err != nil {
jReq.responseChan <- &Response{err: err}
return
Expand Down Expand Up @@ -821,8 +857,8 @@ out:
// Send any messages ready for send until the shutdown channel
// is closed.
select {
case details := <-c.sendPostChan:
c.handleSendPostMessage(details)
case jReq := <-c.sendPostChan:
c.handleSendPostMessage(jReq)

case <-c.shutdown:
break out
Expand All @@ -834,8 +870,8 @@ out:
cleanup:
for {
select {
case details := <-c.sendPostChan:
details.jsonRequest.responseChan <- &Response{
case jReq := <-c.sendPostChan:
jReq.responseChan <- &Response{
result: nil,
err: ErrClientShutdown,
}
Expand All @@ -852,18 +888,17 @@ cleanup:
// sendPostRequest sends the passed HTTP request to the RPC server using the
// HTTP client associated with the client. It is backed by a buffered channel,
// so it will not block until the send channel is full.
func (c *Client) sendPostRequest(httpReq *http.Request, jReq *jsonRequest) {
func (c *Client) sendPostRequest(jReq *jsonRequest) {
// Don't send the message if shutting down.
select {
case <-c.shutdown:
jReq.responseChan <- &Response{result: nil, err: ErrClientShutdown}
default:
}

c.sendPostChan <- &sendPostDetails{
jsonRequest: jReq,
httpRequest: httpReq,
}
log.Tracef("Sending command [%s] with id %d", jReq.method, jReq.id)

c.sendPostChan <- jReq
}

// newFutureError returns a new future result channel that already has the
Expand All @@ -885,42 +920,6 @@ func ReceiveFuture(f chan *Response) ([]byte, error) {
return r.result, r.err
}

// sendPost sends the passed request to the server by issuing an HTTP POST
// request using the provided response channel for the reply. Typically a new
// connection is opened and closed for each command when using this method,
// however, the underlying HTTP client might coalesce multiple commands
// depending on several factors including the remote server configuration.
func (c *Client) sendPost(jReq *jsonRequest) {
// Generate a request to the configured RPC server.
protocol := "http"
if !c.config.DisableTLS {
protocol = "https"
}
url := protocol + "://" + c.config.Host
bodyReader := bytes.NewReader(jReq.marshalledJSON)
httpReq, err := http.NewRequest("POST", url, bodyReader)
if err != nil {
jReq.responseChan <- &Response{result: nil, err: err}
return
}
httpReq.Close = true
httpReq.Header.Set("Content-Type", "application/json")
for key, value := range c.config.ExtraHeaders {
httpReq.Header.Set(key, value)
}

// Configure basic access authorization.
user, pass, err := c.config.getAuth()
if err != nil {
jReq.responseChan <- &Response{result: nil, err: err}
return
}
httpReq.SetBasicAuth(user, pass)

log.Tracef("Sending command [%s] with id %d", jReq.method, jReq.id)
c.sendPostRequest(httpReq, jReq)
}

// sendRequest sends the passed json request to the associated server using the
// provided response channel for the reply. It handles both websocket and HTTP
// POST mode depending on the configuration of the client.
Expand All @@ -935,7 +934,7 @@ func (c *Client) sendRequest(jReq *jsonRequest) {
log.Warn(err)
}
} else {
c.sendPost(jReq)
c.sendPostRequest(jReq)
}
return
}
Expand Down Expand Up @@ -1428,7 +1427,7 @@ func New(config *ConnConfig, ntfnHandlers *NotificationHandlers) (*Client, error
ntfnHandlers: ntfnHandlers,
ntfnState: newNotificationState(),
sendChan: make(chan []byte, sendBufferSize),
sendPostChan: make(chan *sendPostDetails, sendPostBufferSize),
sendPostChan: make(chan *jsonRequest, sendPostBufferSize),
connEstablished: connEstablished,
disconnect: make(chan struct{}),
shutdown: make(chan struct{}),
Expand Down Expand Up @@ -1642,7 +1641,7 @@ func (c *Client) sendAsync() FutureGetBulkResult {
marshalledJSON: marshalledRequest,
responseChan: responseChan,
}
c.sendPost(&request)
c.sendPostRequest(&request)
return responseChan
}

Expand Down

0 comments on commit 1181268

Please sign in to comment.