Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rpcclient: Add retry with backoffs to HTTP POST requests #1743

Merged
merged 1 commit into from Nov 16, 2021
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
121 changes: 60 additions & 61 deletions rpcclient/infrastructure.go
Expand Up @@ -86,15 +86,11 @@ const (
// connectionRetryInterval is the amount of time to wait in between
// retries when automatically reconnecting to an RPC server.
connectionRetryInterval = time.Second * 5
)

// sendPostDetails houses an HTTP POST request to send to an RPC server as well
// as the original JSON-RPC command and a channel to reply on when the server
// responds with the result.
type sendPostDetails struct {
httpRequest *http.Request
jsonRequest *jsonRequest
}
// requestRetryInterval is the initial amount of time to wait in between
// retries when sending HTTP POST requests.
requestRetryInterval = time.Millisecond * 500
)

// jsonRequest holds information about a json request that is used to properly
// detect, interpret, and deliver a reply to it.
Expand Down Expand Up @@ -183,7 +179,7 @@ type Client struct {

// Networking infrastructure.
sendChan chan []byte
sendPostChan chan *sendPostDetails
sendPostChan chan *jsonRequest
connEstablished chan struct{}
disconnect chan struct{}
shutdown chan struct{}
Expand Down Expand Up @@ -765,10 +761,50 @@ out:
// handleSendPostMessage handles performing the passed HTTP request, reading the
// result, unmarshalling it, and delivering the unmarshalled result to the
// provided response channel.
func (c *Client) handleSendPostMessage(details *sendPostDetails) {
jReq := details.jsonRequest
log.Tracef("Sending command [%s] with id %d", jReq.method, jReq.id)
httpResponse, err := c.httpClient.Do(details.httpRequest)
func (c *Client) handleSendPostMessage(jReq *jsonRequest) {
protocol := "http"
if !c.config.DisableTLS {
protocol = "https"
}
url := protocol + "://" + c.config.Host

var err error
var backoff time.Duration
var httpResponse *http.Response
tries := 10
for i := 0; tries == 0 || i < tries; i++ {
bodyReader := bytes.NewReader(jReq.marshalledJSON)
httpReq, err := http.NewRequest("POST", url, bodyReader)
if err != nil {
jReq.responseChan <- &Response{result: nil, err: err}
return
}
httpReq.Close = true
httpReq.Header.Set("Content-Type", "application/json")
for key, value := range c.config.ExtraHeaders {
httpReq.Header.Set(key, value)
}

// Configure basic access authorization.
user, pass, err := c.config.getAuth()
if err != nil {
jReq.responseChan <- &Response{result: nil, err: err}
return
}
httpReq.SetBasicAuth(user, pass)

httpResponse, err = c.httpClient.Do(httpReq)
if err != nil {
backoff = requestRetryInterval * time.Duration(i+1)
if backoff > time.Minute {
backoff = time.Minute
}
log.Debugf("Failed command [%s] with id %d attempt %d. Retrying in %v... \n", jReq.method, jReq.id, i, backoff)
time.Sleep(backoff)
continue
}
break
}
if err != nil {
jReq.responseChan <- &Response{err: err}
return
Expand Down Expand Up @@ -821,8 +857,8 @@ out:
// Send any messages ready for send until the shutdown channel
// is closed.
select {
case details := <-c.sendPostChan:
c.handleSendPostMessage(details)
case jReq := <-c.sendPostChan:
c.handleSendPostMessage(jReq)

case <-c.shutdown:
break out
Expand All @@ -834,8 +870,8 @@ out:
cleanup:
for {
select {
case details := <-c.sendPostChan:
details.jsonRequest.responseChan <- &Response{
case jReq := <-c.sendPostChan:
jReq.responseChan <- &Response{
result: nil,
err: ErrClientShutdown,
}
Expand All @@ -852,18 +888,17 @@ cleanup:
// sendPostRequest sends the passed HTTP request to the RPC server using the
// HTTP client associated with the client. It is backed by a buffered channel,
// so it will not block until the send channel is full.
func (c *Client) sendPostRequest(httpReq *http.Request, jReq *jsonRequest) {
func (c *Client) sendPostRequest(jReq *jsonRequest) {
// Don't send the message if shutting down.
select {
case <-c.shutdown:
jReq.responseChan <- &Response{result: nil, err: ErrClientShutdown}
default:
}

c.sendPostChan <- &sendPostDetails{
jsonRequest: jReq,
httpRequest: httpReq,
}
log.Tracef("Sending command [%s] with id %d", jReq.method, jReq.id)

c.sendPostChan <- jReq
}

// newFutureError returns a new future result channel that already has the
Expand All @@ -885,42 +920,6 @@ func ReceiveFuture(f chan *Response) ([]byte, error) {
return r.result, r.err
}

// sendPost sends the passed request to the server by issuing an HTTP POST
// request using the provided response channel for the reply. Typically a new
// connection is opened and closed for each command when using this method,
// however, the underlying HTTP client might coalesce multiple commands
// depending on several factors including the remote server configuration.
func (c *Client) sendPost(jReq *jsonRequest) {
// Generate a request to the configured RPC server.
protocol := "http"
if !c.config.DisableTLS {
protocol = "https"
}
url := protocol + "://" + c.config.Host
bodyReader := bytes.NewReader(jReq.marshalledJSON)
httpReq, err := http.NewRequest("POST", url, bodyReader)
if err != nil {
jReq.responseChan <- &Response{result: nil, err: err}
return
}
httpReq.Close = true
httpReq.Header.Set("Content-Type", "application/json")
for key, value := range c.config.ExtraHeaders {
httpReq.Header.Set(key, value)
}

// Configure basic access authorization.
user, pass, err := c.config.getAuth()
if err != nil {
jReq.responseChan <- &Response{result: nil, err: err}
return
}
httpReq.SetBasicAuth(user, pass)

log.Tracef("Sending command [%s] with id %d", jReq.method, jReq.id)
c.sendPostRequest(httpReq, jReq)
}

// sendRequest sends the passed json request to the associated server using the
// provided response channel for the reply. It handles both websocket and HTTP
// POST mode depending on the configuration of the client.
Expand All @@ -935,7 +934,7 @@ func (c *Client) sendRequest(jReq *jsonRequest) {
log.Warn(err)
}
} else {
c.sendPost(jReq)
c.sendPostRequest(jReq)
}
return
}
Expand Down Expand Up @@ -1428,7 +1427,7 @@ func New(config *ConnConfig, ntfnHandlers *NotificationHandlers) (*Client, error
ntfnHandlers: ntfnHandlers,
ntfnState: newNotificationState(),
sendChan: make(chan []byte, sendBufferSize),
sendPostChan: make(chan *sendPostDetails, sendPostBufferSize),
sendPostChan: make(chan *jsonRequest, sendPostBufferSize),
connEstablished: connEstablished,
disconnect: make(chan struct{}),
shutdown: make(chan struct{}),
Expand Down Expand Up @@ -1642,7 +1641,7 @@ func (c *Client) sendAsync() FutureGetBulkResult {
marshalledJSON: marshalledRequest,
responseChan: responseChan,
}
c.sendPost(&request)
c.sendPostRequest(&request)
return responseChan
}

Expand Down