Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce "recording" config #737

Merged
merged 1 commit into from Apr 1, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.asciidoc
Expand Up @@ -23,6 +23,8 @@ endif::[]

https://github.com/elastic/apm-agent-go/compare/v1.7.2...master[View commits]

- Add "recording" config option, to dynamically disable event recording {pull}737[(#737)]

[[release-notes-1.x]]
=== Go Agent version 1.x

Expand Down
17 changes: 17 additions & 0 deletions config.go
Expand Up @@ -46,6 +46,7 @@ const (
envEnvironment = "ELASTIC_APM_ENVIRONMENT"
envSpanFramesMinDuration = "ELASTIC_APM_SPAN_FRAMES_MIN_DURATION"
envActive = "ELASTIC_APM_ACTIVE"
envRecording = "ELASTIC_APM_RECORDING"
envAPIRequestSize = "ELASTIC_APM_API_REQUEST_SIZE"
envAPIRequestTime = "ELASTIC_APM_API_REQUEST_TIME"
envAPIBufferSize = "ELASTIC_APM_API_BUFFER_SIZE"
Expand Down Expand Up @@ -252,6 +253,10 @@ func initialActive() (bool, error) {
return configutil.ParseBoolEnv(envActive, true)
}

func initialRecording() (bool, error) {
return configutil.ParseBoolEnv(envRecording, true)
}

func initialDisabledMetrics() wildcard.Matchers {
return configutil.ParseWildcardPatternsEnv(envDisableMetrics, nil)
}
Expand Down Expand Up @@ -341,6 +346,17 @@ func (t *Tracer) updateRemoteConfig(logger WarningLogger, old, attrs map[string]
cfg.maxSpans = value
})
}
case envRecording:
recording, err := strconv.ParseBool(v)
if err != nil {
errorf("central config failure: failed to parse %s: %s", k, err)
delete(attrs, k)
continue
} else {
updates = append(updates, func(cfg *instrumentationConfig) {
cfg.recording = recording
})
}
case envTransactionSampleRate:
sampler, err := parseSampleRate(k, v)
if err != nil {
Expand Down Expand Up @@ -438,6 +454,7 @@ type instrumentationConfig struct {
// set the initial entry in instrumentationConfig.local, in order to properly reset
// to the local value, even if the default is the zero value.
type instrumentationConfigValues struct {
recording bool
captureBody CaptureBodyMode
captureHeaders bool
maxSpans int
Expand Down
54 changes: 54 additions & 0 deletions config_test.go
Expand Up @@ -245,3 +245,57 @@ func TestTracerConfigWatcherPrecedence(t *testing.T) {
}
}
}

func TestTracerCentralConfigRecording(t *testing.T) {
tracer := apmtest.NewRecordingTracer()
defer tracer.Close()

changes := make(chan apmconfig.Change)
watcherFunc := apmtest.WatchConfigFunc(func(ctx context.Context, params apmconfig.WatchParams) <-chan apmconfig.Change {
return changes
})
tracer.SetLogger(apmtest.NewTestLogger(t))
tracer.SetConfigWatcher(watcherFunc)
tracer.SetMetricsInterval(0) // disable periodic gathering

checkRecording := func(expected bool) {
defer tracer.ResetPayloads()
tracer.StartTransaction("name", "type").End()
tracer.Flush(nil)
if expected {
require.True(t, tracer.Recording())
tracer.SendMetrics(nil)
payloads := tracer.Payloads()
require.NotEmpty(t, payloads.Metrics)
require.NotEmpty(t, payloads.Transactions)
} else {
require.False(t, tracer.Recording())
// testTracerMetricsNotRecording enables periodic
// gathering, checks that no gathering takes place
// (because we're expected not to be recording),
// and then disable periodic gathering again.
testTracerMetricsNotRecording(t, tracer)
payloads := tracer.Payloads()
require.Empty(t, payloads.Transactions)
}
}
updateRemoteConfig := func(attrs map[string]string) {
// We send twice as a means of waiting for the first change to be applied.
for i := 0; i < 2; i++ {
changes <- apmconfig.Change{Attrs: attrs}
}
}

// Initially local config is in effect.
checkRecording(true)

updateRemoteConfig(map[string]string{"recording": "false"})
checkRecording(false)

updateRemoteConfig(map[string]string{"recording": "true"})
tracer.SetRecording(false) // not effective, remote config in effect
checkRecording(true)

updateRemoteConfig(map[string]string{})
checkRecording(false) // local config in effect now
}
14 changes: 14 additions & 0 deletions docs/configuration.asciidoc
Expand Up @@ -212,6 +212,20 @@ You must use the query bar to filter for a specific environment in versions prio
Enable or disable the agent. If set to false, then the Go agent does not send
any data to the Elastic APM server, and instrumentation overhead is minimized.

[float]
[[config-recording]]
=== `ELASTIC_APM_RECORDING`

[options="header"]
|============
| Environment | Default | Example
| `ELASTIC_APM_RECORDING` | true | `false`
|============

Enable or disable recording of events. If set to false, then the Go agent does
axw marked this conversation as resolved.
Show resolved Hide resolved
send any events to the Elastic APM server, and instrumentation overhead is
minimized, but the agent will continue to poll the server for configuration changes.

[float]
[[config-global-labels]]
=== `ELASTIC_APM_GLOBAL_LABELS`
Expand Down
1 change: 1 addition & 0 deletions env_test.go
Expand Up @@ -334,6 +334,7 @@ func TestTracerActiveEnv(t *testing.T) {
tracer, transport := transporttest.NewRecorderTracer()
defer tracer.Close()
assert.False(t, tracer.Active())
assert.False(t, tracer.Recording()) // inactive => not recording

tx := tracer.StartTransaction("name", "type")
tx.End()
Expand Down
52 changes: 32 additions & 20 deletions error.go
Expand Up @@ -92,10 +92,12 @@ func (t *Tracer) NewError(err error) *Error {
e := t.newError()
e.cause = err
e.err = err.Error()
rand.Read(e.ID[:]) // ignore error, can't do anything about it
initException(&e.exception, err, e.stackTraceLimit)
if len(e.exception.stacktrace) == 0 {
e.SetStacktrace(2)
if e.recording {
rand.Read(e.ID[:]) // ignore error, can't do anything about it
initException(&e.exception, err, e.stackTraceLimit)
if len(e.exception.stacktrace) == 0 {
e.SetStacktrace(2)
}
}
return e
}
Expand All @@ -108,20 +110,22 @@ func (t *Tracer) NewError(err error) *Error {
// If r.Message is empty, "[EMPTY]" will be used.
func (t *Tracer) NewErrorLog(r ErrorLogRecord) *Error {
e := t.newError()
e.log = ErrorLogRecord{
Message: truncateString(r.Message),
MessageFormat: truncateString(r.MessageFormat),
Level: truncateString(r.Level),
LoggerName: truncateString(r.LoggerName),
}
if e.log.Message == "" {
e.log.Message = "[EMPTY]"
}
e.cause = r.Error
e.err = e.log.Message
rand.Read(e.ID[:]) // ignore error, can't do anything about it
if r.Error != nil {
initException(&e.exception, r.Error, e.stackTraceLimit)
if e.recording {
e.log = ErrorLogRecord{
Message: truncateString(r.Message),
MessageFormat: truncateString(r.MessageFormat),
Level: truncateString(r.Level),
LoggerName: truncateString(r.LoggerName),
}
if e.log.Message == "" {
e.log.Message = "[EMPTY]"
}
rand.Read(e.ID[:]) // ignore error, can't do anything about it
if r.Error != nil {
initException(&e.exception, r.Error, e.stackTraceLimit)
}
}
return e
}
Expand All @@ -137,11 +141,14 @@ func (t *Tracer) newError() *Error {
},
}
}
e.Timestamp = time.Now()

instrumentationConfig := t.instrumentationConfig()
e.Context.captureHeaders = instrumentationConfig.captureHeaders
e.stackTraceLimit = instrumentationConfig.stackTraceLimit
e.recording = instrumentationConfig.recording
if e.recording {
e.Timestamp = time.Now()
e.Context.captureHeaders = instrumentationConfig.captureHeaders
e.stackTraceLimit = instrumentationConfig.stackTraceLimit
}

return &Error{ErrorData: e}
}
Expand All @@ -166,6 +173,7 @@ type Error struct {
// When the error is sent, its ErrorData field will be set to nil.
type ErrorData struct {
tracer *Tracer
recording bool
stackTraceLimit int
exception exceptionData
log ErrorLogRecord
Expand Down Expand Up @@ -306,7 +314,11 @@ func (e *Error) Send() {
if e == nil || e.sent() {
return
}
e.ErrorData.enqueue()
if e.recording {
e.ErrorData.enqueue()
} else {
e.reset()
}
e.ErrorData = nil
}

Expand Down
16 changes: 16 additions & 0 deletions error_test.go
Expand Up @@ -244,6 +244,22 @@ func TestErrorNilError(t *testing.T) {
assert.EqualError(t, e, "")
}

func TestErrorNotRecording(t *testing.T) {
tracer := apmtest.NewRecordingTracer()
defer tracer.Close()
tracer.SetRecording(false)

e := tracer.NewError(errors.New("boom"))
require.NotNil(t, e)
require.NotNil(t, e.ErrorData)
e.Send()
require.Nil(t, e.ErrorData)
tracer.Flush(nil)

payloads := tracer.Payloads()
require.Empty(t, payloads.Errors)
}

func TestErrorTransactionSampled(t *testing.T) {
_, _, errors := apmtest.WithTransaction(func(ctx context.Context) {
apm.TransactionFromContext(ctx).Type = "foo"
Expand Down
51 changes: 50 additions & 1 deletion metrics_test.go
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/stretchr/testify/require"

"go.elastic.co/apm"
"go.elastic.co/apm/apmtest"
"go.elastic.co/apm/model"
"go.elastic.co/apm/transport/transporttest"
)
Expand Down Expand Up @@ -281,7 +282,7 @@ func TestTracerMetricsBuffered(t *testing.T) {
}
}

func TestTracerMetricsDisable(t *testing.T) {
func TestTracerDisableMetrics(t *testing.T) {
os.Setenv("ELASTIC_APM_DISABLE_METRICS", "golang.heap.*, system.memory.*, system.process.*")
defer os.Unsetenv("ELASTIC_APM_DISABLE_METRICS")

Expand All @@ -302,6 +303,54 @@ func TestTracerMetricsDisable(t *testing.T) {
assert.EqualValues(t, expected, actual)
}

func TestTracerMetricsNotRecording(t *testing.T) {
tracer := apmtest.NewRecordingTracer()
defer tracer.Close()
tracer.SetRecording(false)
testTracerMetricsNotRecording(t, tracer)
}

func testTracerMetricsNotRecording(t *testing.T, tracer *apmtest.RecordingTracer) {
done := make(chan struct{})
defer close(done)

gathered := make(chan struct{})
tracer.RegisterMetricsGatherer(apm.GatherMetricsFunc(
func(ctx context.Context, m *apm.Metrics) error {
select {
case <-ctx.Done():
return ctx.Err()
case <-done:
case gathered <- struct{}{}:
}
return nil
},
))

tracer.SetMetricsInterval(time.Millisecond)
defer tracer.SetMetricsInterval(0) // disable at end

sent := make(chan struct{})
go func() {
defer close(sent)
tracer.SendMetrics(nil) // unblocked by tracer.Close
}()

// Because the tracer is configured to not record,
// the metrics gatherer should never be called.
select {
case <-time.After(100 * time.Millisecond):
case <-sent:
t.Fatal("expected SendMetrics to block")
case <-gathered:
t.Fatal("unexpected metrics gatherer call")
}

tracer.Flush(nil) // empty queue, should not block
payloads := tracer.Payloads()
require.Empty(t, payloads.Metrics)
}

// busyWork does meaningless work for the specified duration,
// so we can observe CPU usage.
func busyWork(d time.Duration) int {
Expand Down
2 changes: 1 addition & 1 deletion module/apmecho/middleware.go
Expand Up @@ -64,7 +64,7 @@ type middleware struct {

func (m *middleware) handle(c echo.Context) error {
req := c.Request()
if !m.tracer.Active() || m.requestIgnorer(req) {
if !m.tracer.Recording() || m.requestIgnorer(req) {
return m.handler(c)
}
name := req.Method + " " + c.Path()
Expand Down
2 changes: 1 addition & 1 deletion module/apmechov4/middleware.go
Expand Up @@ -66,7 +66,7 @@ type middleware struct {

func (m *middleware) handle(c echo.Context) error {
req := c.Request()
if !m.tracer.Active() || m.requestIgnorer(req) {
if !m.tracer.Recording() || m.requestIgnorer(req) {
return m.handler(c)
}
name := req.Method + " " + c.Path()
Expand Down
2 changes: 1 addition & 1 deletion module/apmgin/middleware.go
Expand Up @@ -69,7 +69,7 @@ type routeInfo struct {
}

func (m *middleware) handle(c *gin.Context) {
if !m.tracer.Active() || m.requestIgnorer(c.Request) {
if !m.tracer.Recording() || m.requestIgnorer(c.Request) {
c.Next()
return
}
Expand Down
2 changes: 1 addition & 1 deletion module/apmgrpc/server.go
Expand Up @@ -63,7 +63,7 @@ func NewUnaryServerInterceptor(o ...ServerOption) grpc.UnaryServerInterceptor {
info *grpc.UnaryServerInfo,
handler grpc.UnaryHandler,
) (resp interface{}, err error) {
if !opts.tracer.Active() || opts.requestIgnorer(info) {
if !opts.tracer.Recording() || opts.requestIgnorer(info) {
return handler(ctx, req)
}
tx, ctx := startTransaction(ctx, opts.tracer, info.FullMethod)
Expand Down
2 changes: 1 addition & 1 deletion module/apmhttp/handler.go
Expand Up @@ -67,7 +67,7 @@ type handler struct {
// ServeHTTP delegates to h.Handler, tracing the transaction with
// h.Tracer, or apm.DefaultTracer if h.Tracer is nil.
func (h *handler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
if !h.tracer.Active() || h.requestIgnorer(req) {
if !h.tracer.Recording() || h.requestIgnorer(req) {
h.handler.ServeHTTP(w, req)
return
}
Expand Down
2 changes: 1 addition & 1 deletion module/apmhttprouter/handler.go
Expand Up @@ -38,7 +38,7 @@ import (
func Wrap(h httprouter.Handle, route string, o ...Option) httprouter.Handle {
opts := gatherOptions(o...)
return func(w http.ResponseWriter, req *http.Request, p httprouter.Params) {
if !opts.tracer.Active() || opts.requestIgnorer(req) {
if !opts.tracer.Recording() || opts.requestIgnorer(req) {
h(w, req, p)
return
}
Expand Down
2 changes: 1 addition & 1 deletion module/apmlogrus/hook.go
Expand Up @@ -87,7 +87,7 @@ func (h *Hook) Levels() []logrus.Level {
// Fire reports the log entry as an error to the APM Server.
func (h *Hook) Fire(entry *logrus.Entry) error {
tracer := h.tracer()
if !tracer.Active() {
if !tracer.Recording() {
return nil
}

Expand Down
2 changes: 1 addition & 1 deletion module/apmrestful/filter.go
Expand Up @@ -51,7 +51,7 @@ type filter struct {
}

func (f *filter) filter(req *restful.Request, resp *restful.Response, chain *restful.FilterChain) {
if !f.tracer.Active() || f.requestIgnorer(req.Request) {
if !f.tracer.Recording() || f.requestIgnorer(req.Request) {
chain.ProcessFilter(req, resp)
return
}
Expand Down