Skip to content

Commit

Permalink
Add context to logger
Browse files Browse the repository at this point in the history
This commit adds a `LoggerWithContext` interface that extends the
`Logger` interface by a method `PrintfWithContext` that, when
implemented, is called instead of the `Printf` method of the `Logger`
interface.

The purpose of `PrintfWithContext` is to receive the current context
under which the logging happens. Notice that this doesn't always
have to be request-scoped, i.e. an actual API call from a user. It may
also be from an internal state or process, e.g. Bulk processor or node
health.

Close #1541
  • Loading branch information
olivere committed Oct 14, 2021
1 parent 68349f0 commit d01a467
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 29 deletions.
10 changes: 5 additions & 5 deletions bulk_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -503,7 +503,7 @@ func (w *bulkWorker) work(ctx context.Context) {
w.flushAckC <- struct{}{}
}
if err != nil {
w.p.c.errorf("elastic: bulk processor %q was unable to perform work: %v", w.p.name, err)
w.p.c.errorf(ctx, "elastic: bulk processor %q was unable to perform work: %v", w.p.name, err)
if !stop {
waitForActive := func() {
// Add back pressure to prevent Add calls from filling up the request queue
Expand Down Expand Up @@ -556,7 +556,7 @@ func (w *bulkWorker) commit(ctx context.Context) error {
}
// notifyFunc will be called if retry fails
notifyFunc := func(err error) {
w.p.c.errorf("elastic: bulk processor %q failed but may retry: %v", w.p.name, err)
w.p.c.errorf(ctx, "elastic: bulk processor %q failed but may retry: %v", w.p.name, err)
}

id := atomic.AddInt64(&w.p.executionId, 1)
Expand All @@ -580,7 +580,7 @@ func (w *bulkWorker) commit(ctx context.Context) error {
err := RetryNotify(commitFunc, w.p.backoff, notifyFunc)
w.updateStats(res)
if err != nil {
w.p.c.errorf("elastic: bulk processor %q failed: %v", w.p.name, err)
w.p.c.errorf(ctx, "elastic: bulk processor %q failed: %v", w.p.name, err)
}

// Invoke after callback
Expand All @@ -599,14 +599,14 @@ func (w *bulkWorker) waitForActiveConnection(ready chan<- struct{}) {

client := w.p.c
stopReconnC := w.p.stopReconnC
w.p.c.errorf("elastic: bulk processor %q is waiting for an active connection", w.p.name)
w.p.c.errorf(context.Background(), "elastic: bulk processor %q is waiting for an active connection", w.p.name)

// loop until a health check finds at least 1 active connection or the reconnection channel is closed
for {
select {
case _, ok := <-stopReconnC:
if !ok {
w.p.c.errorf("elastic: bulk processor %q active connection check interrupted", w.p.name)
w.p.c.errorf(context.Background(), "elastic: bulk processor %q active connection check interrupted", w.p.name)
return
}
case <-t.C:
Expand Down
63 changes: 39 additions & 24 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (

const (
// Version is the current version of Elastic.
Version = "7.0.29"
Version = "7.0.30"

// DefaultURL is the default endpoint of Elasticsearch on the local machine.
// It is used e.g. when initializing a new Client without a specific URL.
Expand Down Expand Up @@ -84,6 +84,9 @@ const (
)

var (
// nilByte is used in JSON marshal/unmarshal
nilByte = []byte("null")

// ErrNoClient is raised when no Elasticsearch node is available.
ErrNoClient = errors.New("no Elasticsearch node available")

Expand Down Expand Up @@ -798,7 +801,7 @@ func (c *Client) Start() {
c.running = true
c.mu.Unlock()

c.infof("elastic: client started")
c.infof(context.Background(), "elastic: client started")
}

// Stop stops the background processes that the client is running,
Expand Down Expand Up @@ -828,27 +831,39 @@ func (c *Client) Stop() {
c.running = false
c.mu.Unlock()

c.infof("elastic: client stopped")
c.infof(context.Background(), "elastic: client stopped")
}

// errorf logs to the error log.
func (c *Client) errorf(format string, args ...interface{}) {
func (c *Client) errorf(ctx context.Context, format string, args ...interface{}) {
if c.errorlog != nil {
c.errorlog.Printf(format, args...)
if logger, ok := c.errorlog.(LoggerWithContext); ok {
logger.PrintfWithContext(ctx, format, args...)
} else {
c.errorlog.Printf(format, args...)
}
}
}

// infof logs informational messages.
func (c *Client) infof(format string, args ...interface{}) {
func (c *Client) infof(ctx context.Context, format string, args ...interface{}) {
if c.infolog != nil {
c.infolog.Printf(format, args...)
if logger, ok := c.infolog.(LoggerWithContext); ok {
logger.PrintfWithContext(ctx, format, args...)
} else {
c.infolog.Printf(format, args...)
}
}
}

// tracef logs to the trace log.
func (c *Client) tracef(format string, args ...interface{}) {
func (c *Client) tracef(ctx context.Context, format string, args ...interface{}) {
if c.tracelog != nil {
c.tracelog.Printf(format, args...)
if logger, ok := c.tracelog.(LoggerWithContext); ok {
logger.PrintfWithContext(ctx, format, args...)
} else {
c.tracelog.Printf(format, args...)
}
}
}

Expand All @@ -857,7 +872,7 @@ func (c *Client) dumpRequest(r *http.Request) {
if c.tracelog != nil {
out, err := httputil.DumpRequestOut(r, true)
if err == nil {
c.tracef("%s\n", string(out))
c.tracef(r.Context(), "%s\n", string(out))
}
}
}
Expand All @@ -867,7 +882,7 @@ func (c *Client) dumpResponse(resp *http.Response) {
if c.tracelog != nil {
out, err := httputil.DumpResponse(resp, true)
if err == nil {
c.tracef("%s\n", string(out))
c.tracef(context.Background(), "%s\n", string(out))
}
}
}
Expand Down Expand Up @@ -1055,7 +1070,7 @@ func (c *Client) updateConns(conns []*conn) {
}
if !found {
// New connection didn't exist, so add it to our list of new conns.
c.infof("elastic: %s joined the cluster", conn.URL())
c.infof(context.Background(), "elastic: %s joined the cluster", conn.URL())
newConns = append(newConns, conn)
}
}
Expand Down Expand Up @@ -1147,19 +1162,19 @@ func (c *Client) healthcheck(parentCtx context.Context, timeout time.Duration, f
// Wait for the Goroutine (or its timeout)
select {
case <-ctx.Done(): // timeout
c.errorf("elastic: %s is dead", conn.URL())
c.errorf(ctx, "elastic: %s is dead", conn.URL())
conn.MarkAsDead()
case err := <-errc:
if err != nil {
c.errorf("elastic: %s is dead", conn.URL())
c.errorf(ctx, "elastic: %s is dead", conn.URL())
conn.MarkAsDead()
break
}
if status >= 200 && status < 300 {
conn.MarkAsAlive()
} else {
conn.MarkAsDead()
c.errorf("elastic: %s is dead [status=%d]", conn.URL(), status)
c.errorf(ctx, "elastic: %s is dead [status=%d]", conn.URL(), status)
}
}
}
Expand Down Expand Up @@ -1256,7 +1271,7 @@ func (c *Client) next() (*conn, error) {
// So we are marking them as alive--if sniffing is disabled.
// They'll then be picked up in the next call to PerformRequest.
if !c.snifferEnabled {
c.errorf("elastic: all %d nodes marked as dead; resurrecting them to prevent deadlock", len(c.conns))
c.errorf(context.Background(), "elastic: all %d nodes marked as dead; resurrecting them to prevent deadlock", len(c.conns))
for _, conn := range c.conns {
conn.MarkAsAlive()
}
Expand Down Expand Up @@ -1380,13 +1395,13 @@ func (c *Client) PerformRequest(ctx context.Context, opt PerformRequestOptions)
continue // try again
}
if err != nil {
c.errorf("elastic: cannot get connection from pool")
c.errorf(ctx, "elastic: cannot get connection from pool")
return nil, err
}

req, err = NewRequest(opt.Method, conn.URL()+pathWithParams)
if err != nil {
c.errorf("elastic: cannot create request for %s %s: %v", strings.ToUpper(opt.Method), conn.URL()+pathWithParams, err)
c.errorf(ctx, "elastic: cannot create request for %s %s: %v", strings.ToUpper(opt.Method), conn.URL()+pathWithParams, err)
return nil, err
}
if basicAuth {
Expand Down Expand Up @@ -1415,7 +1430,7 @@ func (c *Client) PerformRequest(ctx context.Context, opt PerformRequestOptions)
if opt.Body != nil {
err = req.SetBody(opt.Body, gzipEnabled)
if err != nil {
c.errorf("elastic: couldn't set body %+v for request: %v", opt.Body, err)
c.errorf(ctx, "elastic: couldn't set body %+v for request: %v", opt.Body, err)
return nil, err
}
}
Expand All @@ -1433,12 +1448,12 @@ func (c *Client) PerformRequest(ctx context.Context, opt PerformRequestOptions)
n++
wait, ok, rerr := retrier.Retry(ctx, n, (*http.Request)(req), res, err)
if rerr != nil {
c.errorf("elastic: %s is dead", conn.URL())
c.errorf(ctx, "elastic: %s is dead", conn.URL())
conn.MarkAsDead()
return nil, rerr
}
if !ok {
c.errorf("elastic: %s is dead", conn.URL())
c.errorf(ctx, "elastic: %s is dead", conn.URL())
conn.MarkAsDead()
return nil, err
}
Expand All @@ -1450,7 +1465,7 @@ func (c *Client) PerformRequest(ctx context.Context, opt PerformRequestOptions)
n++
wait, ok, rerr := retrier.Retry(ctx, n, (*http.Request)(req), res, err)
if rerr != nil {
c.errorf("elastic: %s is dead", conn.URL())
c.errorf(ctx, "elastic: %s is dead", conn.URL())
conn.MarkAsDead()
return nil, rerr
}
Expand All @@ -1473,7 +1488,7 @@ func (c *Client) PerformRequest(ctx context.Context, opt PerformRequestOptions)
if len(res.Header["Warning"]) > 0 {
c.deprecationlog((*http.Request)(req), res)
for _, warning := range res.Header["Warning"] {
c.errorf("Deprecation warning: %s", warning)
c.errorf(ctx, "Deprecation warning: %s", warning)
}
}

Expand All @@ -1497,7 +1512,7 @@ func (c *Client) PerformRequest(ctx context.Context, opt PerformRequestOptions)
}

duration := time.Now().UTC().Sub(start)
c.infof("%s %s [status:%d, request:%.3fs]",
c.infof(ctx, "%s %s [status:%d, request:%.3fs]",
strings.ToUpper(opt.Method),
req.URL,
resp.StatusCode,
Expand Down
8 changes: 8 additions & 0 deletions logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,15 @@

package elastic

import "context"

// Logger specifies the interface for all log operations.
type Logger interface {
Printf(format string, v ...interface{})
}

// LoggerWithContext extends the Logger interface by a context.
type LoggerWithContext interface {
Logger
PrintfWithContext(ctx context.Context, format string, v ...interface{})
}

0 comments on commit d01a467

Please sign in to comment.