From 6e8ca3f1c886fa59b328008dfad60866679b60fe Mon Sep 17 00:00:00 2001 From: Andrew Wilkins Date: Thu, 26 Mar 2020 13:20:12 +0800 Subject: [PATCH] Introduce "recording" config We introduce the ELASTIC_APM_RECORDING configuration. This is a boolean configuration that defaults to true, controlling whether events are recorded and sent. When recording is true there should be no change; when false: - Transactions will always be recorded as "unsampled", and will be silently discarded when ended, without affecting tracer statistics - Spans will all be dropped by virtue of transactions all being unsampled - Captured errors will not have details filled in, and will be silently discarded when "sent" without affecting tracer statistics - Breakdown metrics will not be updated - Metrics gathering will be disabled Recording can be updated via central config and by using the new Tracer.SetRecording method. We also introduce a new Tracer.Recording method which reports whether events are being recorded. If the tracer is inactive (Tracer.Active returns false), then Tracer.Recording will also return false. This new method can be used by instrumentation to avoid expensive instrumentation paths when recording is disabled. We have updated all provided instrumentation modules to use the new Tracer.Recording method instead of Tracer.Active. --- CHANGELOG.asciidoc | 2 + config.go | 17 +++++ config_test.go | 54 ++++++++++++++ docs/configuration.asciidoc | 14 ++++ env_test.go | 1 + error.go | 52 ++++++++------ error_test.go | 16 +++++ metrics_test.go | 51 +++++++++++++- module/apmecho/middleware.go | 2 +- module/apmechov4/middleware.go | 2 +- module/apmgin/middleware.go | 2 +- module/apmgrpc/server.go | 2 +- module/apmhttp/handler.go | 2 +- module/apmhttprouter/handler.go | 2 +- module/apmlogrus/hook.go | 2 +- module/apmrestful/filter.go | 2 +- module/apmzap/core.go | 4 +- module/apmzerolog/writer.go | 2 +- profiling.go | 4 ++ tracer.go | 120 +++++++++++++++++++++++--------- transaction.go | 40 +++++++---- transaction_test.go | 19 +++++ 22 files changed, 333 insertions(+), 79 deletions(-) diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index f6cd5aa4b..ddad2652f 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -23,6 +23,8 @@ endif::[] https://github.com/elastic/apm-agent-go/compare/v1.7.2...master[View commits] +- Add "recording" config option, to dynamically disable event recording {pull}737[(#737)] + [[release-notes-1.x]] === Go Agent version 1.x diff --git a/config.go b/config.go index 10f86e26b..9168312b1 100644 --- a/config.go +++ b/config.go @@ -46,6 +46,7 @@ const ( envEnvironment = "ELASTIC_APM_ENVIRONMENT" envSpanFramesMinDuration = "ELASTIC_APM_SPAN_FRAMES_MIN_DURATION" envActive = "ELASTIC_APM_ACTIVE" + envRecording = "ELASTIC_APM_RECORDING" envAPIRequestSize = "ELASTIC_APM_API_REQUEST_SIZE" envAPIRequestTime = "ELASTIC_APM_API_REQUEST_TIME" envAPIBufferSize = "ELASTIC_APM_API_BUFFER_SIZE" @@ -252,6 +253,10 @@ func initialActive() (bool, error) { return configutil.ParseBoolEnv(envActive, true) } +func initialRecording() (bool, error) { + return configutil.ParseBoolEnv(envRecording, true) +} + func initialDisabledMetrics() wildcard.Matchers { return configutil.ParseWildcardPatternsEnv(envDisableMetrics, nil) } @@ -341,6 +346,17 @@ func (t *Tracer) updateRemoteConfig(logger WarningLogger, old, attrs map[string] cfg.maxSpans = value }) } + case envRecording: + recording, err := strconv.ParseBool(v) + if err != nil { + errorf("central config failure: failed to parse %s: %s", k, err) + delete(attrs, k) + continue + } else { + updates = append(updates, func(cfg *instrumentationConfig) { + cfg.recording = recording + }) + } case envTransactionSampleRate: sampler, err := parseSampleRate(k, v) if err != nil { @@ -438,6 +454,7 @@ type instrumentationConfig struct { // set the initial entry in instrumentationConfig.local, in order to properly reset // to the local value, even if the default is the zero value. type instrumentationConfigValues struct { + recording bool captureBody CaptureBodyMode captureHeaders bool maxSpans int diff --git a/config_test.go b/config_test.go index 4c5efff7b..621fac09c 100644 --- a/config_test.go +++ b/config_test.go @@ -245,3 +245,57 @@ func TestTracerConfigWatcherPrecedence(t *testing.T) { } } } + +func TestTracerCentralConfigRecording(t *testing.T) { + tracer := apmtest.NewRecordingTracer() + defer tracer.Close() + + changes := make(chan apmconfig.Change) + watcherFunc := apmtest.WatchConfigFunc(func(ctx context.Context, params apmconfig.WatchParams) <-chan apmconfig.Change { + return changes + }) + tracer.SetLogger(apmtest.NewTestLogger(t)) + tracer.SetConfigWatcher(watcherFunc) + tracer.SetMetricsInterval(0) // disable periodic gathering + + checkRecording := func(expected bool) { + defer tracer.ResetPayloads() + tracer.StartTransaction("name", "type").End() + tracer.Flush(nil) + if expected { + require.True(t, tracer.Recording()) + tracer.SendMetrics(nil) + payloads := tracer.Payloads() + require.NotEmpty(t, payloads.Metrics) + require.NotEmpty(t, payloads.Transactions) + } else { + require.False(t, tracer.Recording()) + // testTracerMetricsNotRecording enables periodic + // gathering, checks that no gathering takes place + // (because we're expected not to be recording), + // and then disable periodic gathering again. + testTracerMetricsNotRecording(t, tracer) + payloads := tracer.Payloads() + require.Empty(t, payloads.Transactions) + } + } + updateRemoteConfig := func(attrs map[string]string) { + // We send twice as a means of waiting for the first change to be applied. + for i := 0; i < 2; i++ { + changes <- apmconfig.Change{Attrs: attrs} + } + } + + // Initially local config is in effect. + checkRecording(true) + + updateRemoteConfig(map[string]string{"recording": "false"}) + checkRecording(false) + + updateRemoteConfig(map[string]string{"recording": "true"}) + tracer.SetRecording(false) // not effective, remote config in effect + checkRecording(true) + + updateRemoteConfig(map[string]string{}) + checkRecording(false) // local config in effect now +} diff --git a/docs/configuration.asciidoc b/docs/configuration.asciidoc index e827afbf9..cca5f036d 100644 --- a/docs/configuration.asciidoc +++ b/docs/configuration.asciidoc @@ -212,6 +212,20 @@ You must use the query bar to filter for a specific environment in versions prio Enable or disable the agent. If set to false, then the Go agent does not send any data to the Elastic APM server, and instrumentation overhead is minimized. +[float] +[[config-recording]] +=== `ELASTIC_APM_RECORDING` + +[options="header"] +|============ +| Environment | Default | Example +| `ELASTIC_APM_RECORDING` | true | `false` +|============ + +Enable or disable recording of events. If set to false, then the Go agent does +send any events to the Elastic APM server, and instrumentation overhead is +minimized, but the agent will continue to poll the server for configuration changes. + [float] [[config-global-labels]] === `ELASTIC_APM_GLOBAL_LABELS` diff --git a/env_test.go b/env_test.go index a241f2b0d..63a6e78d1 100644 --- a/env_test.go +++ b/env_test.go @@ -334,6 +334,7 @@ func TestTracerActiveEnv(t *testing.T) { tracer, transport := transporttest.NewRecorderTracer() defer tracer.Close() assert.False(t, tracer.Active()) + assert.False(t, tracer.Recording()) // inactive => not recording tx := tracer.StartTransaction("name", "type") tx.End() diff --git a/error.go b/error.go index fcfd1b665..e9d284ccc 100644 --- a/error.go +++ b/error.go @@ -92,10 +92,12 @@ func (t *Tracer) NewError(err error) *Error { e := t.newError() e.cause = err e.err = err.Error() - rand.Read(e.ID[:]) // ignore error, can't do anything about it - initException(&e.exception, err, e.stackTraceLimit) - if len(e.exception.stacktrace) == 0 { - e.SetStacktrace(2) + if e.recording { + rand.Read(e.ID[:]) // ignore error, can't do anything about it + initException(&e.exception, err, e.stackTraceLimit) + if len(e.exception.stacktrace) == 0 { + e.SetStacktrace(2) + } } return e } @@ -108,20 +110,22 @@ func (t *Tracer) NewError(err error) *Error { // If r.Message is empty, "[EMPTY]" will be used. func (t *Tracer) NewErrorLog(r ErrorLogRecord) *Error { e := t.newError() - e.log = ErrorLogRecord{ - Message: truncateString(r.Message), - MessageFormat: truncateString(r.MessageFormat), - Level: truncateString(r.Level), - LoggerName: truncateString(r.LoggerName), - } - if e.log.Message == "" { - e.log.Message = "[EMPTY]" - } e.cause = r.Error e.err = e.log.Message - rand.Read(e.ID[:]) // ignore error, can't do anything about it - if r.Error != nil { - initException(&e.exception, r.Error, e.stackTraceLimit) + if e.recording { + e.log = ErrorLogRecord{ + Message: truncateString(r.Message), + MessageFormat: truncateString(r.MessageFormat), + Level: truncateString(r.Level), + LoggerName: truncateString(r.LoggerName), + } + if e.log.Message == "" { + e.log.Message = "[EMPTY]" + } + rand.Read(e.ID[:]) // ignore error, can't do anything about it + if r.Error != nil { + initException(&e.exception, r.Error, e.stackTraceLimit) + } } return e } @@ -137,11 +141,14 @@ func (t *Tracer) newError() *Error { }, } } - e.Timestamp = time.Now() instrumentationConfig := t.instrumentationConfig() - e.Context.captureHeaders = instrumentationConfig.captureHeaders - e.stackTraceLimit = instrumentationConfig.stackTraceLimit + e.recording = instrumentationConfig.recording + if e.recording { + e.Timestamp = time.Now() + e.Context.captureHeaders = instrumentationConfig.captureHeaders + e.stackTraceLimit = instrumentationConfig.stackTraceLimit + } return &Error{ErrorData: e} } @@ -166,6 +173,7 @@ type Error struct { // When the error is sent, its ErrorData field will be set to nil. type ErrorData struct { tracer *Tracer + recording bool stackTraceLimit int exception exceptionData log ErrorLogRecord @@ -306,7 +314,11 @@ func (e *Error) Send() { if e == nil || e.sent() { return } - e.ErrorData.enqueue() + if e.recording { + e.ErrorData.enqueue() + } else { + e.reset() + } e.ErrorData = nil } diff --git a/error_test.go b/error_test.go index 51a06503d..1d513fef2 100644 --- a/error_test.go +++ b/error_test.go @@ -244,6 +244,22 @@ func TestErrorNilError(t *testing.T) { assert.EqualError(t, e, "") } +func TestErrorNotRecording(t *testing.T) { + tracer := apmtest.NewRecordingTracer() + defer tracer.Close() + tracer.SetRecording(false) + + e := tracer.NewError(errors.New("boom")) + require.NotNil(t, e) + require.NotNil(t, e.ErrorData) + e.Send() + require.Nil(t, e.ErrorData) + tracer.Flush(nil) + + payloads := tracer.Payloads() + require.Empty(t, payloads.Errors) +} + func TestErrorTransactionSampled(t *testing.T) { _, _, errors := apmtest.WithTransaction(func(ctx context.Context) { apm.TransactionFromContext(ctx).Type = "foo" diff --git a/metrics_test.go b/metrics_test.go index abc38edac..c6ad28e77 100644 --- a/metrics_test.go +++ b/metrics_test.go @@ -35,6 +35,7 @@ import ( "github.com/stretchr/testify/require" "go.elastic.co/apm" + "go.elastic.co/apm/apmtest" "go.elastic.co/apm/model" "go.elastic.co/apm/transport/transporttest" ) @@ -281,7 +282,7 @@ func TestTracerMetricsBuffered(t *testing.T) { } } -func TestTracerMetricsDisable(t *testing.T) { +func TestTracerDisableMetrics(t *testing.T) { os.Setenv("ELASTIC_APM_DISABLE_METRICS", "golang.heap.*, system.memory.*, system.process.*") defer os.Unsetenv("ELASTIC_APM_DISABLE_METRICS") @@ -302,6 +303,54 @@ func TestTracerMetricsDisable(t *testing.T) { assert.EqualValues(t, expected, actual) } +func TestTracerMetricsNotRecording(t *testing.T) { + tracer := apmtest.NewRecordingTracer() + defer tracer.Close() + tracer.SetRecording(false) + testTracerMetricsNotRecording(t, tracer) +} + +func testTracerMetricsNotRecording(t *testing.T, tracer *apmtest.RecordingTracer) { + done := make(chan struct{}) + defer close(done) + + gathered := make(chan struct{}) + tracer.RegisterMetricsGatherer(apm.GatherMetricsFunc( + func(ctx context.Context, m *apm.Metrics) error { + select { + case <-ctx.Done(): + return ctx.Err() + case <-done: + case gathered <- struct{}{}: + } + return nil + }, + )) + + tracer.SetMetricsInterval(time.Millisecond) + defer tracer.SetMetricsInterval(0) // disable at end + + sent := make(chan struct{}) + go func() { + defer close(sent) + tracer.SendMetrics(nil) // unblocked by tracer.Close + }() + + // Because the tracer is configured to not record, + // the metrics gatherer should never be called. + select { + case <-time.After(100 * time.Millisecond): + case <-sent: + t.Fatal("expected SendMetrics to block") + case <-gathered: + t.Fatal("unexpected metrics gatherer call") + } + + tracer.Flush(nil) // empty queue, should not block + payloads := tracer.Payloads() + require.Empty(t, payloads.Metrics) +} + // busyWork does meaningless work for the specified duration, // so we can observe CPU usage. func busyWork(d time.Duration) int { diff --git a/module/apmecho/middleware.go b/module/apmecho/middleware.go index c68c99817..3a4eca13b 100644 --- a/module/apmecho/middleware.go +++ b/module/apmecho/middleware.go @@ -64,7 +64,7 @@ type middleware struct { func (m *middleware) handle(c echo.Context) error { req := c.Request() - if !m.tracer.Active() || m.requestIgnorer(req) { + if !m.tracer.Recording() || m.requestIgnorer(req) { return m.handler(c) } name := req.Method + " " + c.Path() diff --git a/module/apmechov4/middleware.go b/module/apmechov4/middleware.go index f184e2fd5..5ede3d346 100644 --- a/module/apmechov4/middleware.go +++ b/module/apmechov4/middleware.go @@ -66,7 +66,7 @@ type middleware struct { func (m *middleware) handle(c echo.Context) error { req := c.Request() - if !m.tracer.Active() || m.requestIgnorer(req) { + if !m.tracer.Recording() || m.requestIgnorer(req) { return m.handler(c) } name := req.Method + " " + c.Path() diff --git a/module/apmgin/middleware.go b/module/apmgin/middleware.go index 7cf8ff53e..9b847dbf4 100644 --- a/module/apmgin/middleware.go +++ b/module/apmgin/middleware.go @@ -69,7 +69,7 @@ type routeInfo struct { } func (m *middleware) handle(c *gin.Context) { - if !m.tracer.Active() || m.requestIgnorer(c.Request) { + if !m.tracer.Recording() || m.requestIgnorer(c.Request) { c.Next() return } diff --git a/module/apmgrpc/server.go b/module/apmgrpc/server.go index 755fd4a06..7fb288c2b 100644 --- a/module/apmgrpc/server.go +++ b/module/apmgrpc/server.go @@ -63,7 +63,7 @@ func NewUnaryServerInterceptor(o ...ServerOption) grpc.UnaryServerInterceptor { info *grpc.UnaryServerInfo, handler grpc.UnaryHandler, ) (resp interface{}, err error) { - if !opts.tracer.Active() || opts.requestIgnorer(info) { + if !opts.tracer.Recording() || opts.requestIgnorer(info) { return handler(ctx, req) } tx, ctx := startTransaction(ctx, opts.tracer, info.FullMethod) diff --git a/module/apmhttp/handler.go b/module/apmhttp/handler.go index 8aecf24ba..d786ee46f 100644 --- a/module/apmhttp/handler.go +++ b/module/apmhttp/handler.go @@ -67,7 +67,7 @@ type handler struct { // ServeHTTP delegates to h.Handler, tracing the transaction with // h.Tracer, or apm.DefaultTracer if h.Tracer is nil. func (h *handler) ServeHTTP(w http.ResponseWriter, req *http.Request) { - if !h.tracer.Active() || h.requestIgnorer(req) { + if !h.tracer.Recording() || h.requestIgnorer(req) { h.handler.ServeHTTP(w, req) return } diff --git a/module/apmhttprouter/handler.go b/module/apmhttprouter/handler.go index 19718ae60..f1b1c790d 100644 --- a/module/apmhttprouter/handler.go +++ b/module/apmhttprouter/handler.go @@ -38,7 +38,7 @@ import ( func Wrap(h httprouter.Handle, route string, o ...Option) httprouter.Handle { opts := gatherOptions(o...) return func(w http.ResponseWriter, req *http.Request, p httprouter.Params) { - if !opts.tracer.Active() || opts.requestIgnorer(req) { + if !opts.tracer.Recording() || opts.requestIgnorer(req) { h(w, req, p) return } diff --git a/module/apmlogrus/hook.go b/module/apmlogrus/hook.go index a2bb2912d..02db188ce 100644 --- a/module/apmlogrus/hook.go +++ b/module/apmlogrus/hook.go @@ -87,7 +87,7 @@ func (h *Hook) Levels() []logrus.Level { // Fire reports the log entry as an error to the APM Server. func (h *Hook) Fire(entry *logrus.Entry) error { tracer := h.tracer() - if !tracer.Active() { + if !tracer.Recording() { return nil } diff --git a/module/apmrestful/filter.go b/module/apmrestful/filter.go index c8a2a897c..ba09ea056 100644 --- a/module/apmrestful/filter.go +++ b/module/apmrestful/filter.go @@ -51,7 +51,7 @@ type filter struct { } func (f *filter) filter(req *restful.Request, resp *restful.Response, chain *restful.FilterChain) { - if !f.tracer.Active() || f.requestIgnorer(req.Request) { + if !f.tracer.Recording() || f.requestIgnorer(req.Request) { chain.ProcessFilter(req, resp) return } diff --git a/module/apmzap/core.go b/module/apmzap/core.go index 5d84d8ed1..60d5fb6f9 100644 --- a/module/apmzap/core.go +++ b/module/apmzap/core.go @@ -87,7 +87,7 @@ func (c *Core) With(fields []zapcore.Field) zapcore.Core { // Check checks if the entry should be logged, and adds c to checked if so. func (c *Core) Check(entry zapcore.Entry, checked *zapcore.CheckedEntry) *zapcore.CheckedEntry { - if entry.Level < zapcore.ErrorLevel || !c.tracer().Active() { + if entry.Level < zapcore.ErrorLevel || !c.tracer().Recording() { return checked } return checked.AddCore(entry, c) @@ -122,7 +122,7 @@ func (c *contextCore) With(fields []zapcore.Field) zapcore.Core { } func (c *contextCore) Check(entry zapcore.Entry, checked *zapcore.CheckedEntry) *zapcore.CheckedEntry { - if entry.Level < zapcore.ErrorLevel || !c.core.tracer().Active() { + if entry.Level < zapcore.ErrorLevel || !c.core.tracer().Recording() { return checked } return checked.AddCore(entry, c) diff --git a/module/apmzerolog/writer.go b/module/apmzerolog/writer.go index 964e03126..0c9b0879d 100644 --- a/module/apmzerolog/writer.go +++ b/module/apmzerolog/writer.go @@ -108,7 +108,7 @@ func (w *Writer) WriteLevel(level zerolog.Level, p []byte) (int, error) { return len(p), nil } tracer := w.tracer() - if !tracer.Active() { + if !tracer.Recording() { return len(p), nil } var logRecord logRecord diff --git a/profiling.go b/profiling.go index 7cc3fe9af..42f2bb946 100644 --- a/profiling.go +++ b/profiling.go @@ -136,6 +136,10 @@ func (state *profilingState) start(ctx context.Context, logger Logger, metadata if logger != nil && ctx.Err() == nil { logger.Errorf("failed to send %s profile: %s", state.profileType, err) } + return + } + if logger != nil { + logger.Debugf("sent %s profile", state.profileType) } }() } diff --git a/tracer.go b/tracer.go index 3170e2f22..867f13252 100644 --- a/tracer.go +++ b/tracer.go @@ -108,6 +108,7 @@ type TracerOptions struct { spanFramesMinDuration time.Duration stackTraceLimit int active bool + recording bool configWatcher apmconfig.Watcher breakdownMetrics bool propagateLegacyHeader bool @@ -192,6 +193,11 @@ func (opts *TracerOptions) initDefaults(continueOnError bool) error { active = true } + recording, err := initialRecording() + if failed(err) { + recording = true + } + centralConfigEnabled, err := initialCentralConfigEnabled() if failed(err) { centralConfigEnabled = true @@ -246,6 +252,7 @@ func (opts *TracerOptions) initDefaults(continueOnError bool) error { opts.spanFramesMinDuration = spanFramesMinDuration opts.stackTraceLimit = stackTraceLimit opts.active = active + opts.recording = recording opts.propagateLegacyHeader = propagateLegacyHeader if opts.Transport == nil { opts.Transport = transport.Default @@ -379,6 +386,9 @@ func newTracer(opts TracerOptions) *Tracer { t.breakdownMetrics.enabled = opts.breakdownMetrics // Initialise local transaction config. + t.setLocalInstrumentationConfig(envRecording, func(cfg *instrumentationConfigValues) { + cfg.recording = opts.recording + }) t.setLocalInstrumentationConfig(envCaptureBody, func(cfg *instrumentationConfigValues) { cfg.captureBody = opts.captureBody }) @@ -409,6 +419,7 @@ func newTracer(opts TracerOptions) *Tracer { go t.loop() t.configCommands <- func(cfg *tracerConfig) { + cfg.recording = opts.recording cfg.cpuProfileInterval = opts.cpuProfileInterval cfg.cpuProfileDuration = opts.cpuProfileDuration cfg.heapProfileInterval = opts.heapProfileInterval @@ -433,6 +444,7 @@ func newTracer(opts TracerOptions) *Tracer { // tracerConfig holds the tracer's runtime configuration, which may be modified // by sending a tracerConfigCommand to the tracer's configCommands channel. type tracerConfig struct { + recording bool requestSize int requestDuration time.Duration metricsInterval time.Duration @@ -476,6 +488,15 @@ func (t *Tracer) Flush(abort <-chan struct{}) { } } +// Recording reports whether the tracer is recording events. Instrumentation +// may use this to avoid creating transactions, spans, and metrics when the +// tracer is configured to not record. +// +// Recording will also return false if the tracer is inactive. +func (t *Tracer) Recording() bool { + return t.instrumentationConfig().recording && t.Active() +} + // Active reports whether the tracer is active. If the tracer is inactive, // no transactions or errors will be sent to the Elastic APM server. func (t *Tracer) Active() bool { @@ -596,6 +617,21 @@ func (t *Tracer) sendConfigCommand(cmd tracerConfigCommand) { } } +// SetRecording enables or disables recording of future events. +// +// SetRecording does not affect in-flight events. +func (t *Tracer) SetRecording(r bool) { + t.setLocalInstrumentationConfig(envRecording, func(cfg *instrumentationConfigValues) { + // Update instrumentation config to disable transactions and errors. + cfg.recording = r + }) + t.sendConfigCommand(func(cfg *tracerConfig) { + // Consult t.instrumentationConfig() as local config may not be in effect, + // or there may have been a concurrent change to instrumentation config. + cfg.recording = t.instrumentationConfig().recording + }) +} + // SetSampler sets the sampler the tracer. // // It is valid to pass nil, in which case all transactions will be sampled. @@ -763,6 +799,46 @@ func (t *Tracer) loop() { stats: &stats, } + handleTracerConfigCommand := func(cmd tracerConfigCommand) { + var oldMetricsInterval time.Duration + if cfg.recording { + oldMetricsInterval = cfg.metricsInterval + } + cmd(&cfg) + var metricsInterval, cpuProfileInterval, cpuProfileDuration, heapProfileInterval time.Duration + if cfg.recording { + metricsInterval = cfg.metricsInterval + cpuProfileInterval = cfg.cpuProfileInterval + cpuProfileDuration = cfg.cpuProfileDuration + heapProfileInterval = cfg.heapProfileInterval + } + + cpuProfilingState.updateConfig(cpuProfileInterval, cpuProfileDuration) + heapProfilingState.updateConfig(heapProfileInterval, 0) + if !gatheringMetrics && metricsInterval != oldMetricsInterval { + if metricsTimerStart.IsZero() { + if metricsInterval > 0 { + metricsTimer.Reset(metricsInterval) + metricsTimerStart = time.Now() + } + } else { + if metricsInterval <= 0 { + metricsTimerStart = time.Time{} + if !metricsTimer.Stop() { + <-metricsTimer.C + } + } else { + alreadyPassed := time.Since(metricsTimerStart) + if alreadyPassed >= metricsInterval { + metricsTimer.Reset(0) + } else { + metricsTimer.Reset(metricsInterval - alreadyPassed) + } + } + } + } + } + for { var gatherMetrics bool select { @@ -771,32 +847,7 @@ func (t *Tracer) loop() { iochanReader.CloseRead(io.EOF) return case cmd := <-t.configCommands: - oldMetricsInterval := cfg.metricsInterval - cmd(&cfg) - cpuProfilingState.updateConfig(cfg.cpuProfileInterval, cfg.cpuProfileDuration) - heapProfilingState.updateConfig(cfg.heapProfileInterval, 0) - if !gatheringMetrics && cfg.metricsInterval != oldMetricsInterval { - if metricsTimerStart.IsZero() { - if cfg.metricsInterval > 0 { - metricsTimer.Reset(cfg.metricsInterval) - metricsTimerStart = time.Now() - } - } else { - if cfg.metricsInterval <= 0 { - metricsTimerStart = time.Time{} - if !metricsTimer.Stop() { - <-metricsTimer.C - } - } else { - alreadyPassed := time.Since(metricsTimerStart) - if alreadyPassed >= cfg.metricsInterval { - metricsTimer.Reset(0) - } else { - metricsTimer.Reset(cfg.metricsInterval - alreadyPassed) - } - } - } - } + handleTracerConfigCommand(cmd) continue case cw := <-t.configWatcher: if configChanges != nil { @@ -833,6 +884,9 @@ func (t *Tracer) loop() { } else { t.updateRemoteConfig(cfg.logger, lastConfigChange, change.Attrs) lastConfigChange = change.Attrs + handleTracerConfigCommand(func(cfg *tracerConfig) { + cfg.recording = t.instrumentationConfig().recording + }) } continue case event := <-t.events: @@ -859,18 +913,20 @@ func (t *Tracer) loop() { metricsTimerStart = time.Time{} gatherMetrics = !gatheringMetrics case sentMetrics = <-t.forceSendMetrics: - if !metricsTimerStart.IsZero() { - if !metricsTimer.Stop() { - <-metricsTimer.C + if cfg.recording { + if !metricsTimerStart.IsZero() { + if !metricsTimer.Stop() { + <-metricsTimer.C + } + metricsTimerStart = time.Time{} } - metricsTimerStart = time.Time{} + gatherMetrics = !gatheringMetrics } - gatherMetrics = !gatheringMetrics case <-gatheredMetrics: modelWriter.writeMetrics(&metrics) gatheringMetrics = false flushRequest = true - if cfg.metricsInterval > 0 { + if cfg.recording && cfg.metricsInterval > 0 { metricsTimerStart = time.Now() metricsTimer.Reset(cfg.metricsInterval) } diff --git a/transaction.go b/transaction.go index 1e2ce56e9..5536919cd 100644 --- a/transaction.go +++ b/transaction.go @@ -53,8 +53,20 @@ func (t *Tracer) StartTransactionOptions(name, transactionType string, opts Tran } tx := &Transaction{tracer: t, TransactionData: td} - tx.Name = name - tx.Type = transactionType + // Take a snapshot of config that should apply to all spans within the + // transaction. + instrumentationConfig := t.instrumentationConfig() + tx.recording = instrumentationConfig.recording + if !tx.recording { + return tx + } + + tx.maxSpans = instrumentationConfig.maxSpans + tx.spanFramesMinDuration = instrumentationConfig.spanFramesMinDuration + tx.stackTraceLimit = instrumentationConfig.stackTraceLimit + tx.Context.captureHeaders = instrumentationConfig.captureHeaders + tx.propagateLegacyHeader = instrumentationConfig.propagateLegacyHeader + tx.breakdownMetricsEnabled = t.breakdownMetrics.enabled var root bool if opts.TraceContext.Trace.Validate() == nil { @@ -84,16 +96,6 @@ func (t *Tracer) StartTransactionOptions(name, transactionType string, opts Tran } } - // Take a snapshot of config that should apply to all spans within the - // transaction. - instrumentationConfig := t.instrumentationConfig() - tx.maxSpans = instrumentationConfig.maxSpans - tx.spanFramesMinDuration = instrumentationConfig.spanFramesMinDuration - tx.stackTraceLimit = instrumentationConfig.stackTraceLimit - tx.Context.captureHeaders = instrumentationConfig.captureHeaders - tx.breakdownMetricsEnabled = t.breakdownMetrics.enabled - tx.propagateLegacyHeader = instrumentationConfig.propagateLegacyHeader - if root { sampler := instrumentationConfig.sampler if sampler == nil || sampler.Sample(tx.traceContext) { @@ -108,6 +110,9 @@ func (t *Tracer) StartTransactionOptions(name, transactionType string, opts Tran // applications may end up being sampled at a very high rate. tx.traceContext.Options = opts.TraceContext.Options } + + tx.Name = name + tx.Type = transactionType tx.timestamp = opts.Start if tx.timestamp.IsZero() { tx.timestamp = time.Now() @@ -234,10 +239,14 @@ func (tx *Transaction) End() { if tx.ended() { return } - if tx.Duration < 0 { - tx.Duration = time.Since(tx.timestamp) + if tx.recording { + if tx.Duration < 0 { + tx.Duration = time.Since(tx.timestamp) + } + tx.enqueue() + } else { + tx.reset(tx.tracer) } - tx.enqueue() tx.TransactionData = nil } @@ -291,6 +300,7 @@ type TransactionData struct { // Result holds the transaction result. Result string + recording bool maxSpans int spanFramesMinDuration time.Duration stackTraceLimit int diff --git a/transaction_test.go b/transaction_test.go index 935cd82b4..4692ff0d3 100644 --- a/transaction_test.go +++ b/transaction_test.go @@ -156,6 +156,25 @@ func TestTransactionContextNotSampled(t *testing.T) { assert.Nil(t, payloads.Transactions[0].Context) } +func TestTransactionNotRecording(t *testing.T) { + tracer := apmtest.NewRecordingTracer() + defer tracer.Close() + tracer.SetRecording(false) + tracer.SetSampler(samplerFunc(func(apm.TraceContext) bool { + panic("should not be called") + })) + + tx := tracer.StartTransaction("name", "type") + require.NotNil(t, tx) + require.NotNil(t, tx.TransactionData) + tx.End() + require.Nil(t, tx.TransactionData) + tracer.Flush(nil) + + payloads := tracer.Payloads() + require.Empty(t, payloads.Transactions) +} + func BenchmarkTransaction(b *testing.B) { tracer, err := apm.NewTracer("service", "") require.NoError(b, err)