Skip to content
This repository has been archived by the owner on Jul 31, 2023. It is now read-only.

trace: add buffer limit #824

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
49 changes: 48 additions & 1 deletion trace/config.go
Expand Up @@ -16,25 +16,72 @@ package trace

import "go.opencensus.io/trace/internal"

// Default limits for the number of attributes, message events and links on each span
// in order to prevent unbounded memory increase for long-running spans.
// These defaults can be overriden with trace.ApplyConfig.
// These defaults can also be overriden per-span by using trace.StartOptions
// when creating a new span.
// TODO: Add an annnoation limit when priorities are implemented.
const (
DefaultMaxAttributes = 32
DefaultMaxMessageEvents = 128
DefaultMaxLinks = 32
)

// Config represents the global tracing configuration.
type Config struct {
// DefaultSampler is the default sampler used when creating new spans.
DefaultSampler Sampler

// IDGenerator is for internal use only.
IDGenerator internal.IDGenerator

// The below config options must be set with a GlobalOption.
// maxAttributes sets a global limit on the number of attributes.
maxAttributes int
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure these really belong in trace.Config. The simpler thing to do would be to just have them as package-vars. The reason to have them in trace.Config would be to be able to change them at arbitrary times at runtime in a goroutine-safe manner. But I can't really think of a compelling reason you'd want to do that. @rakyll interested to know what do you think of this?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Discussed with @rakyll offline. We think the best approach is to remove the GlobalOption type and instead rely on 0 to mean "don't change" (just like it does for the other fields of Config. If the user wants to not buffer anything (alway drop) they can always set the field to -1 explicitly.

// maxMessageEvents sets a global limit on the number of message events.
maxMessageEvents int
// maxLinks sets a global limit on the number of links.
maxLinks int
}

// GlobalOption apply changes to global tracing configuration.
type GlobalOption func(*Config)

// WithDefaultMaxAttributes sets the default limit on the number of attributes.
func WithDefaultMaxAttributes(limit int) GlobalOption {
return func(c *Config) {
c.maxAttributes = limit
}
}

// WithDefaultMaxMessageEvents sets the default limit on the number of message events.
func WithDefaultMaxMessageEvents(limit int) GlobalOption {
return func(c *Config) {
c.maxMessageEvents = limit
}
}

// WithDefaultMaxLinks sets the default limit on the number of links.
func WithDefaultMaxLinks(limit int) GlobalOption {
return func(c *Config) {
c.maxLinks = limit
}
}

// ApplyConfig applies changes to the global tracing configuration.
//
// Fields not provided in the given config are going to be preserved.
func ApplyConfig(cfg Config) {
func ApplyConfig(cfg Config, o ...GlobalOption) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is where it gets tricky:

With the current trace.ApplyConfig implementation:

  1. cfg is a required parameter and is expected to be a partial update (preserving what was already set).
  2. We want limits set to <= 0 to mean "disabled".
  3. Go's zero value for an int is 0, so if someone changes one limit (or even just the DefaultSampler), the others would incorrectly become disabled.

To handle this, I added in a new variadic GlobalOption (which doesn't break the API), but I think the cfg arg should be deprecated in favor of all variadic options. I'm sure a lot of clients currently use this, though.

Another option: we could keep the cfg struct arg, add *int fields in for the limits, and add a trace.Limit function that takes an int and returns an *int, but I don't like that API as much.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could keep the type int and make the fields public and just say that if you set 0 it has no effect. Since setting to -1 has exactly the same effect as setting to 0 I don't think we lose anything by doing this.

c := *config.Load().(*Config)
if cfg.DefaultSampler != nil {
c.DefaultSampler = cfg.DefaultSampler
}
if cfg.IDGenerator != nil {
c.IDGenerator = cfg.IDGenerator
}
for _, op := range o {
op(&c)
}
config.Store(&c)
}
9 changes: 9 additions & 0 deletions trace/export.go
Expand Up @@ -73,4 +73,13 @@ type SpanData struct {
Status
Links []Link
HasRemoteParent bool

// TODO: Record these drops with stats.Record / stats.Int64.

// DroppedAttributes contains the number of dropped attributes in this span.
DroppedAttributes int
// DroppedMessageEvents contains the number of dropped message events in this span.
DroppedMessageEvents int
// DroppedLinks contains the number of dropped links in this span.
DroppedLinks int
}
123 changes: 97 additions & 26 deletions trace/trace.go
Expand Up @@ -46,6 +46,12 @@ type Span struct {
endOnce sync.Once

executionTracerTaskEnd func() // ends the execution tracer span

// The maximum limits for internal span data.
// These are set with trace.ApplyConfig and can be overriden by trace.StartOptions.
maxAttributes int
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We also need to consider the existing uses of trace.StartOptions struct in oc{http,grpc}.{Client,Server}Handler.

One thing we could do is instead of making these StartOptions, why not just expose them as public fields and allow the users to change them if desired?

I'm also happy to defer the work to add per-Span overrides to a future PR if you'd prefer. I think having the global limits only would be a good start.

maxMessageEvents int
maxLinks int
}

// IsRecordingEvents returns true if events are being recorded for this span.
Expand Down Expand Up @@ -125,6 +131,16 @@ type StartOptions struct {
// SpanKind represents the kind of a span. If none is set,
// SpanKindUnspecified is used.
SpanKind int

// MaxAttributes sets a span limit on the number of attributes (overrides
// global trace config).
MaxAttributes int
// WithMaxMessageEvents sets a span limit on the number of message events
// (overrides global trace config).
MaxMessageEvents int
// WithMaxLinks sets a span limit on the number of links (overrides
// global trace config).
MaxLinks int
}

// StartOption apply changes to StartOptions.
Expand All @@ -145,20 +161,37 @@ func WithSampler(sampler Sampler) StartOption {
}
}

// WithMaxAttributes sets a span limit on the number of attributes (overrides global trace config).
func WithMaxAttributes(max int) StartOption {
return func(o *StartOptions) {
o.MaxAttributes = max
}
}

// WithMaxMessageEvents sets a span limit on the number of message events (overrides global trace config).
func WithMaxMessageEvents(max int) StartOption {
return func(o *StartOptions) {
o.MaxMessageEvents = max
}
}

// WithMaxLinks sets a span limit on the number of links (overrides global trace config).
func WithMaxLinks(max int) StartOption {
return func(o *StartOptions) {
o.MaxLinks = max
}
}

// StartSpan starts a new child span of the current span in the context. If
// there is no span in the context, creates a new trace and span.
//
// Returned context contains the newly created span. You can use it to
// propagate the returned span in process.
func StartSpan(ctx context.Context, name string, o ...StartOption) (context.Context, *Span) {
var opts StartOptions
func StartSpan(ctx context.Context, name string, opts ...StartOption) (context.Context, *Span) {
var parent SpanContext
if p := FromContext(ctx); p != nil {
parent = p.spanContext
}
for _, op := range o {
op(&opts)
}
span := startSpanInternal(name, parent != SpanContext{}, parent, false, opts)

ctx, end := startExecutionTracerTask(ctx, name)
Expand All @@ -173,23 +206,34 @@ func StartSpan(ctx context.Context, name string, o ...StartOption) (context.Cont
//
// Returned context contains the newly created span. You can use it to
// propagate the returned span in process.
func StartSpanWithRemoteParent(ctx context.Context, name string, parent SpanContext, o ...StartOption) (context.Context, *Span) {
var opts StartOptions
for _, op := range o {
op(&opts)
}
func StartSpanWithRemoteParent(ctx context.Context, name string, parent SpanContext, opts ...StartOption) (context.Context, *Span) {
span := startSpanInternal(name, parent != SpanContext{}, parent, true, opts)
ctx, end := startExecutionTracerTask(ctx, name)
span.executionTracerTaskEnd = end
return NewContext(ctx, span), span
}

func startSpanInternal(name string, hasParent bool, parent SpanContext, remoteParent bool, o StartOptions) *Span {
span := &Span{}
span.spanContext = parent

func startSpanInternal(name string, hasParent bool, parent SpanContext, remoteParent bool, opts []StartOption) *Span {
cfg := config.Load().(*Config)

// Load the global config default limits, which can be modified with trace.ApplyConfig.
o := StartOptions{
MaxAttributes: cfg.maxAttributes,
MaxMessageEvents: cfg.maxMessageEvents,
MaxLinks: cfg.maxLinks,
}
// Now overlay any StartOption overrides (e.g. per-span limits).
for _, op := range opts {
op(&o)
}

span := &Span{
spanContext: parent,
maxAttributes: o.MaxAttributes,
maxLinks: o.MaxLinks,
maxMessageEvents: o.MaxMessageEvents,
}

if !hasParent {
span.spanContext.TraceID = cfg.IDGenerator.NewTraceID()
}
Expand Down Expand Up @@ -313,32 +357,44 @@ func (s *Span) SetStatus(status Status) {
//
// Existing attributes whose keys appear in the attributes parameter are overwritten.
func (s *Span) AddAttributes(attributes ...Attribute) {
if !s.IsRecordingEvents() {
if !s.IsRecordingEvents() || s.maxAttributes <= 0 {
return
}
s.mu.Lock()
if s.data.Attributes == nil {
s.data.Attributes = make(map[string]interface{})
}
copyAttributes(s.data.Attributes, attributes)
if drops := copyAttributes(s.data.Attributes, attributes, s.maxAttributes); drops > 0 {
s.data.DroppedAttributes += drops
}
s.mu.Unlock()
}

// copyAttributes copies a slice of Attributes into a map.
func copyAttributes(m map[string]interface{}, attributes []Attribute) {
// It returns how many were dropped (after hitting the given max limit).
func copyAttributes(m map[string]interface{}, attributes []Attribute, max int) int {
var drops int
for _, a := range attributes {
if len(m) >= max {
if _, ok := m[a.key]; !ok {
// If the attribute map hit max capacity, only allow existing key updates.
drops++
continue
}
}
m[a.key] = a.value
}
return drops
}

func (s *Span) lazyPrintfInternal(attributes []Attribute, format string, a ...interface{}) {
now := time.Now()
msg := fmt.Sprintf(format, a...)
var m map[string]interface{}
s.mu.Lock()
if len(attributes) != 0 {
if len(attributes) != 0 && s.maxAttributes > 0 {
m = make(map[string]interface{})
copyAttributes(m, attributes)
copyAttributes(m, attributes, s.maxAttributes)
}
s.data.Annotations = append(s.data.Annotations, Annotation{
Time: now,
Expand All @@ -352,9 +408,9 @@ func (s *Span) printStringInternal(attributes []Attribute, str string) {
now := time.Now()
var a map[string]interface{}
s.mu.Lock()
if len(attributes) != 0 {
if len(attributes) != 0 && s.maxAttributes > 0 {
a = make(map[string]interface{})
copyAttributes(a, attributes)
copyAttributes(a, attributes, s.maxAttributes)
}
s.data.Annotations = append(s.data.Annotations, Annotation{
Time: now,
Expand Down Expand Up @@ -388,11 +444,15 @@ func (s *Span) Annotatef(attributes []Attribute, format string, a ...interface{}
// event (this allows to identify a message between the sender and receiver).
// For example, this could be a sequence id.
func (s *Span) AddMessageSendEvent(messageID, uncompressedByteSize, compressedByteSize int64) {
if !s.IsRecordingEvents() {
if !s.IsRecordingEvents() || s.maxMessageEvents <= 0 {
return
}
now := time.Now()
s.mu.Lock()
if l := len(s.data.MessageEvents); l > 0 && l >= s.maxMessageEvents {
s.data.MessageEvents = s.data.MessageEvents[1:]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ideally once we hit the max number of attributes, we would not allocate any more when you add attributes. I believe as written, we will allocate even when we have reached the maximum.

How about copying the message events to the beginning of the slice:

copy(s.data.MessageEvents[0:l-1], s.data.MessageEvents[1:])
s.data.MessageEvents = s.data.MessageEvents[0:l-1]

Then the subsequent append should never allocate once we reach the max. Would be really awesome to run some benchmarks before and after this change. Not sure any of the existing ones would work since they don't add lots of attributes/events.

s.data.DroppedMessageEvents++
}
s.data.MessageEvents = append(s.data.MessageEvents, MessageEvent{
Time: now,
EventType: MessageEventTypeSent,
Expand All @@ -410,11 +470,15 @@ func (s *Span) AddMessageSendEvent(messageID, uncompressedByteSize, compressedBy
// event (this allows to identify a message between the sender and receiver).
// For example, this could be a sequence id.
func (s *Span) AddMessageReceiveEvent(messageID, uncompressedByteSize, compressedByteSize int64) {
if !s.IsRecordingEvents() {
if !s.IsRecordingEvents() || s.maxMessageEvents <= 0 {
return
}
now := time.Now()
s.mu.Lock()
if l := len(s.data.MessageEvents); l > 0 && l >= s.maxMessageEvents {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ditto

s.data.MessageEvents = s.data.MessageEvents[1:]
s.data.DroppedMessageEvents++
}
s.data.MessageEvents = append(s.data.MessageEvents, MessageEvent{
Time: now,
EventType: MessageEventTypeRecv,
Expand All @@ -427,10 +491,14 @@ func (s *Span) AddMessageReceiveEvent(messageID, uncompressedByteSize, compresse

// AddLink adds a link to the span.
func (s *Span) AddLink(l Link) {
if !s.IsRecordingEvents() {
if !s.IsRecordingEvents() || s.maxLinks <= 0 {
return
}
s.mu.Lock()
if l := len(s.data.Links); l > 0 && l >= s.maxLinks {
s.data.Links = s.data.Links[1:]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ditto

s.data.DroppedLinks++
}
s.data.Links = append(s.data.Links, l)
s.mu.Unlock()
}
Expand Down Expand Up @@ -463,8 +531,11 @@ func init() {
gen.spanIDInc |= 1

config.Store(&Config{
DefaultSampler: ProbabilitySampler(defaultSamplingProbability),
IDGenerator: gen,
DefaultSampler: ProbabilitySampler(defaultSamplingProbability),
IDGenerator: gen,
maxAttributes: DefaultMaxAttributes,
maxMessageEvents: DefaultMaxMessageEvents,
maxLinks: DefaultMaxLinks,
})
}

Expand Down