Skip to content

Commit

Permalink
Merge pull request #737 from axw/recording
Browse files Browse the repository at this point in the history
Introduce "recording" config
  • Loading branch information
axw committed Apr 1, 2020
2 parents 30724e0 + 6e8ca3f commit 45f723b
Show file tree
Hide file tree
Showing 22 changed files with 333 additions and 79 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.asciidoc
Expand Up @@ -23,6 +23,8 @@ endif::[]
https://github.com/elastic/apm-agent-go/compare/v1.7.2...master[View commits]
- Add "recording" config option, to dynamically disable event recording {pull}737[(#737)]
[[release-notes-1.x]]
=== Go Agent version 1.x
Expand Down
17 changes: 17 additions & 0 deletions config.go
Expand Up @@ -46,6 +46,7 @@ const (
envEnvironment = "ELASTIC_APM_ENVIRONMENT"
envSpanFramesMinDuration = "ELASTIC_APM_SPAN_FRAMES_MIN_DURATION"
envActive = "ELASTIC_APM_ACTIVE"
envRecording = "ELASTIC_APM_RECORDING"
envAPIRequestSize = "ELASTIC_APM_API_REQUEST_SIZE"
envAPIRequestTime = "ELASTIC_APM_API_REQUEST_TIME"
envAPIBufferSize = "ELASTIC_APM_API_BUFFER_SIZE"
Expand Down Expand Up @@ -252,6 +253,10 @@ func initialActive() (bool, error) {
return configutil.ParseBoolEnv(envActive, true)
}

func initialRecording() (bool, error) {
return configutil.ParseBoolEnv(envRecording, true)
}

func initialDisabledMetrics() wildcard.Matchers {
return configutil.ParseWildcardPatternsEnv(envDisableMetrics, nil)
}
Expand Down Expand Up @@ -341,6 +346,17 @@ func (t *Tracer) updateRemoteConfig(logger WarningLogger, old, attrs map[string]
cfg.maxSpans = value
})
}
case envRecording:
recording, err := strconv.ParseBool(v)
if err != nil {
errorf("central config failure: failed to parse %s: %s", k, err)
delete(attrs, k)
continue
} else {
updates = append(updates, func(cfg *instrumentationConfig) {
cfg.recording = recording
})
}
case envTransactionSampleRate:
sampler, err := parseSampleRate(k, v)
if err != nil {
Expand Down Expand Up @@ -438,6 +454,7 @@ type instrumentationConfig struct {
// set the initial entry in instrumentationConfig.local, in order to properly reset
// to the local value, even if the default is the zero value.
type instrumentationConfigValues struct {
recording bool
captureBody CaptureBodyMode
captureHeaders bool
maxSpans int
Expand Down
54 changes: 54 additions & 0 deletions config_test.go
Expand Up @@ -245,3 +245,57 @@ func TestTracerConfigWatcherPrecedence(t *testing.T) {
}
}
}

func TestTracerCentralConfigRecording(t *testing.T) {
tracer := apmtest.NewRecordingTracer()
defer tracer.Close()

changes := make(chan apmconfig.Change)
watcherFunc := apmtest.WatchConfigFunc(func(ctx context.Context, params apmconfig.WatchParams) <-chan apmconfig.Change {
return changes
})
tracer.SetLogger(apmtest.NewTestLogger(t))
tracer.SetConfigWatcher(watcherFunc)
tracer.SetMetricsInterval(0) // disable periodic gathering

checkRecording := func(expected bool) {
defer tracer.ResetPayloads()
tracer.StartTransaction("name", "type").End()
tracer.Flush(nil)
if expected {
require.True(t, tracer.Recording())
tracer.SendMetrics(nil)
payloads := tracer.Payloads()
require.NotEmpty(t, payloads.Metrics)
require.NotEmpty(t, payloads.Transactions)
} else {
require.False(t, tracer.Recording())
// testTracerMetricsNotRecording enables periodic
// gathering, checks that no gathering takes place
// (because we're expected not to be recording),
// and then disable periodic gathering again.
testTracerMetricsNotRecording(t, tracer)
payloads := tracer.Payloads()
require.Empty(t, payloads.Transactions)
}
}
updateRemoteConfig := func(attrs map[string]string) {
// We send twice as a means of waiting for the first change to be applied.
for i := 0; i < 2; i++ {
changes <- apmconfig.Change{Attrs: attrs}
}
}

// Initially local config is in effect.
checkRecording(true)

updateRemoteConfig(map[string]string{"recording": "false"})
checkRecording(false)

updateRemoteConfig(map[string]string{"recording": "true"})
tracer.SetRecording(false) // not effective, remote config in effect
checkRecording(true)

updateRemoteConfig(map[string]string{})
checkRecording(false) // local config in effect now
}
14 changes: 14 additions & 0 deletions docs/configuration.asciidoc
Expand Up @@ -212,6 +212,20 @@ You must use the query bar to filter for a specific environment in versions prio
Enable or disable the agent. If set to false, then the Go agent does not send
any data to the Elastic APM server, and instrumentation overhead is minimized.

[float]
[[config-recording]]
=== `ELASTIC_APM_RECORDING`

[options="header"]
|============
| Environment | Default | Example
| `ELASTIC_APM_RECORDING` | true | `false`
|============

Enable or disable recording of events. If set to false, then the Go agent does
send any events to the Elastic APM server, and instrumentation overhead is
minimized, but the agent will continue to poll the server for configuration changes.

[float]
[[config-global-labels]]
=== `ELASTIC_APM_GLOBAL_LABELS`
Expand Down
1 change: 1 addition & 0 deletions env_test.go
Expand Up @@ -334,6 +334,7 @@ func TestTracerActiveEnv(t *testing.T) {
tracer, transport := transporttest.NewRecorderTracer()
defer tracer.Close()
assert.False(t, tracer.Active())
assert.False(t, tracer.Recording()) // inactive => not recording

tx := tracer.StartTransaction("name", "type")
tx.End()
Expand Down
52 changes: 32 additions & 20 deletions error.go
Expand Up @@ -92,10 +92,12 @@ func (t *Tracer) NewError(err error) *Error {
e := t.newError()
e.cause = err
e.err = err.Error()
rand.Read(e.ID[:]) // ignore error, can't do anything about it
initException(&e.exception, err, e.stackTraceLimit)
if len(e.exception.stacktrace) == 0 {
e.SetStacktrace(2)
if e.recording {
rand.Read(e.ID[:]) // ignore error, can't do anything about it
initException(&e.exception, err, e.stackTraceLimit)
if len(e.exception.stacktrace) == 0 {
e.SetStacktrace(2)
}
}
return e
}
Expand All @@ -108,20 +110,22 @@ func (t *Tracer) NewError(err error) *Error {
// If r.Message is empty, "[EMPTY]" will be used.
func (t *Tracer) NewErrorLog(r ErrorLogRecord) *Error {
e := t.newError()
e.log = ErrorLogRecord{
Message: truncateString(r.Message),
MessageFormat: truncateString(r.MessageFormat),
Level: truncateString(r.Level),
LoggerName: truncateString(r.LoggerName),
}
if e.log.Message == "" {
e.log.Message = "[EMPTY]"
}
e.cause = r.Error
e.err = e.log.Message
rand.Read(e.ID[:]) // ignore error, can't do anything about it
if r.Error != nil {
initException(&e.exception, r.Error, e.stackTraceLimit)
if e.recording {
e.log = ErrorLogRecord{
Message: truncateString(r.Message),
MessageFormat: truncateString(r.MessageFormat),
Level: truncateString(r.Level),
LoggerName: truncateString(r.LoggerName),
}
if e.log.Message == "" {
e.log.Message = "[EMPTY]"
}
rand.Read(e.ID[:]) // ignore error, can't do anything about it
if r.Error != nil {
initException(&e.exception, r.Error, e.stackTraceLimit)
}
}
return e
}
Expand All @@ -137,11 +141,14 @@ func (t *Tracer) newError() *Error {
},
}
}
e.Timestamp = time.Now()

instrumentationConfig := t.instrumentationConfig()
e.Context.captureHeaders = instrumentationConfig.captureHeaders
e.stackTraceLimit = instrumentationConfig.stackTraceLimit
e.recording = instrumentationConfig.recording
if e.recording {
e.Timestamp = time.Now()
e.Context.captureHeaders = instrumentationConfig.captureHeaders
e.stackTraceLimit = instrumentationConfig.stackTraceLimit
}

return &Error{ErrorData: e}
}
Expand All @@ -166,6 +173,7 @@ type Error struct {
// When the error is sent, its ErrorData field will be set to nil.
type ErrorData struct {
tracer *Tracer
recording bool
stackTraceLimit int
exception exceptionData
log ErrorLogRecord
Expand Down Expand Up @@ -306,7 +314,11 @@ func (e *Error) Send() {
if e == nil || e.sent() {
return
}
e.ErrorData.enqueue()
if e.recording {
e.ErrorData.enqueue()
} else {
e.reset()
}
e.ErrorData = nil
}

Expand Down
16 changes: 16 additions & 0 deletions error_test.go
Expand Up @@ -244,6 +244,22 @@ func TestErrorNilError(t *testing.T) {
assert.EqualError(t, e, "")
}

func TestErrorNotRecording(t *testing.T) {
tracer := apmtest.NewRecordingTracer()
defer tracer.Close()
tracer.SetRecording(false)

e := tracer.NewError(errors.New("boom"))
require.NotNil(t, e)
require.NotNil(t, e.ErrorData)
e.Send()
require.Nil(t, e.ErrorData)
tracer.Flush(nil)

payloads := tracer.Payloads()
require.Empty(t, payloads.Errors)
}

func TestErrorTransactionSampled(t *testing.T) {
_, _, errors := apmtest.WithTransaction(func(ctx context.Context) {
apm.TransactionFromContext(ctx).Type = "foo"
Expand Down
51 changes: 50 additions & 1 deletion metrics_test.go
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/stretchr/testify/require"

"go.elastic.co/apm"
"go.elastic.co/apm/apmtest"
"go.elastic.co/apm/model"
"go.elastic.co/apm/transport/transporttest"
)
Expand Down Expand Up @@ -281,7 +282,7 @@ func TestTracerMetricsBuffered(t *testing.T) {
}
}

func TestTracerMetricsDisable(t *testing.T) {
func TestTracerDisableMetrics(t *testing.T) {
os.Setenv("ELASTIC_APM_DISABLE_METRICS", "golang.heap.*, system.memory.*, system.process.*")
defer os.Unsetenv("ELASTIC_APM_DISABLE_METRICS")

Expand All @@ -302,6 +303,54 @@ func TestTracerMetricsDisable(t *testing.T) {
assert.EqualValues(t, expected, actual)
}

func TestTracerMetricsNotRecording(t *testing.T) {
tracer := apmtest.NewRecordingTracer()
defer tracer.Close()
tracer.SetRecording(false)
testTracerMetricsNotRecording(t, tracer)
}

func testTracerMetricsNotRecording(t *testing.T, tracer *apmtest.RecordingTracer) {
done := make(chan struct{})
defer close(done)

gathered := make(chan struct{})
tracer.RegisterMetricsGatherer(apm.GatherMetricsFunc(
func(ctx context.Context, m *apm.Metrics) error {
select {
case <-ctx.Done():
return ctx.Err()
case <-done:
case gathered <- struct{}{}:
}
return nil
},
))

tracer.SetMetricsInterval(time.Millisecond)
defer tracer.SetMetricsInterval(0) // disable at end

sent := make(chan struct{})
go func() {
defer close(sent)
tracer.SendMetrics(nil) // unblocked by tracer.Close
}()

// Because the tracer is configured to not record,
// the metrics gatherer should never be called.
select {
case <-time.After(100 * time.Millisecond):
case <-sent:
t.Fatal("expected SendMetrics to block")
case <-gathered:
t.Fatal("unexpected metrics gatherer call")
}

tracer.Flush(nil) // empty queue, should not block
payloads := tracer.Payloads()
require.Empty(t, payloads.Metrics)
}

// busyWork does meaningless work for the specified duration,
// so we can observe CPU usage.
func busyWork(d time.Duration) int {
Expand Down
2 changes: 1 addition & 1 deletion module/apmecho/middleware.go
Expand Up @@ -64,7 +64,7 @@ type middleware struct {

func (m *middleware) handle(c echo.Context) error {
req := c.Request()
if !m.tracer.Active() || m.requestIgnorer(req) {
if !m.tracer.Recording() || m.requestIgnorer(req) {
return m.handler(c)
}
name := req.Method + " " + c.Path()
Expand Down
2 changes: 1 addition & 1 deletion module/apmechov4/middleware.go
Expand Up @@ -66,7 +66,7 @@ type middleware struct {

func (m *middleware) handle(c echo.Context) error {
req := c.Request()
if !m.tracer.Active() || m.requestIgnorer(req) {
if !m.tracer.Recording() || m.requestIgnorer(req) {
return m.handler(c)
}
name := req.Method + " " + c.Path()
Expand Down
2 changes: 1 addition & 1 deletion module/apmgin/middleware.go
Expand Up @@ -69,7 +69,7 @@ type routeInfo struct {
}

func (m *middleware) handle(c *gin.Context) {
if !m.tracer.Active() || m.requestIgnorer(c.Request) {
if !m.tracer.Recording() || m.requestIgnorer(c.Request) {
c.Next()
return
}
Expand Down
2 changes: 1 addition & 1 deletion module/apmgrpc/server.go
Expand Up @@ -63,7 +63,7 @@ func NewUnaryServerInterceptor(o ...ServerOption) grpc.UnaryServerInterceptor {
info *grpc.UnaryServerInfo,
handler grpc.UnaryHandler,
) (resp interface{}, err error) {
if !opts.tracer.Active() || opts.requestIgnorer(info) {
if !opts.tracer.Recording() || opts.requestIgnorer(info) {
return handler(ctx, req)
}
tx, ctx := startTransaction(ctx, opts.tracer, info.FullMethod)
Expand Down
2 changes: 1 addition & 1 deletion module/apmhttp/handler.go
Expand Up @@ -67,7 +67,7 @@ type handler struct {
// ServeHTTP delegates to h.Handler, tracing the transaction with
// h.Tracer, or apm.DefaultTracer if h.Tracer is nil.
func (h *handler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
if !h.tracer.Active() || h.requestIgnorer(req) {
if !h.tracer.Recording() || h.requestIgnorer(req) {
h.handler.ServeHTTP(w, req)
return
}
Expand Down
2 changes: 1 addition & 1 deletion module/apmhttprouter/handler.go
Expand Up @@ -38,7 +38,7 @@ import (
func Wrap(h httprouter.Handle, route string, o ...Option) httprouter.Handle {
opts := gatherOptions(o...)
return func(w http.ResponseWriter, req *http.Request, p httprouter.Params) {
if !opts.tracer.Active() || opts.requestIgnorer(req) {
if !opts.tracer.Recording() || opts.requestIgnorer(req) {
h(w, req, p)
return
}
Expand Down
2 changes: 1 addition & 1 deletion module/apmlogrus/hook.go
Expand Up @@ -87,7 +87,7 @@ func (h *Hook) Levels() []logrus.Level {
// Fire reports the log entry as an error to the APM Server.
func (h *Hook) Fire(entry *logrus.Entry) error {
tracer := h.tracer()
if !tracer.Active() {
if !tracer.Recording() {
return nil
}

Expand Down
2 changes: 1 addition & 1 deletion module/apmrestful/filter.go
Expand Up @@ -51,7 +51,7 @@ type filter struct {
}

func (f *filter) filter(req *restful.Request, resp *restful.Response, chain *restful.FilterChain) {
if !f.tracer.Active() || f.requestIgnorer(req.Request) {
if !f.tracer.Recording() || f.requestIgnorer(req.Request) {
chain.ProcessFilter(req, resp)
return
}
Expand Down

0 comments on commit 45f723b

Please sign in to comment.