Skip to content

Commit

Permalink
More central configuration enablement
Browse files Browse the repository at this point in the history
Enable central configuration for:
 - stack_frames_min_duration
 - stack_trace_limit
  • Loading branch information
axw committed Apr 1, 2020
1 parent 45f723b commit f25d46b
Show file tree
Hide file tree
Showing 3 changed files with 83 additions and 99 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Expand Up @@ -24,6 +24,7 @@ endif::[]
https://github.com/elastic/apm-agent-go/compare/v1.7.2...master[View commits]
- Add "recording" config option, to dynamically disable event recording {pull}737[(#737)]
- Enable central configuration of "stack_frames_min_duration" and "stack_trace_limit" {pull}742[(#742)]
[[release-notes-1.x]]
=== Go Agent version 1.x
Expand Down
22 changes: 22 additions & 0 deletions config.go
Expand Up @@ -357,6 +357,28 @@ func (t *Tracer) updateRemoteConfig(logger WarningLogger, old, attrs map[string]
cfg.recording = recording
})
}
case envSpanFramesMinDuration:
duration, err := configutil.ParseDuration(v)
if err != nil {
errorf("central config failure: failed to parse %s: %s", k, err)
delete(attrs, k)
continue
} else {
updates = append(updates, func(cfg *instrumentationConfig) {
cfg.spanFramesMinDuration = duration
})
}
case envStackTraceLimit:
limit, err := strconv.Atoi(v)
if err != nil {
errorf("central config failure: failed to parse %s: %s", k, err)
delete(attrs, k)
continue
} else {
updates = append(updates, func(cfg *instrumentationConfig) {
cfg.stackTraceLimit = limit
})
}
case envTransactionSampleRate:
sampler, err := parseSampleRate(k, v)
if err != nil {
Expand Down
159 changes: 60 additions & 99 deletions config_test.go
Expand Up @@ -24,59 +24,80 @@ import (
"net/http/httptest"
"net/url"
"os"
"strconv"
"strings"
"testing"
"time"

"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"go.elastic.co/apm"
"go.elastic.co/apm/apmconfig"
"go.elastic.co/apm/apmtest"
"go.elastic.co/apm/transport"
"go.elastic.co/apm/transport/transporttest"
)

func TestTracerCentralConfigUpdate(t *testing.T) {
run := func(configKey, configValue string, isRemote func(*apm.Tracer) bool) {
run := func(configKey, configValue string, isRemote func(*apmtest.RecordingTracer) bool) {
t.Run(configKey, func(t *testing.T) {
response, _ := json.Marshal(map[string]string{configKey: configValue})
testTracerCentralConfigUpdate(t, string(response), isRemote)
})
}
run("transaction_sample_rate", "0", func(tracer *apm.Tracer) bool {
run("transaction_sample_rate", "0", func(tracer *apmtest.RecordingTracer) bool {
return !tracer.StartTransaction("name", "type").Sampled()
})
run("transaction_max_spans", "0", func(tracer *apm.Tracer) bool {
run("transaction_max_spans", "0", func(tracer *apmtest.RecordingTracer) bool {
return tracer.StartTransaction("name", "type").StartSpan("name", "type", nil).Dropped()
})
run("capture_body", "all", func(tracer *apm.Tracer) bool {
run("capture_body", "all", func(tracer *apmtest.RecordingTracer) bool {
req, _ := http.NewRequest("POST", "/", strings.NewReader("..."))
capturer := tracer.CaptureHTTPRequestBody(req)
return capturer != nil
})
run("recording", "false", func(tracer *apmtest.RecordingTracer) bool {
return !tracer.Recording()
})
run("span_frames_min_duration", "1ms", func(tracer *apmtest.RecordingTracer) bool {
tracer.ResetPayloads()

tx := tracer.StartTransaction("name", "type")
span := tx.StartSpan("name", "type", nil)
span.Duration = 1 * time.Millisecond
span.End()
tx.End()

tracer.Flush(nil)
payloads := tracer.Payloads()
assert.Len(t, payloads.Spans, 1)
return len(payloads.Spans[0].Stacktrace) > 0
})
run("stack_trace_limit", "1", func(tracer *apmtest.RecordingTracer) bool {
tracer.ResetPayloads()
tracer.NewError(errors.New("boom")).Send()
tracer.Flush(nil)
payloads := tracer.Payloads()
assert.Len(t, payloads.Errors, 1)
return len(payloads.Errors[0].Exception.Stacktrace) == 1
})
}

func testTracerCentralConfigUpdate(t *testing.T, serverResponse string, isRemote func(*apm.Tracer) bool) {
// This test server will respond initially with config that
// disables sampling, and subsequently responses will indicate
// lack of agent config, causing the agent to revert to local
// config.
responded := make(chan struct{})
var responses int
func testTracerCentralConfigUpdate(t *testing.T, serverResponse string, isRemote func(*apmtest.RecordingTracer) bool) {
type response struct {
etag string
body string
}
responses := make(chan response)
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
assert.Equal(t, "/config/v1/agents", req.URL.Path)
w.Header().Set("Cache-Control", "max-age=1")
if responses == 0 {
w.Header().Set("Etag", `"foo"`)
w.Write([]byte(serverResponse))
} else {
w.Header().Set("Etag", `"bar"`)
w.Write([]byte(`{}`))
}
responses++
select {
case responded <- struct{}{}:
case response := <-responses:
w.Header().Set("Etag", strconv.Quote(response.etag)) // `"foo"`)
w.Write([]byte(response.body)) //[]byte(serverResponse))
case <-req.Context().Done():
}
}))
Expand All @@ -87,9 +108,17 @@ func testTracerCentralConfigUpdate(t *testing.T, serverResponse string, isRemote
require.NoError(t, err)
httpTransport.SetServerURL(serverURL)

tracer, err := apm.NewTracerOptions(apm.TracerOptions{Transport: httpTransport})
tracer := &apmtest.RecordingTracer{}
var testTransport struct {
apmconfig.Watcher
*transporttest.RecorderTransport
}
testTransport.Watcher = httpTransport
testTransport.RecorderTransport = &tracer.RecorderTransport

tracer.Tracer, err = apm.NewTracerOptions(apm.TracerOptions{Transport: &testTransport})
require.NoError(t, err)
defer tracer.Close()
defer tracer.Tracer.Close()

// This test can be run in parallel with others after creating the tracer,
// but not before, because we depend on NewTracerOptions picking up default
Expand All @@ -100,40 +129,26 @@ func testTracerCentralConfigUpdate(t *testing.T, serverResponse string, isRemote
assert.False(t, isRemote(tracer))

timeout := time.After(10 * time.Second)
for {
// There's a time window between the server responding
// and the agent updating the config, so we spin until
// it's updated.
remote := isRemote(tracer)
if !remote {
break
}

// We each response payload twice, which causes us to block until
// the first one is fully consumed.
for i := 0; i < 2; i++ {
select {
case <-time.After(10 * time.Millisecond):
case responses <- response{etag: "foo", body: serverResponse}:
case <-timeout:
t.Fatal("timed out waiting for config update")
}
}
// We wait for 2 responses so that we know we've unblocked the
// 2nd response, and that the 2nd response has been fully consumed.
assert.True(t, isRemote(tracer))

for i := 0; i < 2; i++ {
select {
case <-responded:
case responses <- response{etag: "bar", body: "{}"}:
case <-timeout:
t.Fatal("timed out waiting for config update")
}
}
for {
remote := isRemote(tracer)
if !remote {
break
}
select {
case <-time.After(10 * time.Millisecond):
case <-timeout:
t.Fatal("timed out waiting for config to revert")
}
}
assert.False(t, isRemote(tracer))
}

func TestTracerCentralConfigUpdateDisabled(t *testing.T) {
Expand Down Expand Up @@ -245,57 +260,3 @@ func TestTracerConfigWatcherPrecedence(t *testing.T) {
}
}
}

func TestTracerCentralConfigRecording(t *testing.T) {
tracer := apmtest.NewRecordingTracer()
defer tracer.Close()

changes := make(chan apmconfig.Change)
watcherFunc := apmtest.WatchConfigFunc(func(ctx context.Context, params apmconfig.WatchParams) <-chan apmconfig.Change {
return changes
})
tracer.SetLogger(apmtest.NewTestLogger(t))
tracer.SetConfigWatcher(watcherFunc)
tracer.SetMetricsInterval(0) // disable periodic gathering

checkRecording := func(expected bool) {
defer tracer.ResetPayloads()
tracer.StartTransaction("name", "type").End()
tracer.Flush(nil)
if expected {
require.True(t, tracer.Recording())
tracer.SendMetrics(nil)
payloads := tracer.Payloads()
require.NotEmpty(t, payloads.Metrics)
require.NotEmpty(t, payloads.Transactions)
} else {
require.False(t, tracer.Recording())
// testTracerMetricsNotRecording enables periodic
// gathering, checks that no gathering takes place
// (because we're expected not to be recording),
// and then disable periodic gathering again.
testTracerMetricsNotRecording(t, tracer)
payloads := tracer.Payloads()
require.Empty(t, payloads.Transactions)
}
}
updateRemoteConfig := func(attrs map[string]string) {
// We send twice as a means of waiting for the first change to be applied.
for i := 0; i < 2; i++ {
changes <- apmconfig.Change{Attrs: attrs}
}
}

// Initially local config is in effect.
checkRecording(true)

updateRemoteConfig(map[string]string{"recording": "false"})
checkRecording(false)

updateRemoteConfig(map[string]string{"recording": "true"})
tracer.SetRecording(false) // not effective, remote config in effect
checkRecording(true)

updateRemoteConfig(map[string]string{})
checkRecording(false) // local config in effect now
}

0 comments on commit f25d46b

Please sign in to comment.