Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add option to close idle connections for dead nodes #1507

Draft
wants to merge 5 commits into
base: release-branch.v7
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
64 changes: 58 additions & 6 deletions aws/v4/aws_v4.go
Expand Up @@ -34,6 +34,52 @@ func NewV4SigningClientWithHTTPClient(creds *credentials.Credentials, region str
}
}

// NewV4SigningClientWithOptions returns a configured *http.Client
// that will sign all requests with AWS V4 Signing.
func NewV4SigningClientWithOptions(opts ...SigningClientOption) *http.Client {
tr := &Transport{}
for _, o := range opts {
o(tr)
}
if tr.client == nil {
tr.client = http.DefaultClient
}
return &http.Client{
Transport: tr,
}
}

// SigningClientOption specifies options to be used with NewV4SigningClientWithOptions.
type SigningClientOption func(*Transport)

// WithHTTPClient configures the http.Client to be used in Transport.
func WithHTTPClient(client *http.Client) SigningClientOption {
return func(tr *Transport) {
tr.client = client
}
}

// WithCredentials configures the AWS credentials to be used in Transport.
func WithCredentials(creds *credentials.Credentials) SigningClientOption {
return func(tr *Transport) {
tr.creds = creds
}
}

// WithSigner configures the AWS signer to be used in Transport.
func WithSigner(signer *v4.Signer) SigningClientOption {
return func(tr *Transport) {
tr.signer = signer
}
}

// WithRegion configures the AWS region to be used in Transport, e.g. eu-west-1.
func WithRegion(region string) SigningClientOption {
return func(tr *Transport) {
tr.region = region
}
}

