Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add context to logger #1542

Draft
wants to merge 1 commit into
base: release-branch.v7
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
10 changes: 5 additions & 5 deletions bulk_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -503,7 +503,7 @@ func (w *bulkWorker) work(ctx context.Context) {
w.flushAckC <- struct{}{}
}
if err != nil {
w.p.c.errorf("elastic: bulk processor %q was unable to perform work: %v", w.p.name, err)
w.p.c.errorf(ctx, "elastic: bulk processor %q was unable to perform work: %v", w.p.name, err)
if !stop {
waitForActive := func() {
// Add back pressure to prevent Add calls from filling up the request queue
Expand Down Expand Up @@ -556,7 +556,7 @@ func (w *bulkWorker) commit(ctx context.Context) error {
}
// notifyFunc will be called if retry fails
notifyFunc := func(err error) {
w.p.c.errorf("elastic: bulk processor %q failed but may retry: %v", w.p.name, err)
w.p.c.errorf(ctx, "elastic: bulk processor %q failed but may retry: %v", w.p.name, err)
}

id := atomic.AddInt64(&w.p.executionId, 1)
Expand All @@ -580,7 +580,7 @@ func (w *bulkWorker) commit(ctx context.Context) error {
err := RetryNotify(commitFunc, w.p.backoff, notifyFunc)
w.updateStats(res)
if err != nil {
w.p.c.errorf("elastic: bulk processor %q failed: %v", w.p.name, err)
w.p.c.errorf(ctx, "elastic: bulk processor %q failed: %v", w.p.name, err)
}

// Invoke after callback
Expand All @@ -599,14 +599,14 @@ func (w *bulkWorker) waitForActiveConnection(ready chan<- struct{}) {

client := w.p.c
stopReconnC := w.p.stopReconnC
w.p.c.errorf("elastic: bulk processor %q is waiting for an active connection", w.p.name)
w.p.c.errorf(context.Background(), "elastic: bulk processor %q is waiting for an active connection", w.p.name)

// loop until a health check finds at least 1 active connection or the reconnection channel is closed
for {
select {
case _, ok := <-stopReconnC:
if !ok {
w.p.c.errorf("elastic: bulk processor %q active connection check interrupted", w.p.name)
w.p.c.errorf(context.Background(), "elastic: bulk processor %q active connection check interrupted", w.p.name)
return
}
case <-t.C:
Expand Down
63 changes: 39 additions & 24 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (

const (
// Version is the current version of Elastic.
Version = "7.0.29"
Version = "7.0.30"

// DefaultURL is the default endpoint of Elasticsearch on the local machine.
// It is used e.g. when initializing a new Client without a specific URL.
Expand Down Expand Up @@ -84,6 +84,9 @@ const (
)

var (
// nilByte is used in JSON marshal/unmarshal
nilByte = []byte("null")

// ErrNoClient is raised when no Elasticsearch node is available.
ErrNoClient = errors.New("no Elasticsearch node available")

Expand Down Expand Up @@ -798,7 +801,7 @@ func (c *Client) Start() {
c.running = true
c.mu.Unlock()

c.infof("elastic: client started")
c.infof(context.Background(), "elastic: client started")
}

// Stop stops the background processes that the client is running,
Expand Down Expand Up @@ -828,27 +831,39 @@ func (c *Client) Stop() {
c.running = false
c.mu.Unlock()

c.infof("elastic: client stopped")
c.infof(context.Background(), "elastic: client stopped")
}

// errorf logs to the error log.
func (c *Client) errorf(format string, args ...interface{}) {
func (c *Client) errorf(ctx context.Context, format string, args ...interface{}) {
if c.errorlog != nil {
c.errorlog.Printf(format, args...)
if logger, ok := c.errorlog.(LoggerWithContext); ok {
logger.PrintfWithContext(ctx, format, args...)
} else {
c.errorlog.Printf(format, args...)
}
}
}

// infof logs informational messages.
func (c *Client) infof(format string, args ...interface{}) {
func (c *Client) infof(ctx context.Context, format string, args ...interface{}) {
if c.infolog != nil {
c.infolog.Printf(format, args...)
if logger, ok := c.infolog.(LoggerWithContext); ok {
logger.PrintfWithContext(ctx, format, args...)
} else {
c.infolog.Printf(format, args...)
}
}
}

// tracef logs to the trace log.
func (c *Client) tracef(format string, args ...interface{}) {
func (c *Client) tracef(ctx context.Context, format string, args ...interface{}) {
if c.tracelog != nil {
c.tracelog.Printf(format, args...)
if logger, ok := c.tracelog.(LoggerWithContext); ok {
logger.PrintfWithContext(ctx, format, args...)
} else {
c.tracelog.Printf(format, args...)
}
}
}

Expand All @@ -857,7 +872,7 @@ func (c *Client) dumpRequest(r *http.Request) {
if c.tracelog != nil {
out, err := httputil.DumpRequestOut(r, true)
if err == nil {
c.tracef("%s\n", string(out))
c.tracef(r.Context(), "%s\n", string(out))
}
}
}
Expand All @@ -867,7 +882,7 @@ func (c *Client) dumpResponse(resp *http.Response) {
if c.tracelog != nil {
out, err := httputil.DumpResponse(resp, true)
if err == nil {
c.tracef("%s\n", string(out))
c.tracef(context.Background(), "%s\n", string(out))
}
}
}
Expand Down Expand Up @@ -1055,7 +1070,7 @@ func (c *Client) updateConns(conns []*conn) {
}
if !found {
// New connection didn't exist, so add it to our list of new conns.
c.infof("elastic: %s joined the cluster", conn.URL())
c.infof(context.Background(), "elastic: %s joined the cluster", conn.URL())
newConns = append(newConns, conn)
}
}
Expand Down Expand Up @@ -1147,19 +1162,19 @@ func (c *Client) healthcheck(parentCtx context.Context, timeout time.Duration, f
// Wait for the Goroutine (or its timeout)
select {
case <-ctx.Done(): // timeout
c.errorf("elastic: %s is dead", conn.URL())
c.errorf(ctx, "elastic: %s is dead", conn.URL())
conn.MarkAsDead()
case err := <-errc:
if err != nil {
c.errorf("elastic: %s is dead", conn.URL())
c.errorf(ctx, "elastic: %s is dead", conn.URL())
conn.MarkAsDead()
break
}
if status >= 200 && status < 300 {
conn.MarkAsAlive()
} else {
conn.MarkAsDead()
c.errorf("elastic: %s is dead [status=%d]", conn.URL(), status)
c.errorf(ctx, "elastic: %s is dead [status=%d]", conn.URL(), status)
}
}
}
Expand Down Expand Up @@ -1256,7 +1271,7 @@ func (c *Client) next() (*conn, error) {
// So we are marking them as alive--if sniffing is disabled.
// They'll then be picked up in the next call to PerformRequest.
if !c.snifferEnabled {
c.errorf("elastic: all %d nodes marked as dead; resurrecting them to prevent deadlock", len(c.conns))
c.errorf(context.Background(), "elastic: all %d nodes marked as dead; resurrecting them to prevent deadlock", len(c.conns))
for _, conn := range c.conns {
conn.MarkAsAlive()
}
Expand Down Expand Up @@ -1380,13 +1395,13 @@ func (c *Client) PerformRequest(ctx context.Context, opt PerformRequestOptions)
continue // try again
}
if err != nil {
c.errorf("elastic: cannot get connection from pool")
c.errorf(ctx, "elastic: cannot get connection from pool")
return nil, err
}

req, err = NewRequest(opt.Method, conn.URL()+pathWithParams)
if err != nil {
c.errorf("elastic: cannot create request for %s %s: %v", strings.ToUpper(opt.Method), conn.URL()+pathWithParams, err)
c.errorf(ctx, "elastic: cannot create request for %s %s: %v", strings.ToUpper(opt.Method), conn.URL()+pathWithParams, err)
return nil, err
}
if basicAuth {
Expand Down Expand Up @@ -1415,7 +1430,7 @@ func (c *Client) PerformRequest(ctx context.Context, opt PerformRequestOptions)
if opt.Body != nil {
err = req.SetBody(opt.Body, gzipEnabled)
if err != nil {
c.errorf("elastic: couldn't set body %+v for request: %v", opt.Body, err)
c.errorf(ctx, "elastic: couldn't set body %+v for request: %v", opt.Body, err)
return nil, err
}
}
Expand All @@ -1433,12 +1448,12 @@ func (c *Client) PerformRequest(ctx context.Context, opt PerformRequestOptions)
n++
wait, ok, rerr := retrier.Retry(ctx, n, (*http.Request)(req), res, err)
if rerr != nil {
c.errorf("elastic: %s is dead", conn.URL())
c.errorf(ctx, "elastic: %s is dead", conn.URL())
conn.MarkAsDead()
return nil, rerr
}
if !ok {
c.errorf("elastic: %s is dead", conn.URL())
c.errorf(ctx, "elastic: %s is dead", conn.URL())
conn.MarkAsDead()
return nil, err
}
Expand All @@ -1450,7 +1465,7 @@ func (c *Client) PerformRequest(ctx context.Context, opt PerformRequestOptions)
n++
wait, ok, rerr := retrier.Retry(ctx, n, (*http.Request)(req), res, err)
if rerr != nil {
c.errorf("elastic: %s is dead", conn.URL())
c.errorf(ctx, "elastic: %s is dead", conn.URL())
conn.MarkAsDead()
return nil, rerr
}
Expand All @@ -1473,7 +1488,7 @@ func (c *Client) PerformRequest(ctx context.Context, opt PerformRequestOptions)
if len(res.Header["Warning"]) > 0 {
c.deprecationlog((*http.Request)(req), res)
for _, warning := range res.Header["Warning"] {
c.errorf("Deprecation warning: %s", warning)
c.errorf(ctx, "Deprecation warning: %s", warning)
}
}

Expand All @@ -1497,7 +1512,7 @@ func (c *Client) PerformRequest(ctx context.Context, opt PerformRequestOptions)
}

duration := time.Now().UTC().Sub(start)
c.infof("%s %s [status:%d, request:%.3fs]",
c.infof(ctx, "%s %s [status:%d, request:%.3fs]",
strings.ToUpper(opt.Method),
req.URL,
resp.StatusCode,
Expand Down
8 changes: 8 additions & 0 deletions logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,15 @@

package elastic

import "context"

// Logger specifies the interface for all log operations.
type Logger interface {
Printf(format string, v ...interface{})
}

// LoggerWithContext extends the Logger interface by a context.
type LoggerWithContext interface {
Logger

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think about LoggerWithContext implements Logger interface is necessarily.

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Otherwise you can’t specify the logger in a backwards compatible manner with SetErrorLog etc.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you are right :)

PrintfWithContext(ctx context.Context, format string, v ...interface{})
}