Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Record {transaction,sample}.sample_rate #804

Merged
merged 2 commits into from Sep 4, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 2 additions & 0 deletions config.go
Expand Up @@ -388,6 +388,7 @@ func (t *Tracer) updateRemoteConfig(logger WarningLogger, old, attrs map[string]
} else {
updates = append(updates, func(cfg *instrumentationConfig) {
cfg.sampler = sampler
cfg.extendedSampler, _ = sampler.(ExtendedSampler)
})
}
default:
Expand Down Expand Up @@ -479,6 +480,7 @@ type instrumentationConfigValues struct {
recording bool
captureBody CaptureBodyMode
captureHeaders bool
extendedSampler ExtendedSampler
maxSpans int
sampler Sampler
spanFramesMinDuration time.Duration
Expand Down
8 changes: 8 additions & 0 deletions model/marshal_fastjson.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 8 additions & 0 deletions model/model.go
Expand Up @@ -205,6 +205,10 @@ type Transaction struct {
// it to true.
Sampled *bool `json:"sampled,omitempty"`

// SampleRate holds the sample rate in effect when the trace was started,
// if known. This is used by the server to aggregate transaction metrics.
SampleRate *float64 `json:"sample_rate,omitempty"`

// SpanCount holds statistics on spans within a transaction.
SpanCount SpanCount `json:"span_count"`
}
Expand Down Expand Up @@ -254,6 +258,10 @@ type Span struct {
// ParentID holds the ID of the span's parent (span or transaction).
ParentID SpanID `json:"parent_id,omitempty"`

// SampleRate holds the sample rate in effect when the trace was started,
// if known. This is used by the server to aggregate span metrics.
SampleRate *float64 `json:"sample_rate,omitempty"`

// Context holds contextual information relating to the span.
Context *SpanContext `json:"context,omitempty"`

Expand Down
6 changes: 6 additions & 0 deletions modelwriter.go
Expand Up @@ -109,6 +109,9 @@ func (w *modelWriter) buildModelTransaction(out *model.Transaction, tx *Transact
if !sampled {
out.Sampled = &notSampled
}
if tx.traceContext.State.haveSampleRate {
out.SampleRate = &tx.traceContext.State.sampleRate
}

out.ParentID = model.SpanID(td.parentSpan)
out.Name = truncateString(td.Name)
Expand Down Expand Up @@ -137,6 +140,9 @@ func (w *modelWriter) buildModelSpan(out *model.Span, span *Span, sd *SpanData)
out.ID = model.SpanID(span.traceContext.Span)
out.TraceID = model.TraceID(span.traceContext.Trace)
out.TransactionID = model.SpanID(span.transactionID)
if span.traceContext.State.haveSampleRate {
out.SampleRate = &span.traceContext.State.sampleRate
}

out.ParentID = model.SpanID(sd.parentID)
out.Name = truncateString(sd.Name)
Expand Down
2 changes: 1 addition & 1 deletion module/apmgrpc/client_test.go
Expand Up @@ -93,7 +93,7 @@ func testClientSpan(t *testing.T, traceparentHeaders ...string) {
}
assert.Equal(t, clientSpans[0].TraceID, serverTransactions[1].TraceID)
assert.Equal(t, clientSpans[0].ID, serverTransactions[1].ParentID)
assert.Equal(t, "server_span", serverSpans[0].Name) // no tracestate
assert.Equal(t, "es=s:1", serverSpans[0].Name) // automatically created tracestate
assert.Equal(t, "vendor=tracestate", serverSpans[1].Name)

traceparentValue := apmhttp.FormatTraceparentHeader(apm.TraceContext{
Expand Down
3 changes: 2 additions & 1 deletion module/apmot/harness_test.go
Expand Up @@ -115,5 +115,6 @@ func (harnessAPIProbe) SameSpanContext(span opentracing.Span, sc opentracing.Spa
if !ok {
return false
}
return ctx1.traceContext == ctx2.traceContext
return ctx1.traceContext.Trace == ctx2.traceContext.Trace &&
ctx1.traceContext.Span == ctx2.traceContext.Span
}
50 changes: 46 additions & 4 deletions sampler.go
Expand Up @@ -35,6 +35,37 @@ type Sampler interface {
Sample(TraceContext) bool
}

// ExtendedSampler may be implemented by Samplers, providing
// a method for sampling and returning an extended SampleResult.
//
// Wherever a Sampler is installed on the tracer, it is type-asserted
// to ExtendedSampler; when the assertion succeeds, SampleExtended is
// used for root transactions in preference to Sampler.Sample.
//
// TODO(axw) in v2.0.0, replace the Sampler interface with this.
type ExtendedSampler interface {
// SampleExtended indicates whether or not a transaction
// should be sampled, and the sampling rate in effect at
// the time. This method will be invoked by calls to
// Tracer.StartTransaction for the root of a trace, so it
// must be goroutine-safe, and should avoid synchronization
// as far as possible.
SampleExtended(SampleParams) SampleResult
}

// SampleParams holds parameters for SampleExtended.
//
// It is a struct rather than a bare TraceContext argument; presumably
// this is so further inputs can be added later without changing the
// ExtendedSampler method signature (NOTE(review): confirm intent).
type SampleParams struct {
// TraceContext holds the newly-generated TraceContext
// for the root transaction which is being sampled.
TraceContext TraceContext
}

// SampleResult holds information about a sampling decision.
type SampleResult struct {
// Sampled holds the sampling decision. When true, the tracer
// marks the root transaction's trace options as recorded.
Sampled bool

// SampleRate holds the sample rate in effect at the
// time of the sampling decision.
//
// For root transactions the tracer replaces this with 0 when
// Sampled is false, rounds it to three decimal places, and
// records it in the Elastic ("es") tracestate entry.
SampleRate float64
}

// NewRatioSampler returns a new Sampler with the given ratio
//
// A ratio of 1.0 samples 100% of transactions, a ratio of 0.5
Expand All @@ -51,16 +82,27 @@ func NewRatioSampler(r float64) Sampler {
x.SetUint64(math.MaxUint64)
x.Mul(&x, big.NewFloat(r))
ceil, _ := x.Uint64()
return ratioSampler{ceil}
return ratioSampler{r, ceil}
}

// ratioSampler is the Sampler returned by NewRatioSampler.
//
// The field order matters: NewRatioSampler builds the value with an
// unkeyed literal, ratioSampler{r, ceil}.
type ratioSampler struct {
	// ratio is the configured sampling ratio, reported back to
	// callers as SampleResult.SampleRate.
	ratio float64

	// ceil is the ratio scaled to the uint64 range. A trace is
	// sampled when the big-endian value v of its span ID satisfies
	// v > 0 && v-1 < ceil.
	ceil uint64
}

// Sample samples the transaction according to the configured
// ratio and pseudo-random source.
//
// It reports only the boolean decision, and delegates to SampleExtended
// so that the sampling predicate is defined in exactly one place.
func (s ratioSampler) Sample(c TraceContext) bool {
	return s.SampleExtended(SampleParams{TraceContext: c}).Sampled
}

// SampleExtended makes the sampling decision for the trace context in
// args, and reports the configured ratio as the sample rate regardless
// of the outcome.
//
// The first eight bytes of the span ID, read as a big-endian integer,
// act as the pseudo-random source: zero is never sampled, and any other
// value v is sampled when v-1 is below the sampler's ceiling.
func (s ratioSampler) SampleExtended(args SampleParams) SampleResult {
	spanBits := binary.BigEndian.Uint64(args.TraceContext.Span[:])
	if spanBits == 0 || spanBits-1 >= s.ceil {
		return SampleResult{Sampled: false, SampleRate: s.ratio}
	}
	return SampleResult{Sampled: true, SampleRate: s.ratio}
}
20 changes: 20 additions & 0 deletions sampler_test.go
Expand Up @@ -86,3 +86,23 @@ func TestRatioSamplerNever(t *testing.T) {
Span: apm.SpanID{255, 255, 255, 255, 255, 255, 255, 255},
}))
}

func TestRatioSamplerExtended(t *testing.T) {
s := apm.NewRatioSampler(0.5).(apm.ExtendedSampler)

result := s.SampleExtended(apm.SampleParams{
TraceContext: apm.TraceContext{Span: apm.SpanID{255, 0, 0, 0, 0, 0, 0, 0}},
})
assert.Equal(t, apm.SampleResult{
Sampled: false,
SampleRate: 0.5,
axw marked this conversation as resolved.
Show resolved Hide resolved
}, result)

result = s.SampleExtended(apm.SampleParams{
TraceContext: apm.TraceContext{Span: apm.SpanID{1, 0, 0, 0, 0, 0, 0, 0}},
})
assert.Equal(t, apm.SampleResult{
Sampled: true,
SampleRate: 0.5,
}, result)
}
22 changes: 22 additions & 0 deletions span_test.go
Expand Up @@ -146,3 +146,25 @@ func TestTracerStartSpanIDSpecified(t *testing.T) {
require.Len(t, spans, 1)
assert.Equal(t, model.SpanID(spanID), spans[0].ID)
}

// TestSpanSampleRate checks that the sample rate configured on the
// tracer is rounded to three decimal places and reported on the
// transaction and on every span started within it.
func TestSpanSampleRate(t *testing.T) {
	tracer := apmtest.NewRecordingTracer()
	defer tracer.Close()
	tracer.SetSampler(apm.NewRatioSampler(0.5555))

	tx := tracer.StartTransactionOptions("name", "type", apm.TransactionOptions{
		// Use a known transaction ID for deterministic sampling.
		TransactionID: apm.SpanID{1, 2, 3, 4, 5, 6, 7, 8},
	})
	s1 := tx.StartSpan("name", "type", nil)
	s2 := tx.StartSpan("name", "type", s1)
	s2.End()
	s1.End()
	tx.End()
	tracer.Flush(nil)

	payloads := tracer.Payloads()

	// Check lengths and pointers first, so that a regression shows up
	// as a test failure rather than an index or nil-dereference panic.
	require.Len(t, payloads.Transactions, 1)
	require.Len(t, payloads.Spans, 2)
	require.NotNil(t, payloads.Transactions[0].SampleRate)
	assert.Equal(t, 0.556, *payloads.Transactions[0].SampleRate)
	for _, span := range payloads.Spans {
		require.NotNil(t, span.SampleRate)
		assert.Equal(t, 0.556, *span.SampleRate)
	}
}
78 changes: 76 additions & 2 deletions tracecontext.go
Expand Up @@ -22,11 +22,17 @@ import (
"encoding/hex"
"fmt"
"regexp"
"strconv"
"strings"
"unicode"

"github.com/pkg/errors"
)

const (
elasticTracestateVendorKey = "es"
)

var (
errZeroTraceID = errors.New("zero trace-id is invalid")
errZeroSpanID = errors.New("zero span-id is invalid")
Expand Down Expand Up @@ -152,6 +158,13 @@ func (o TraceOptions) WithRecorded(recorded bool) TraceOptions {
// TraceState holds vendor-specific state for a trace.
//
// In addition to the raw entries, it caches the outcome of parsing the
// Elastic ("es") entry, so that validation and model building do not
// need to parse it again.
type TraceState struct {
head *TraceStateEntry

// Fields related to parsing the Elastic ("es") tracestate entry.
//
// These must not be modified after NewTraceState returns.

// parseElasticTracestateError holds the error returned by
// parseElasticTracestate, if any; Validate reports it for the
// "es" entry.
parseElasticTracestateError error
// haveSampleRate is set when the "es" entry carried an "s" key
// with a value in range; sampleRate is only meaningful when it
// is true.
haveSampleRate bool
// sampleRate holds the parsed value of the "s" key.
sampleRate float64
}

// NewTraceState returns a TraceState based on entries.
Expand All @@ -167,9 +180,55 @@ func NewTraceState(entries ...TraceStateEntry) TraceState {
}
last = &e
}
for _, e := range entries {
if e.Key != elasticTracestateVendorKey {
continue
}
out.parseElasticTracestateError = out.parseElasticTracestate(e)
break
}
return out
}

// parseElasticTracestate parses an Elastic ("es") tracestate entry,
// recording any recognised attributes on s.
//
// Per https://github.com/elastic/apm/blob/master/specs/agents/tracing-distributed-tracing.md,
// the "es" tracestate value format is: "key:value;key:value...". Unknown keys are ignored.
//
// The only key currently understood is "s", the sample rate, which must
// be a number in the range [0, 1]. An error is returned if the entry
// fails general tracestate validation, if any "key:value" pair lacks
// the ':' separator, or if the sample rate is not a number in range.
func (s *TraceState) parseElasticTracestate(e TraceStateEntry) error {
	if err := e.Validate(); err != nil {
		return err
	}
	value := e.Value
	for value != "" {
		// Split off the next ';'-delimited pair; the final pair
		// has no trailing delimiter.
		kv := value
		if end := strings.IndexByte(value, ';'); end >= 0 {
			kv, value = value[:end], value[end+1:]
		} else {
			value = ""
		}
		sep := strings.IndexByte(kv, ':')
		if sep == -1 {
			return errors.New("malformed 'es' tracestate entry")
		}
		k, v := kv[:sep], kv[sep+1:]
		switch k {
		case "s":
			sampleRate, err := strconv.ParseFloat(v, 64)
			if err != nil {
				return err
			}
			// strconv.ParseFloat accepts "NaN", and NaN compares
			// false with everything. Testing for "not in range"
			// (rather than "< 0 || > 1") ensures that NaN is
			// rejected along with out-of-range values.
			if !(sampleRate >= 0 && sampleRate <= 1) {
				return fmt.Errorf("sample rate %q out of range", v)
			}
			s.sampleRate = sampleRate
			s.haveSampleRate = true
		}
	}
	return nil
}

// String returns s as a comma-separated list of key-value pairs.
func (s TraceState) String() string {
if s.head == nil {
Expand Down Expand Up @@ -199,8 +258,16 @@ func (s TraceState) Validate() error {
if i == 32 {
return errors.New("tracestate contains more than the maximum allowed number of entries, 32")
}
if err := e.Validate(); err != nil {
return errors.Wrapf(err, "invalid tracestate entry at position %d", i)
if e.Key == elasticTracestateVendorKey {
// s.parseElasticTracestateError holds a general e.Validate error if any
// occurred, or any other error specific to the Elastic tracestate format.
if err := s.parseElasticTracestateError; err != nil {
return errors.Wrapf(err, "invalid tracestate entry at position %d", i)
}
} else {
if err := e.Validate(); err != nil {
return errors.Wrapf(err, "invalid tracestate entry at position %d", i)
}
}
if prev, ok := recorded[e.Key]; ok {
return fmt.Errorf("duplicate tracestate key %q at positions %d and %d", e.Key, prev, i)
Expand Down Expand Up @@ -261,3 +328,10 @@ func (e *TraceStateEntry) validateValue() error {
}
return nil
}

// formatElasticTracestateValue renders sampleRate as the value of an
// Elastic ("es") tracestate entry, i.e. "s:<rate>".
//
// The rate is formatted with at most three significant digits and no
// trailing zeros, for example:
//
//	0      -> "s:0"
//	1      -> "s:1"
//	0.5555 -> "s:0.555"
//
// Callers wanting a particular rounding should apply it beforehand.
func formatElasticTracestateValue(sampleRate float64) string {
	const format = "s:%.3g"
	formatted := fmt.Sprintf(format, sampleRate)
	return formatted
}
11 changes: 11 additions & 0 deletions tracecontext_test.go
Expand Up @@ -105,3 +105,14 @@ func TestTraceStateInvalidValueCharacter(t *testing.T) {
`invalid tracestate entry at position 0: invalid value for key "oy": value contains invalid character '\x00'`)
}
}

// TestTraceStateInvalidElasticEntry checks that Validate reports errors
// specific to the Elastic ("es") tracestate entry: a pair without a
// separator, a sample rate that is not a number, and a sample rate
// outside the permitted range.
func TestTraceStateInvalidElasticEntry(t *testing.T) {
	cases := []struct {
		value   string
		wantErr string
	}{{
		value:   "foo",
		wantErr: `invalid tracestate entry at position 0: malformed 'es' tracestate entry`,
	}, {
		value:   "s:foo",
		wantErr: `invalid tracestate entry at position 0: strconv.ParseFloat: parsing "foo": invalid syntax`,
	}, {
		value:   "s:1.5",
		wantErr: `invalid tracestate entry at position 0: sample rate "1.5" out of range`,
	}}
	for _, tc := range cases {
		state := apm.NewTraceState(apm.TraceStateEntry{Key: "es", Value: tc.value})
		assert.EqualError(t, state.Validate(), tc.wantErr)
	}
}
2 changes: 2 additions & 0 deletions tracer.go
Expand Up @@ -412,6 +412,7 @@ func newTracer(opts TracerOptions) *Tracer {
})
t.setLocalInstrumentationConfig(envTransactionSampleRate, func(cfg *instrumentationConfigValues) {
cfg.sampler = opts.sampler
cfg.extendedSampler, _ = opts.sampler.(ExtendedSampler)
})
t.setLocalInstrumentationConfig(envSpanFramesMinDuration, func(cfg *instrumentationConfigValues) {
cfg.spanFramesMinDuration = opts.spanFramesMinDuration
Expand Down Expand Up @@ -664,6 +665,7 @@ func (t *Tracer) SetRecording(r bool) {
func (t *Tracer) SetSampler(s Sampler) {
	// Resolve the optional ExtendedSampler implementation up front. The
	// two-value assertion leaves extended nil when s is nil or does not
	// implement the interface.
	extended, _ := s.(ExtendedSampler)
	apply := func(cfg *instrumentationConfigValues) {
		cfg.sampler = s
		cfg.extendedSampler = extended
	}
	t.setLocalInstrumentationConfig(envTransactionSampleRate, apply)
}

Expand Down
26 changes: 24 additions & 2 deletions transaction.go
Expand Up @@ -97,8 +97,30 @@ func (t *Tracer) StartTransactionOptions(name, transactionType string, opts Tran
}

if root {
sampler := instrumentationConfig.sampler
if sampler == nil || sampler.Sample(tx.traceContext) {
var result SampleResult
if instrumentationConfig.extendedSampler != nil {
result = instrumentationConfig.extendedSampler.SampleExtended(SampleParams{
TraceContext: tx.traceContext,
})
if !result.Sampled {
// Special case: for unsampled transactions we
// report a sample rate of 0, so that we do not
// count them in aggregations in the server.
// This is necessary to avoid overcounting, as
// we will scale the sampled transactions.
result.SampleRate = 0
}
sampleRate := round(1000*result.SampleRate) / 1000
tx.traceContext.State = NewTraceState(TraceStateEntry{
Key: elasticTracestateVendorKey,
Value: formatElasticTracestateValue(sampleRate),
})
} else if instrumentationConfig.sampler != nil {
result.Sampled = instrumentationConfig.sampler.Sample(tx.traceContext)
} else {
result.Sampled = true
}
if result.Sampled {
o := tx.traceContext.Options.WithRecorded(true)
tx.traceContext.Options = o
}
Expand Down