// Transport is a RoundTripper that will sign requests with AWS V4 Signing
type Transport struct {
client *http.Client
Expand All @@ -49,6 +95,7 @@ func (st Transport) RoundTrip(req *http.Request) (*http.Response, error) {
return st.client.Do(req)
}

// TODO(oe) Do we still need this? Can we use signer.DisableURIPathEscaping = true instead?
if strings.Contains(req.URL.RawPath, "%2C") {
// Escaping path
req.URL.RawPath = url.PathEscape(req.URL.RawPath)
Expand All @@ -57,25 +104,30 @@ func (st Transport) RoundTrip(req *http.Request) (*http.Response, error) {
now := time.Now().UTC()
req.Header.Set("Date", now.Format(time.RFC3339))

var err error
switch req.Body {
case nil:
_, err = st.signer.Sign(req, nil, "es", st.region, now)
_, err := st.signer.Sign(req, nil, "es", st.region, now)
if err != nil {
return nil, err
}
default:
switch body := req.Body.(type) {
case io.ReadSeeker:
_, err = st.signer.Sign(req, body, "es", st.region, now)
_, err := st.signer.Sign(req, body, "es", st.region, now)
if err != nil {
return nil, err
}
default:
buf, err := ioutil.ReadAll(req.Body)
if err != nil {
return nil, err
}
req.Body = ioutil.NopCloser(bytes.NewReader(buf))
_, err = st.signer.Sign(req, bytes.NewReader(buf), "es", st.region, time.Now().UTC())
if err != nil {
return nil, err
}
}
}
if err != nil {
return nil, err
}
return st.client.Do(req)
}
35 changes: 33 additions & 2 deletions client.go
Expand Up @@ -43,7 +43,7 @@ const (
// for a response from Elasticsearch on startup, i.e. when creating a
// client. After the client is started, a shorter timeout is commonly used
// (its default is specified in DefaultHealthcheckTimeout).
DefaultHealthcheckTimeoutStartup = 5 * time.Second
DefaultHealthcheckTimeoutStartup = 10 * time.Second

// DefaultHealthcheckTimeout specifies the time a running client waits for
// a response from Elasticsearch. Notice that the healthcheck timeout
Expand Down Expand Up @@ -148,6 +148,7 @@ type Client struct {
retrier Retrier // strategy for retries
retryStatusCodes []int // HTTP status codes where to retry automatically (with retrier)
headers http.Header // a list of default headers to add to each request
closeIdleConnsForDeadConn bool // enable to call CloseIdleConnections when we find a dead node
}

// NewClient creates a new client to work with Elasticsearch.
Expand Down Expand Up @@ -472,6 +473,18 @@ func configToOptions(cfg *config.Config) ([]ClientOptionFunc, error) {
return options, nil
}

// SetCloseIdleConnections, when enabled, will call CloseIdleConnections
// whenever we find a dead connection in PerformRequest. This might help
// to fix issues with e.g. AWS Elasticsearch Service that automatically
// changes its configuration and leads Go net/http to use cached HTTP
// connection when it shouldn't.
func SetCloseIdleConnections(enabled bool) ClientOptionFunc {
return func(c *Client) error {
c.closeIdleConnsForDeadConn = enabled
return nil
}
}

// SetHttpClient can be used to specify the http.Client to use when making
// HTTP requests to Elasticsearch.
func SetHttpClient(httpClient Doer) ClientOptionFunc {
Expand Down Expand Up @@ -594,7 +607,7 @@ func SetHealthcheck(enabled bool) ClientOptionFunc {
}

// SetHealthcheckTimeoutStartup sets the timeout for the initial health check.
// The default timeout is 5 seconds (see DefaultHealthcheckTimeoutStartup).
// The default timeout is 10 seconds (see DefaultHealthcheckTimeoutStartup).
// Notice that timeouts for subsequent health checks can be modified with
// SetHealthcheckTimeout.
func SetHealthcheckTimeoutStartup(timeout time.Duration) ClientOptionFunc {
Expand Down Expand Up @@ -1326,6 +1339,21 @@ func (c *Client) PerformRequest(ctx context.Context, opt PerformRequestOptions)
retryStatusCodes = opt.RetryStatusCodes
}
defaultHeaders := c.headers
closeIdleConns := func() {}
if c.closeIdleConnsForDeadConn {
// If we're e.g. on AWS, we should make sure to close idle connections.
// That might happen when the AWS Elasticsearch domain is re-configured.
// Closing idle connections makes sure that net/http creates a
// new HTTP connection instead of re-using one from the cache.
closeIdleConns = func() {
type idleCloser interface {
CloseIdleConnections()
}
if ic, ok := c.c.(idleCloser); ok {
ic.CloseIdleConnections()
}
}
}
c.mu.RUnlock()

// retry returns true if statusCode indicates the request is to be retried
Expand Down Expand Up @@ -1434,11 +1462,13 @@ func (c *Client) PerformRequest(ctx context.Context, opt PerformRequestOptions)
wait, ok, rerr := retrier.Retry(ctx, n, (*http.Request)(req), res, err)
if rerr != nil {
c.errorf("elastic: %s is dead", conn.URL())
closeIdleConns()
conn.MarkAsDead()
return nil, rerr
}
if !ok {
c.errorf("elastic: %s is dead", conn.URL())
closeIdleConns()
conn.MarkAsDead()
return nil, err
}
Expand All @@ -1451,6 +1481,7 @@ func (c *Client) PerformRequest(ctx context.Context, opt PerformRequestOptions)
wait, ok, rerr := retrier.Retry(ctx, n, (*http.Request)(req), res, err)
if rerr != nil {
c.errorf("elastic: %s is dead", conn.URL())
closeIdleConns()
conn.MarkAsDead()
return nil, rerr
}
Expand Down
107 changes: 104 additions & 3 deletions client_test.go
Expand Up @@ -20,6 +20,7 @@ import (
"runtime"
"strings"
"sync"
"sync/atomic"
"testing"
"time"

Expand Down Expand Up @@ -77,6 +78,9 @@ func TestClientDefaults(t *testing.T) {
if client.sendGetBodyAs != "GET" {
t.Errorf("expected sendGetBodyAs to be GET; got: %q", client.sendGetBodyAs)
}
if client.closeIdleConnsForDeadConn != false {
t.Errorf("expected closeIdleConnsForDeadConn to be false; got: %v", client.closeIdleConnsForDeadConn)
}
}

func TestClientWithoutURL(t *testing.T) {
Expand Down Expand Up @@ -1430,9 +1434,10 @@ func TestPerformRequestOnNoConnectionsWithHealthcheckRevival(t *testing.T) {

// failingTransport will run a fail callback if it sees a given URL path prefix.
type failingTransport struct {
path string // path prefix to look for
fail func(*http.Request) (*http.Response, error) // call when path prefix is found
next http.RoundTripper // next round-tripper (use http.DefaultTransport if nil)
path string // path prefix to look for
fail func(*http.Request) (*http.Response, error) // call when path prefix is found
next http.RoundTripper // next round-tripper (use http.DefaultTransport if nil)
closeIdleConns func() // callback for CloseIdleConnections
}

// RoundTrip implements a failing transport.
Expand All @@ -1446,6 +1451,12 @@ func (tr *failingTransport) RoundTrip(r *http.Request) (*http.Response, error) {
return http.DefaultTransport.RoundTrip(r)
}

func (tr *failingTransport) CloseIdleConnections() {
if tr.closeIdleConns != nil {
tr.closeIdleConns()
}
}

func TestPerformRequestRetryOnHttpError(t *testing.T) {
var numFailedReqs int
fail := func(r *http.Request) (*http.Response, error) {
Expand Down Expand Up @@ -1556,6 +1567,96 @@ func TestPerformRequestOnSpecifiedHttpStatusCodes(t *testing.T) {
}
}

func TestPerformRequestCloseIdleConnectionsEnabled(t *testing.T) {
var (
numCallsRoundTripper int64
numCallsCloseIdleConns int64
)
tr := &failingTransport{
path: "/fail",
fail: func(r *http.Request) (*http.Response, error) {
// Called with every retry
atomic.AddInt64(&numCallsRoundTripper, 1)
return http.DefaultTransport.RoundTrip(r)
},
closeIdleConns: func() {
// Called when a connection is marked as dead
atomic.AddInt64(&numCallsCloseIdleConns, 1)
},
}
httpClient := &http.Client{Transport: tr}

client, err := NewClient(
SetURL("http://127.0.0.1:9201"),
SetHttpClient(httpClient),
SetMaxRetries(5),
SetSniff(false),
SetHealthcheck(false),
SetCloseIdleConnections(true), // <- call CloseIdleConnections for dead nodes
)
if err != nil {
t.Fatal(err)
}

// Make a request, so that the connection is marked as dead.
client.PerformRequest(context.TODO(), PerformRequestOptions{
Method: "GET",
Path: "/fail",
})

if want, have := int64(5), numCallsRoundTripper; want != have {
t.Errorf("expected %d calls to RoundTripper; got: %d", want, have)
}
if want, have := int64(1), numCallsCloseIdleConns; want != have {
t.Errorf("expected %d calls to CloseIdleConns; got: %d", want, have)
}
}

func TestPerformRequestCloseIdleConnectionsDisabled(t *testing.T) {
var (
numCallsRoundTripper int64
numCallsCloseIdleConns int64
)
tr := &failingTransport{
path: "/fail",
fail: func(r *http.Request) (*http.Response, error) {
// Called with every retry
atomic.AddInt64(&numCallsRoundTripper, 1)
return http.DefaultTransport.RoundTrip(r)
},
closeIdleConns: func() {
// Called when a connection is marked as dead
atomic.AddInt64(&numCallsCloseIdleConns, 1)
},
}
httpClient := &http.Client{Transport: tr}

client, err := NewClient(
SetURL("http://127.0.0.1:9201"),
SetHttpClient(httpClient),
SetMaxRetries(5),
SetSniff(false),
SetHealthcheck(false),
SetCloseIdleConnections(false), // <- do NOT call CloseIdleConnections for dead nodes
)
if err != nil {
t.Fatal(err)
}

// Make a request, so that the connection is marked as dead.
client.PerformRequest(context.TODO(), PerformRequestOptions{
Method: "GET",
Path: "/fail",
})

if want, have := int64(5), numCallsRoundTripper; want != have {
t.Errorf("expected %d calls to RoundTripper; got: %d", want, have)
}
if want, have := int64(0), numCallsCloseIdleConns; want != have {
t.Errorf("expected %d calls to CloseIdleConns; got: %d", want, have)
}
}

// failingBody will return an error when json.Marshal is called on it.
type failingBody struct{}

Expand Down
1 change: 1 addition & 0 deletions recipes/aws-es-client/.gitignore
@@ -0,0 +1 @@
/aws-es-client
11 changes: 11 additions & 0 deletions recipes/aws-es-client/go.mod
@@ -0,0 +1,11 @@
module github.com/olivere/elastic/recipes/aws-es-client

go 1.16

require (
github.com/aws/aws-sdk-go v1.39.2
github.com/olivere/elastic/v7 v7.0.26
github.com/olivere/env v1.1.0
)

replace github.com/olivere/elastic/v7 => ../..