From be4c1aafdc87298ef31e599282185f798b4453cd Mon Sep 17 00:00:00 2001 From: irfan sharif Date: Tue, 20 Apr 2021 15:36:26 -0400 Subject: [PATCH] tracing: reduce memory pressure throughout MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This commit attempts to reduce the memory overhead of tracing by doing a few things, guided mostly by BenchmarkTracing and kv95 (see results below). In decreasing order of impact: - We no longer materialize recordings when the trace is "empty" or non-verbose. When traces straddle RPC boundaries, we serialize the recording to send it over the wire. For "empty" traces (ones with no structured recordings or log messages), this is needlessly allocation-heavy. We end up shipping the trace skeleton (parent-child span relationships, and the operation names + metadata that identify each span). Not only does this allocate within pkg/util/tracing, it also incurs allocations within gRPC where all this serialization is taking place. This commit takes the (opinionated) stance that we can avoid materializing these recordings altogether. We do this by having each span hold onto a reference to the root span, and updating an atomic value on the root span whenever anything is recorded. When materializing the recording, from any span in the tree, we can consult the root span to see if the trace was non-empty, and allocate if so. - We pre-allocate a small list of child pointer slots for spans to refer to, instead of allocating every time a child span is created. - Span tags aren't rendered unless the span is verbose, but we were still collecting them previously in order to render them if verbosity was toggled. Toggling an active span's verbosity rarely ever happens, so now we don't allocate for tags unless the span is already verbose. This also lets us avoid the WithTags option, which allocates even if the span will end up dropping tags. 
- We were previously allocating SpanMeta, despite the objects being extremely short-lived in practice (they existed primarily to pass on the parent span's metadata to the newly created child spans). We now pass SpanMetas by value. - We can make use of explicit carriers when extracting SpanMeta from remote spans, instead of accessing data through the Carrier interface. This helps reduce SpanMeta allocations, at the cost of duplicating some code. - metadata.MD, used to carry span metadata across gRPC, is also relatively short-lived (existing only for the duration of the RPC). Its API is also relatively allocation-heavy (improved with https://github.com/grpc/grpc-go/pull/4360), allocating for every key being written. Tracing has a very specific usage pattern (adding to the metadata.MD only the trace and span ID), so we can pool our allocations here. - We can slightly reduce lock contention around the tracing registry by locking only when we're dealing with root spans. We can also compute the span duration outside the critical section, rather than while holding the span's lock. --- Before this PR, comparing traced scans vs. 
not: name old time/op new time/op delta Tracing/Cockroach/Scan1-24 403µs ± 3% 415µs ± 1% +2.82% (p=0.000 n=10+9) Tracing/MultinodeCockroach/Scan1-24 878µs ± 1% 975µs ± 6% +11.07% (p=0.000 n=10+10) name old alloc/op new alloc/op delta Tracing/Cockroach/Scan1-24 23.2kB ± 2% 29.8kB ± 2% +28.64% (p=0.000 n=10+10) Tracing/MultinodeCockroach/Scan1-24 76.5kB ± 5% 95.1kB ± 1% +24.31% (p=0.000 n=10+10) name old allocs/op new allocs/op delta Tracing/Cockroach/Scan1-24 235 ± 2% 258 ± 1% +9.50% (p=0.000 n=10+9) Tracing/MultinodeCockroach/Scan1-24 760 ± 1% 891 ± 1% +17.20% (p=0.000 n=9+10) After this PR: name old time/op new time/op delta Tracing/Cockroach/Scan1-24 437µs ± 4% 443µs ± 3% ~ (p=0.315 n=10+10) Tracing/MultinodeCockroach/Scan1-24 925µs ± 2% 968µs ± 1% +4.62% (p=0.000 n=10+10) name old alloc/op new alloc/op delta Tracing/Cockroach/Scan1-24 23.3kB ± 3% 26.3kB ± 2% +13.24% (p=0.000 n=10+10) Tracing/MultinodeCockroach/Scan1-24 77.0kB ± 4% 81.7kB ± 3% +6.08% (p=0.000 n=10+10) name old allocs/op new allocs/op delta Tracing/Cockroach/Scan1-24 236 ± 1% 241 ± 1% +2.45% (p=0.000 n=9+9) Tracing/MultinodeCockroach/Scan1-24 758 ± 1% 793 ± 2% +4.67% (p=0.000 n=10+10) --- kv95/enc=false/nodes=1/cpu=32 across a few runs also shows a modest improvement before and after this PR, using a sampling rate of 10%: 36929.02 v. 37415.52 (reads/s) +1.30% 1944.38 v. 
1968.94 (writes/s) +1.24% Release note: None --- pkg/rpc/context.go | 2 +- pkg/sql/sem/builtins/builtins.go | 32 ++---- pkg/util/tracing/alloc_test.go | 8 -- pkg/util/tracing/crdbspan.go | 129 ++++++++++++++++++---- pkg/util/tracing/grpc_interceptor.go | 91 +++++++++++---- pkg/util/tracing/grpc_interceptor_test.go | 26 ++++- pkg/util/tracing/recording.go | 2 +- pkg/util/tracing/span.go | 7 +- pkg/util/tracing/span_inner.go | 23 ++-- pkg/util/tracing/span_options.go | 48 +++----- pkg/util/tracing/span_test.go | 43 +++++--- pkg/util/tracing/tracer.go | 82 +++++++++----- pkg/util/tracing/tracer_test.go | 13 ++- 13 files changed, 339 insertions(+), 167 deletions(-) diff --git a/pkg/rpc/context.go b/pkg/rpc/context.go index 48aae0f07897..41b4b3676dab 100644 --- a/pkg/rpc/context.go +++ b/pkg/rpc/context.go @@ -728,7 +728,7 @@ func (ctx *Context) grpcDialOptions( // is in setupSpanForIncomingRPC(). // tagger := func(span *tracing.Span) { - span.SetTag("node", ctx.NodeID.String()) + span.SetTag("node", ctx.NodeID.Get().String()) } unaryInterceptors = append(unaryInterceptors, tracing.ClientInterceptor(tracer, tagger)) diff --git a/pkg/sql/sem/builtins/builtins.go b/pkg/sql/sem/builtins/builtins.go index ae8d7e17d4ec..01872f029c68 100644 --- a/pkg/sql/sem/builtins/builtins.go +++ b/pkg/sql/sem/builtins/builtins.go @@ -59,7 +59,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/sessiondatapb" "github.com/cockroachdb/cockroach/pkg/sql/sqlliveness" "github.com/cockroachdb/cockroach/pkg/sql/sqltelemetry" - "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" "github.com/cockroachdb/cockroach/pkg/sql/types" "github.com/cockroachdb/cockroach/pkg/streaming" "github.com/cockroachdb/cockroach/pkg/util/duration" @@ -3732,30 +3731,17 @@ may increase either contention or retry errors, or both.`, traceID := uint64(*(args[0].(*tree.DInt))) verbosity := bool(*(args[1].(*tree.DBool))) - const query = `SELECT span_id - FROM crdb_internal.node_inflight_trace_spans - WHERE trace_id = $1 - 
AND parent_span_id = 0` - - ie := ctx.InternalExecutor.(sqlutil.InternalExecutor) - row, err := ie.QueryRowEx( - ctx.Ctx(), - "crdb_internal.set_trace_verbose", - ctx.Txn, - sessiondata.NoSessionDataOverride, - query, - traceID, - ) - if err != nil { + var rootSpan *tracing.Span + if err := ctx.Settings.Tracer.VisitSpans(func(span *tracing.Span) error { + if span.TraceID() == traceID && rootSpan == nil { + rootSpan = span + } + + return nil + }); err != nil { return nil, err } - if row == nil { - return tree.DBoolFalse, nil - } - rootSpanID := uint64(*row[0].(*tree.DInt)) - - rootSpan, found := ctx.Settings.Tracer.GetActiveSpanFromID(rootSpanID) - if !found { + if rootSpan == nil { // not found return tree.DBoolFalse, nil } diff --git a/pkg/util/tracing/alloc_test.go b/pkg/util/tracing/alloc_test.go index 2556e982f0cd..9eeda06d9314 100644 --- a/pkg/util/tracing/alloc_test.go +++ b/pkg/util/tracing/alloc_test.go @@ -17,7 +17,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/logtags" - "github.com/opentracing/opentracing-go" ) // BenchmarkTracer_StartSpanCtx primarily helps keep @@ -35,10 +34,6 @@ func BenchmarkTracer_StartSpanCtx(b *testing.B) { staticLogTags := logtags.Buffer{} staticLogTags.Add("foo", "bar") - staticTag := opentracing.Tag{ - Key: "statictag", - Value: "staticvalue", - } b.ResetTimer() parSp := tr.StartSpan("one-off", WithForceRealSpan()) @@ -55,9 +50,6 @@ func BenchmarkTracer_StartSpanCtx(b *testing.B) { {"real,logtag", []SpanOption{ WithForceRealSpan(), WithLogTags(&staticLogTags), }}, - {"real,tag", []SpanOption{ - WithForceRealSpan(), WithTags(staticTag), - }}, {"real,autoparent", []SpanOption{ WithForceRealSpan(), WithParentAndAutoCollection(parSp), }}, diff --git a/pkg/util/tracing/crdbspan.go b/pkg/util/tracing/crdbspan.go index c9771b050233..50400e7bdc02 100644 --- a/pkg/util/tracing/crdbspan.go +++ b/pkg/util/tracing/crdbspan.go @@ -29,6 +29,13 @@ import ( // crdbSpan is a span for internal crdb usage. 
This is used to power SQL session // tracing. type crdbSpan struct { + rootSpan *crdbSpan // root span of the containing trace; could be itself + // traceEmpty indicates whether or not the trace rooted at this span + // (provided it is a root span) contains any recordings or baggage. All + // spans hold a reference to the rootSpan; this field is accessed + // through that reference. + traceEmpty int32 // accessed atomically, through markTraceAsNonEmpty and inAnEmptyTrace + traceID uint64 // probabilistically unique spanID uint64 // probabilistically unique parentSpanID uint64 @@ -76,16 +83,16 @@ type crdbSpanMu struct { // children contains the list of child spans started after this Span // started recording. - children []*crdbSpan + children childSpanRefs // remoteSpan contains the list of remote child span recordings that // were manually imported. remoteSpans []tracingpb.RecordedSpan } - // tags are only set when recording. These are tags that have been added to - // this Span, and will be appended to the tags in logTags when someone - // needs to actually observe the total set of tags that is a part of this - // Span. + // tags are only captured when recording. These are tags that have been + // added to this Span, and will be appended to the tags in logTags when + // someone needs to actually observe the total set of tags that is a part of + // this Span. // TODO(radu): perhaps we want a recording to capture all the tags (even // those that were set before recording started)? 
tags opentracing.Tags @@ -94,6 +101,55 @@ type crdbSpanMu struct { baggage map[string]string } +type childSpanRefs struct { + preAllocated [4]*crdbSpan + overflow []*crdbSpan +} + +func (c *childSpanRefs) len() int { + l := 0 + for i := 0; i < len(c.preAllocated); i++ { + if c.preAllocated[i] == nil { + return l + } + l++ + } + + return len(c.overflow) + l +} + +func (c *childSpanRefs) add(ref *crdbSpan) { + for i := 0; i < len(c.preAllocated); i++ { + if c.preAllocated[i] == nil { + c.preAllocated[i] = ref + return + } + } + + // Only record the child if the parent still has room. + if len(c.overflow) < maxChildrenPerSpan-len(c.preAllocated) { + c.overflow = append(c.overflow, ref) + } +} + +func (c *childSpanRefs) get(idx int) *crdbSpan { + if idx < len(c.preAllocated) { + ref := c.preAllocated[idx] + if ref == nil { + panic(fmt.Sprintf("idx %d out of bounds", idx)) + } + return ref + } + return c.overflow[idx-len(c.preAllocated)] +} + +func (c *childSpanRefs) reset() { + for i := 0; i < len(c.preAllocated); i++ { + c.preAllocated[i] = nil + } + c.overflow = nil +} + func newSizeLimitedBuffer(limit int64) sizeLimitedBuffer { return sizeLimitedBuffer{ limit: limit, @@ -151,8 +207,7 @@ func (s *crdbSpan) resetRecording() { s.mu.recording.logs.Reset() s.mu.recording.structured.Reset() s.mu.recording.dropped = false - - s.mu.recording.children = nil + s.mu.recording.children.reset() s.mu.recording.remoteSpans = nil } @@ -178,8 +233,6 @@ func (s *crdbSpan) getRecording(everyoneIsV211 bool, wantTags bool) Recording { return nil // noop span } - s.mu.Lock() - if !everyoneIsV211 { // The cluster may contain nodes that are running v20.2. Unfortunately that // version can easily crash when a peer returns a recording that that node @@ -189,16 +242,27 @@ func (s *crdbSpan) getRecording(everyoneIsV211 bool, wantTags bool) Recording { // // TODO(tbg): remove this in the v21.2 cycle. 
if s.recordingType() == RecordingOff { - s.mu.Unlock() return nil } } - // The capacity here is approximate since we don't know how many grandchildren - // there are. - result := make(Recording, 0, 1+len(s.mu.recording.children)+len(s.mu.recording.remoteSpans)) + // Return early (without allocating) if the trace is empty, i.e. there are + // no recordings or baggage. If the trace is verbose, we'll still recurse in + // order to pick up all the operations that were part of the trace, despite + // nothing having any actual data in them. + if s.recordingType() != RecordingVerbose && s.inAnEmptyTrace() { + return nil + } + + s.mu.Lock() + // The capacity here is approximate since we don't know how many + // grandchildren there are. + result := make(Recording, 0, 1+s.mu.recording.children.len()+len(s.mu.recording.remoteSpans)) // Shallow-copy the children so we can process them without the lock. - children := s.mu.recording.children + var children []*crdbSpan + for i := 0; i < s.mu.recording.children.len(); i++ { + children = append(children, s.mu.recording.children.get(i)) + } result = append(result, s.getRecordingLocked(wantTags)) result = append(result, s.mu.recording.remoteSpans...) s.mu.Unlock() @@ -218,6 +282,11 @@ func (s *crdbSpan) getRecording(everyoneIsV211 bool, wantTags bool) Recording { } func (s *crdbSpan) importRemoteSpans(remoteSpans []tracingpb.RecordedSpan) { + if len(remoteSpans) == 0 { + return + } + + s.markTraceAsNonEmpty() // Change the root of the remote recording to be a child of this Span. This is // usually already the case, except with DistSQL traces where remote // processors run in spans that FollowFrom an RPC Span that we don't collect. @@ -229,6 +298,11 @@ func (s *crdbSpan) importRemoteSpans(remoteSpans []tracingpb.RecordedSpan) { } func (s *crdbSpan) setTagLocked(key string, value interface{}) { + if s.recordingType() != RecordingVerbose { + // Don't bother storing tags if we're unlikely to retrieve them. 
+ return + } + if s.mu.tags == nil { s.mu.tags = make(opentracing.Tags) } @@ -266,10 +340,21 @@ type sizable interface { Size() int } +// inAnEmptyTrace indicates whether or not the containing trace is "empty" (i.e. +// has any recordings or baggage). +func (s *crdbSpan) inAnEmptyTrace() bool { + val := atomic.LoadInt32(&s.rootSpan.traceEmpty) + return val == 0 +} + +func (s *crdbSpan) markTraceAsNonEmpty() { + atomic.StoreInt32(&s.rootSpan.traceEmpty, 1) +} + func (s *crdbSpan) recordInternal(payload sizable, buffer *sizeLimitedBuffer) { + s.markTraceAsNonEmpty() s.mu.Lock() defer s.mu.Unlock() - size := int64(payload.Size()) if size > buffer.limit { // The incoming payload alone blows past the memory limit. Let's just @@ -291,6 +376,7 @@ func (s *crdbSpan) recordInternal(payload sizable, buffer *sizeLimitedBuffer) { } func (s *crdbSpan) setBaggageItemAndTag(restrictedKey, value string) { + s.markTraceAsNonEmpty() s.mu.Lock() defer s.mu.Unlock() s.setBaggageItemLocked(restrictedKey, value) @@ -405,11 +491,9 @@ func (s *crdbSpan) getRecordingLocked(wantTags bool) tracingpb.RecordedSpan { func (s *crdbSpan) addChild(child *crdbSpan) { s.mu.Lock() - // Only record the child if the parent still has room. 
- if len(s.mu.recording.children) < maxChildrenPerSpan { - s.mu.recording.children = append(s.mu.recording.children, child) - } - s.mu.Unlock() + defer s.mu.Unlock() + + s.mu.recording.children.add(child) } // setVerboseRecursively sets the verbosity of the crdbSpan appropriately and @@ -422,7 +506,10 @@ func (s *crdbSpan) setVerboseRecursively(to bool) { } s.mu.Lock() - children := s.mu.recording.children + var children []*crdbSpan + for i := 0; i < s.mu.recording.children.len(); i++ { + children = append(children, s.mu.recording.children.get(i)) + } s.mu.Unlock() for _, child := range children { diff --git a/pkg/util/tracing/grpc_interceptor.go b/pkg/util/tracing/grpc_interceptor.go index 4dba82d78d10..260e337b3043 100644 --- a/pkg/util/tracing/grpc_interceptor.go +++ b/pkg/util/tracing/grpc_interceptor.go @@ -15,6 +15,7 @@ import ( "io" "runtime" "strings" + "sync" "sync/atomic" "github.com/cockroachdb/cockroach/pkg/util/tracing/tracingpb" @@ -41,6 +42,14 @@ func (w metadataCarrier) Set(key, val string) { // blindly lowercase the key (which is guaranteed to work in the // Inject/Extract sense per the OpenTracing spec). key = strings.ToLower(key) + if (key == fieldNameTraceID || key == fieldNameSpanID) && len(w.MD[key]) == 1 && w.MD[key][0] == "0" { + // If the keys being set are tracing internal ones, and the metadata.MD + // we're using is one retrieved from grpcMetadataPool, overwrite it + // in-place to avoid allocating. 
+ w.MD[key][0] = val + return + } + w.MD[key] = append(w.MD[key], val) } @@ -57,7 +66,7 @@ func (w metadataCarrier) ForEach(fn func(key, val string) error) error { return nil } -func extractSpanMeta(ctx context.Context, tracer *Tracer) (*SpanMeta, error) { +func extractSpanMeta(ctx context.Context, tracer *Tracer) (SpanMeta, error) { md, ok := metadata.FromIncomingContext(ctx) if !ok { md = metadata.New(nil) @@ -68,12 +77,12 @@ func extractSpanMeta(ctx context.Context, tracer *Tracer) (*SpanMeta, error) { // spanInclusionFuncForServer is used as a SpanInclusionFunc for the server-side // of RPCs, deciding for which operations the gRPC opentracing interceptor should // create a span. -func spanInclusionFuncForServer(t *Tracer, spanMeta *SpanMeta) bool { +func spanInclusionFuncForServer(t *Tracer, spanMeta SpanMeta) bool { // If there is an incoming trace on the RPC (spanMeta) or the tracer is // configured to always trace, return true. The second part is particularly // useful for calls coming through the HTTP->RPC gateway (i.e. the AdminUI), // where client is never tracing. 
- return spanMeta != nil || t.AlwaysTrace() + return !spanMeta.Empty() || t.AlwaysTrace() } // setSpanTags sets one or more tags on the given span according to the @@ -129,9 +138,10 @@ func ServerInterceptor(tracer *Tracer) grpc.UnaryServerInterceptor { ctx, serverSpan := tracer.StartSpanCtx( ctx, info.FullMethod, - WithTags(gRPCComponentTag, ext.SpanKindRPCServer), WithParentAndManualCollection(spanMeta), ) + serverSpan.SetTag(gRPCComponentTag.Key, gRPCComponentTag.Value) + serverSpan.SetTag(ext.SpanKindRPCServer.Key, ext.SpanKindRPCServer.Value) defer serverSpan.Finish() resp, err = handler(ctx, req) @@ -172,9 +182,10 @@ func StreamServerInterceptor(tracer *Tracer) grpc.StreamServerInterceptor { ctx, serverSpan := tracer.StartSpanCtx( ss.Context(), info.FullMethod, - WithTags(gRPCComponentTag, ext.SpanKindRPCServer), WithParentAndManualCollection(spanMeta), ) + serverSpan.SetTag(gRPCComponentTag.Key, gRPCComponentTag.Value) + serverSpan.SetTag(ext.SpanKindRPCServer.Key, ext.SpanKindRPCServer.Value) defer serverSpan.Finish() ss = &tracingServerStream{ ServerStream: ss, @@ -211,14 +222,9 @@ func spanInclusionFuncForClient(parent *Span) bool { return parent != nil && !parent.i.isNoop() } -func injectSpanMeta(ctx context.Context, tracer *Tracer, clientSpan *Span) context.Context { - md, ok := metadata.FromOutgoingContext(ctx) - if !ok { - md = metadata.New(nil) - } else { - md = md.Copy() - } - +func injectSpanMetaInto( + ctx context.Context, md metadata.MD, tracer *Tracer, clientSpan *Span, +) context.Context { if err := tracer.InjectMetaInto(clientSpan.Meta(), metadataCarrier{md}); err != nil { // We have no better place to record an error than the Span itself. 
clientSpan.Recordf("error: %s", err) @@ -226,6 +232,27 @@ func injectSpanMeta(ctx context.Context, tracer *Tracer, clientSpan *Span) conte return metadata.NewOutgoingContext(ctx, md) } +var grpcMetadataPool = sync.Pool{ + New: func() interface{} { + // We pre-allocate metadata.MD with the two fields we'll always set in + // our interceptors. + m := map[string]string{fieldNameSpanID: "0", fieldNameTraceID: "0"} + return metadata.New(m) + }, +} + +func getPooledMetadata() (md metadata.MD, putBack func(md metadata.MD)) { + md = grpcMetadataPool.Get().(metadata.MD) + putBack = func(md metadata.MD) { + if md.Len() == 2 && len(md[fieldNameTraceID]) == 1 && len(md[fieldNameSpanID]) == 1 { + md[fieldNameTraceID][0] = "0" + md[fieldNameSpanID][0] = "0" + grpcMetadataPool.Put(md) + } + } + return md, putBack +} + // ClientInterceptor returns a grpc.UnaryClientInterceptor suitable // for use in a grpc.Dial call. // @@ -259,12 +286,25 @@ func ClientInterceptor(tracer *Tracer, init func(*Span)) grpc.UnaryClientInterce clientSpan := tracer.StartSpan( method, WithParentAndAutoCollection(parent), - WithTags(gRPCComponentTag, ext.SpanKindRPCClient), ) + clientSpan.SetTag(gRPCComponentTag.Key, gRPCComponentTag.Value) + clientSpan.SetTag(ext.SpanKindRPCClient.Key, ext.SpanKindRPCClient.Value) init(clientSpan) defer clientSpan.Finish() - ctx = injectSpanMeta(ctx, tracer, clientSpan) - err := invoker(ctx, method, req, resp, cc, opts...) + + md, ok := metadata.FromOutgoingContext(ctx) + if !ok { + var putBack func(metadata.MD) + md, putBack = getPooledMetadata() + defer putBack(md) + } else { + md = md.Copy() + } + // invokeCtx (wraps around the incoming ctx) holds onto a reference to + // the (likely) pooled metadata.MD. Seeing as how invokeCtx's scope + // extends as far as our use of the metadata, it's safe to pool here. + invokeCtx := injectSpanMetaInto(ctx, md, tracer, clientSpan) + err := invoker(invokeCtx, method, req, resp, cc, opts...) 
if err != nil { setSpanTags(clientSpan, err, true) clientSpan.Recordf("error: %s", err) @@ -308,11 +348,24 @@ func StreamClientInterceptor(tracer *Tracer, init func(*Span)) grpc.StreamClient clientSpan := tracer.StartSpan( method, WithParentAndAutoCollection(parent), - WithTags(gRPCComponentTag, ext.SpanKindRPCClient), ) + clientSpan.SetTag(gRPCComponentTag.Key, gRPCComponentTag.Value) + clientSpan.SetTag(ext.SpanKindRPCClient.Key, ext.SpanKindRPCClient.Value) init(clientSpan) - ctx = injectSpanMeta(ctx, tracer, clientSpan) - cs, err := streamer(ctx, desc, cc, method, opts...) + + md, ok := metadata.FromOutgoingContext(ctx) + if !ok { + // We pre-allocate metadata.MD with the two fields we'll always set + // in our interceptors. We can't use pooled metadata.MD objects here + // because we have no way of guaranteeing that the client stream + // won't read the metadata we're putting into streamerCtx below. + m := map[string]string{fieldNameSpanID: "0", fieldNameTraceID: "0"} + md = metadata.New(m) + } else { + md = md.Copy() + } + streamerCtx := injectSpanMetaInto(ctx, md, tracer, clientSpan) + cs, err := streamer(streamerCtx, desc, cc, method, opts...) if err != nil { clientSpan.Recordf("error: %s", err) setSpanTags(clientSpan, err, true) diff --git a/pkg/util/tracing/grpc_interceptor_test.go b/pkg/util/tracing/grpc_interceptor_test.go index 01a080953291..3b7be0ea0ba1 100644 --- a/pkg/util/tracing/grpc_interceptor_test.go +++ b/pkg/util/tracing/grpc_interceptor_test.go @@ -29,6 +29,23 @@ import ( "google.golang.org/grpc" ) +// testStructuredImpl is a testing implementation of Structured event. 
+type testStructuredImpl struct { + *types.StringValue +} + +var _ tracing.Structured = &testStructuredImpl{} + +func (t *testStructuredImpl) String() string { + return fmt.Sprintf("structured=%s", t.Value) +} + +func newTestStructured(s string) *testStructuredImpl { + return &testStructuredImpl{ + &types.StringValue{Value: s}, + } +} + // TestGRPCInterceptors verifies that the streaming and unary tracing // interceptors work as advertised. We expect to see a span on the client side // and a span on the server side. @@ -38,7 +55,7 @@ func TestGRPCInterceptors(t *testing.T) { const ( k = "test-baggage-key" v = "test-baggage-value" - magicValue = 150 + magicValue = "magic-value" ) checkForSpanAndReturnRecording := func(ctx context.Context) (*types.Any, error) { @@ -54,7 +71,7 @@ func TestGRPCInterceptors(t *testing.T) { return nil, errors.Newf("expected %v, got %v instead", v, actV) } - sp.RecordStructured(&types.Int32Value{Value: magicValue}) + sp.RecordStructured(newTestStructured(magicValue)) sp.SetVerbose(true) // want the tags recs := sp.GetRecording() sp.SetVerbose(false) @@ -187,6 +204,7 @@ func TestGRPCInterceptors(t *testing.T) { } { t.Run(tc.name, func(t *testing.T) { ctx, sp := tr.StartSpanCtx(context.Background(), tc.name, tracing.WithForceRealSpan()) + sp.SetVerbose(true) // to set the tags recAny, err := tc.do(ctx) require.NoError(t, err) var rec tracingpb.RecordedSpan @@ -195,7 +213,6 @@ func TestGRPCInterceptors(t *testing.T) { sp.ImportRemoteSpans([]tracingpb.RecordedSpan{rec}) sp.Finish() var n int - sp.SetVerbose(true) // to get the tags finalRecs := sp.GetRecording() sp.SetVerbose(false) for _, rec := range finalRecs { @@ -217,7 +234,8 @@ func TestGRPCInterceptors(t *testing.T) { span: /cockroach.testutils.grpcutils.GRPCTest/%[1]s tags: component=gRPC span.kind=client test-baggage-key=test-baggage-value span: /cockroach.testutils.grpcutils.GRPCTest/%[1]s - tags: component=gRPC span.kind=server test-baggage-key=test-baggage-value`, tc.name) + tags: 
component=gRPC span.kind=server test-baggage-key=test-baggage-value + event: structured=magic-value`, tc.name) require.NoError(t, tracing.TestingCheckRecordedSpans(finalRecs, exp)) }) } diff --git a/pkg/util/tracing/recording.go b/pkg/util/tracing/recording.go index eab64ea8f2d3..2192aaa15629 100644 --- a/pkg/util/tracing/recording.go +++ b/pkg/util/tracing/recording.go @@ -476,7 +476,7 @@ func TestingCheckRecordedSpans(rec Recording, expected string) error { Context: 4, } diffText, _ := difflib.GetUnifiedDiffString(diff) - return errors.Newf("unexpected diff:\n%s\n%s", diffText, rec.String()) + return errors.Newf("unexpected diff:\n%s\n\nrecording:\n%s", diffText, rec.String()) } return nil } diff --git a/pkg/util/tracing/span.go b/pkg/util/tracing/span.go index 596f0458e71b..6242dffde36f 100644 --- a/pkg/util/tracing/span.go +++ b/pkg/util/tracing/span.go @@ -113,7 +113,7 @@ func (sp *Span) ImportRemoteSpans(remoteSpans []tracingpb.RecordedSpan) { // boundaries in order to derive child spans from this Span. This may return // nil, which is a valid input to `WithParentAndManualCollection`, if the Span // has been optimized out. -func (sp *Span) Meta() *SpanMeta { +func (sp *Span) Meta() SpanMeta { // It shouldn't be done in practice, but it is allowed to call Meta on // a finished span. return sp.i.Meta() @@ -250,6 +250,11 @@ type SpanMeta struct { Baggage map[string]string } +// Empty returns whether or not the SpanMeta is a zero value. 
+func (sm SpanMeta) Empty() bool { + return sm.spanID == 0 && sm.traceID == 0 +} + func (sm *SpanMeta) String() string { return fmt.Sprintf("[spanID: %d, traceID: %d]", sm.spanID, sm.traceID) } diff --git a/pkg/util/tracing/span_inner.go b/pkg/util/tracing/span_inner.go index 765bacc58ca4..5c0ec78e7eb3 100644 --- a/pkg/util/tracing/span_inner.go +++ b/pkg/util/tracing/span_inner.go @@ -97,6 +97,10 @@ func (s *spanInner) Finish() { return } finishTime := timeutil.Now() + duration := finishTime.Sub(s.crdb.startTime) + if duration == 0 { + duration = time.Nanosecond + } s.crdb.mu.Lock() if alreadyFinished := s.crdb.mu.duration >= 0; alreadyFinished { @@ -106,10 +110,7 @@ func (s *spanInner) Finish() { // finished twice, but it may happen so let's be resilient to it. return } - s.crdb.mu.duration = finishTime.Sub(s.crdb.startTime) - if s.crdb.mu.duration == 0 { - s.crdb.mu.duration = time.Nanosecond - } + s.crdb.mu.duration = duration s.crdb.mu.Unlock() if s.ot.shadowSpan != nil { @@ -118,12 +119,14 @@ func (s *spanInner) Finish() { if s.netTr != nil { s.netTr.Finish() } - s.tracer.activeSpans.Lock() - delete(s.tracer.activeSpans.m, s.crdb.spanID) - s.tracer.activeSpans.Unlock() + if s.crdb.rootSpan.spanID == s.crdb.spanID { + s.tracer.activeSpans.Lock() + delete(s.tracer.activeSpans.m, s.crdb.spanID) + s.tracer.activeSpans.Unlock() + } } -func (s *spanInner) Meta() *SpanMeta { +func (s *spanInner) Meta() SpanMeta { var traceID uint64 var spanID uint64 var recordingType RecordingType @@ -157,9 +160,9 @@ func (s *spanInner) Meta() *SpanMeta { shadowCtx == nil && recordingType == 0 && baggage == nil { - return nil + return SpanMeta{} } - return &SpanMeta{ + return SpanMeta{ traceID: traceID, spanID: spanID, shadowTracerType: shadowTrTyp, diff --git a/pkg/util/tracing/span_options.go b/pkg/util/tracing/span_options.go index e6669fa3af4d..ed5574990f55 100644 --- a/pkg/util/tracing/span_options.go +++ b/pkg/util/tracing/span_options.go @@ -21,7 +21,7 @@ import ( // See the 
SpanOption interface for a synopsis. type spanOptions struct { Parent *Span // see WithParentAndAutoCollection - RemoteParent *SpanMeta // see WithParentAndManualCollection + RemoteParent SpanMeta // see WithParentAndManualCollection RefType opentracing.SpanReferenceType // see WithFollowsFrom LogTags *logtags.Buffer // see WithLogTags Tags map[string]interface{} // see WithTags @@ -31,7 +31,7 @@ type spanOptions struct { func (opts *spanOptions) parentTraceID() uint64 { if opts.Parent != nil && !opts.Parent.i.isNoop() { return opts.Parent.i.crdb.traceID - } else if opts.RemoteParent != nil { + } else if !opts.RemoteParent.Empty() { return opts.RemoteParent.traceID } return 0 @@ -40,17 +40,24 @@ func (opts *spanOptions) parentTraceID() uint64 { func (opts *spanOptions) parentSpanID() uint64 { if opts.Parent != nil && !opts.Parent.i.isNoop() { return opts.Parent.i.crdb.spanID - } else if opts.RemoteParent != nil { + } else if !opts.RemoteParent.Empty() { return opts.RemoteParent.spanID } return 0 } +func (opts *spanOptions) deriveRootSpan() *crdbSpan { + if opts.Parent != nil && !opts.Parent.i.isNoop() { + return opts.Parent.i.crdb.rootSpan + } + return nil +} + func (opts *spanOptions) recordingType() RecordingType { recordingType := RecordingOff if opts.Parent != nil && !opts.Parent.i.isNoop() { recordingType = opts.Parent.i.crdb.recordingType() - } else if opts.RemoteParent != nil { + } else if !opts.RemoteParent.Empty() { recordingType = opts.RemoteParent.recordingType } return recordingType @@ -59,7 +66,7 @@ func (opts *spanOptions) recordingType() RecordingType { func (opts *spanOptions) shadowTrTyp() (string, bool) { if opts.Parent != nil { return opts.Parent.i.ot.shadowTr.Type() - } else if opts.RemoteParent != nil { + } else if !opts.RemoteParent.Empty() { s := opts.RemoteParent.shadowTracerType return s, s != "" } @@ -70,7 +77,7 @@ func (opts *spanOptions) shadowContext() opentracing.SpanContext { if opts.Parent != nil && opts.Parent.i.ot.shadowSpan != nil 
{ return opts.Parent.i.ot.shadowSpan.Context() } - if opts.RemoteParent != nil && opts.RemoteParent.shadowCtx != nil { + if !opts.RemoteParent.Empty() && opts.RemoteParent.shadowCtx != nil { return opts.RemoteParent.shadowCtx } return nil @@ -147,33 +154,12 @@ type parentAndManualCollectionOption SpanMeta // which corresponds to the expectation that the parent span will // wait for the child to Finish(). If this expectation does not hold, // WithFollowsFrom should be added to the StartSpan invocation. -func WithParentAndManualCollection(parent *SpanMeta) SpanOption { - return (*parentAndManualCollectionOption)(parent) -} - -func (p *parentAndManualCollectionOption) apply(opts spanOptions) spanOptions { - opts.RemoteParent = (*SpanMeta)(p) - return opts -} - -type tagsOption []opentracing.Tag - -// WithTags is an option to Tracer.StartSpan which populates the -// tags on the newly created Span. -func WithTags(tags ...opentracing.Tag) SpanOption { - return (tagsOption)(tags) +func WithParentAndManualCollection(parent SpanMeta) SpanOption { + return (parentAndManualCollectionOption)(parent) } -func (o tagsOption) apply(opts spanOptions) spanOptions { - if len(o) == 0 { - return opts - } - if opts.Tags == nil { - opts.Tags = map[string]interface{}{} - } - for _, tag := range o { - opts.Tags[tag.Key] = tag.Value - } +func (p parentAndManualCollectionOption) apply(opts spanOptions) spanOptions { + opts.RemoteParent = (SpanMeta)(p) return opts } diff --git a/pkg/util/tracing/span_test.go b/pkg/util/tracing/span_test.go index dfdd0019e3be..d6aef7abf27e 100644 --- a/pkg/util/tracing/span_test.go +++ b/pkg/util/tracing/span_test.go @@ -23,7 +23,6 @@ import ( "github.com/cockroachdb/errors" "github.com/cockroachdb/logtags" "github.com/gogo/protobuf/types" - "github.com/opentracing/opentracing-go" "github.com/stretchr/testify/require" "golang.org/x/net/trace" "google.golang.org/grpc/metadata" @@ -182,8 +181,8 @@ func TestRecordingInRecording(t *testing.T) { } func 
TestSpan_ImportRemoteSpans(t *testing.T) { - // Verify that GetRecording propagates the recording even when the - // receiving Span isn't verbose. + // Verify that GetRecording propagates the recording even when the receiving + // Span isn't verbose during import. tr := NewTracer() sp := tr.StartSpan("root", WithForceRealSpan()) ch := tr.StartSpan("child", WithParentAndManualCollection(sp.Meta())) @@ -393,8 +392,8 @@ func TestNonVerboseChildSpanRegisteredWithParent(t *testing.T) { defer sp.Finish() ch := tr.StartSpan("child", WithParentAndAutoCollection(sp)) defer ch.Finish() - require.Len(t, sp.i.crdb.mu.recording.children, 1) - require.Equal(t, ch.i.crdb, sp.i.crdb.mu.recording.children[0]) + require.Equal(t, 1, sp.i.crdb.mu.recording.children.len()) + require.Equal(t, ch.i.crdb, sp.i.crdb.mu.recording.children.get(0)) ch.RecordStructured(&types.Int32Value{Value: 5}) // Check that the child span (incl its payload) is in the recording. rec := sp.GetRecording() @@ -415,7 +414,7 @@ func TestSpanMaxChildren(t *testing.T) { if exp > maxChildrenPerSpan { exp = maxChildrenPerSpan } - require.Len(t, sp.i.crdb.mu.recording.children, exp) + require.Equal(t, exp, sp.i.crdb.mu.recording.children.len()) } } @@ -486,26 +485,36 @@ func (i *countingStringer) String() string { return fmt.Sprint(*i) } -func TestSpan_GetRecordingTags(t *testing.T) { - // Verify that tags are omitted from GetRecording if the span is - // not verbose when the recording is pulled. See GetRecording for - // details. +// TestSpanTagsInRecordings verifies that tags are dropped if the span is +// not verbose. 
+func TestSpanTagsInRecordings(t *testing.T) { tr := NewTracer() var counter countingStringer logTags := logtags.SingleTagBuffer("tagfoo", "tagbar") sp := tr.StartSpan("root", WithForceRealSpan(), - WithTags(opentracing.Tag{ - Key: "foo1", - Value: &counter, - }), WithLogTags(logTags), ) defer sp.Finish() require.False(t, sp.IsVerbose()) - sp.SetTag("foo2", &counter) + sp.SetTag("foo1", &counter) + sp.Record("dummy recording") rec := sp.GetRecording() - require.Empty(t, rec[0].Tags) - require.Zero(t, counter) + require.Len(t, rec, 0) + require.Zero(t, int(counter)) + + // Verify that we didn't hold onto anything underneath. + sp.SetVerbose(true) + rec = sp.GetRecording() + require.Len(t, rec, 1) + require.Len(t, rec[0].Tags, 3) // _unfinished:1 _verbose:1 tagfoo:tagbar + require.Zero(t, int(counter)) + + // Verify that subsequent tags are captured. + sp.SetTag("foo2", &counter) + rec = sp.GetRecording() + require.Len(t, rec, 1) + require.Len(t, rec[0].Tags, 4) + require.Equal(t, 1, int(counter)) } diff --git a/pkg/util/tracing/tracer.go b/pkg/util/tracing/tracer.go index 72b982968bc4..8f5da5c63734 100644 --- a/pkg/util/tracing/tracer.go +++ b/pkg/util/tracing/tracer.go @@ -284,7 +284,7 @@ func (t *Tracer) startSpanGeneric( } if opts.Parent != nil { - if opts.RemoteParent != nil { + if !opts.RemoteParent.Empty() { panic("can't specify both Parent and RemoteParent") } } @@ -396,6 +396,20 @@ func (t *Tracer) startSpanGeneric( netTr: netTr, } + // Copy over the parent span's root span reference, and if there isn't one + // (we're creating a new root span), set a reference to ourselves. + // + // TODO(irfansharif): Given we have a handle on the root span, we should + // reconsider the maxChildrenPerSpan limit, which only limits the branching + // factor. To bound the total memory usage for pkg/tracing, we could instead + // limit the number of spans per trace (no-oping all subsequent ones) and + // do the same for the total number of root spans. 
+ if rootSpan := opts.deriveRootSpan(); rootSpan != nil { + helper.crdbSpan.rootSpan = rootSpan + } else { + helper.crdbSpan.rootSpan = &helper.crdbSpan + } + s := &helper.span { @@ -410,8 +424,8 @@ func (t *Tracer) startSpanGeneric( s.i.crdb.enableRecording(p, opts.recordingType()) } - // Set initial tags. These will propagate to the crdbSpan, ot, and netTr - // as appropriate. + // Set initial tags (has to happen after instantiating the recording type). + // These will propagate to the crdbSpan, ot, and netTr as appropriate. // // NB: this could be optimized. for k, v := range opts.Tags { @@ -426,10 +440,11 @@ func (t *Tracer) startSpanGeneric( if !opts.Parent.i.isNoop() { opts.Parent.i.crdb.mu.Lock() m := opts.Parent.i.crdb.mu.baggage + opts.Parent.i.crdb.mu.Unlock() + for k, v := range m { s.SetBaggageItem(k, v) } - opts.Parent.i.crdb.mu.Unlock() } } else { // Local root span - put it into the registry of active local root @@ -452,7 +467,7 @@ func (t *Tracer) startSpanGeneric( t.activeSpans.m[spanID] = s t.activeSpans.Unlock() - if opts.RemoteParent != nil { + if !opts.RemoteParent.Empty() { for k, v := range opts.RemoteParent.Baggage { s.SetBaggageItem(k, v) } @@ -536,8 +551,8 @@ func (fn textMapWriterFn) Set(key, val string) { // InjectMetaInto is used to serialize the given span metadata into the given // Carrier. This, alongside ExtractMetaFrom, can be used to carry span metadata // across process boundaries. See serializationFormat for more details. -func (t *Tracer) InjectMetaInto(sm *SpanMeta, carrier Carrier) error { - if sm == nil { +func (t *Tracer) InjectMetaInto(sm SpanMeta, carrier Carrier) error { + if sm.Empty() { // Fast path when tracing is disabled. ExtractMetaFrom will accept an // empty map as a noop context. 
return nil @@ -580,32 +595,20 @@ func (t *Tracer) InjectMetaInto(sm *SpanMeta, carrier Carrier) error { return nil } -var noopSpanMeta = (*SpanMeta)(nil) +// var noopSpanMeta = (*SpanMeta)(nil) +var noopSpanMeta = SpanMeta{} // ExtractMetaFrom is used to deserialize a span metadata (if any) from the // given Carrier. This, alongside InjectMetaFrom, can be used to carry span // metadata across process boundaries. See serializationFormat for more details. -func (t *Tracer) ExtractMetaFrom(carrier Carrier) (*SpanMeta, error) { - var format serializationFormat - switch carrier.(type) { - case MapCarrier: - format = mapFormat - case metadataCarrier: - format = metadataFormat - default: - return noopSpanMeta, errors.New("unsupported carrier") - } - +func (t *Tracer) ExtractMetaFrom(carrier Carrier) (SpanMeta, error) { var shadowType string var shadowCarrier opentracing.TextMapCarrier - var traceID uint64 var spanID uint64 var baggage map[string]string - // TODO(tbg): ForeachKey forces things on the heap. We can do better - // by using an explicit carrier. - err := carrier.ForEach(func(k, v string) error { + iterFn := func(k, v string) error { switch k = strings.ToLower(k); k { case fieldNameTraceID: var err error @@ -636,10 +639,23 @@ func (t *Tracer) ExtractMetaFrom(carrier Carrier) (*SpanMeta, error) { } } return nil - }) - if err != nil { - return noopSpanMeta, err } + + // Instead of iterating through the interface type, we prefer to do so with + // the explicit types to avoid heap allocations. + switch c := carrier.(type) { + case MapCarrier: + if err := c.ForEach(iterFn); err != nil { + return noopSpanMeta, err + } + case metadataCarrier: + if err := c.ForEach(iterFn); err != nil { + return noopSpanMeta, err + } + default: + return noopSpanMeta, errors.New("unsupported carrier") + } + if traceID == 0 && spanID == 0 { return noopSpanMeta, nil } @@ -660,10 +676,20 @@ func (t *Tracer) ExtractMetaFrom(carrier Carrier) (*SpanMeta, error) { // consideration. 
shadowType = "" } else { + var format serializationFormat + switch carrier.(type) { + case MapCarrier: + format = mapFormat + case metadataCarrier: + format = metadataFormat + default: + return noopSpanMeta, errors.New("unsupported carrier") + } // Shadow tracing is active on this node and the incoming information // was created using the same type of tracer. // // Extract the shadow context using the un-encapsulated textmap. + var err error shadowCtx, err = shadowTr.Extract(format, shadowCarrier) if err != nil { return noopSpanMeta, err @@ -671,7 +697,7 @@ func (t *Tracer) ExtractMetaFrom(carrier Carrier) (*SpanMeta, error) { } } - return &SpanMeta{ + return SpanMeta{ traceID: traceID, spanID: spanID, shadowTracerType: shadowType, @@ -681,7 +707,7 @@ func (t *Tracer) ExtractMetaFrom(carrier Carrier) (*SpanMeta, error) { }, nil } -// GetActiveSpanFromID retrieves any active span given its span ID. +// GetActiveSpanFromID retrieves any active root span given its ID. func (t *Tracer) GetActiveSpanFromID(spanID uint64) (*Span, bool) { t.activeSpans.Lock() span, found := t.activeSpans.m[spanID] diff --git a/pkg/util/tracing/tracer_test.go b/pkg/util/tracing/tracer_test.go index 30d55335a4ca..6578c986ea41 100644 --- a/pkg/util/tracing/tracer_test.go +++ b/pkg/util/tracing/tracer_test.go @@ -32,7 +32,7 @@ func TestStartSpanAlwaysTrace(t *testing.T) { tr._useNetTrace = 1 require.True(t, tr.AlwaysTrace()) nilMeta := tr.noopSpan.Meta() - require.Nil(t, nilMeta) + require.True(t, nilMeta.Empty()) sp := tr.StartSpan("foo", WithParentAndManualCollection(nilMeta)) require.False(t, sp.IsVerbose()) // parent was not verbose, so neither is sp require.False(t, sp.i.isNoop()) @@ -69,11 +69,18 @@ func TestTracerRecording(t *testing.T) { } // Initial recording of this fresh (real) span. 
+ if err := TestingCheckRecordedSpans(s1.GetRecording(), ``); err != nil { + t.Fatal(err) + } + + s1.SetVerbose(true) if err := TestingCheckRecordedSpans(s1.GetRecording(), ` span: a + tags: _unfinished=1 _verbose=1 `); err != nil { t.Fatal(err) } + s1.SetVerbose(false) // Real parent --> real child. real3 := tr.StartSpan("noop3", WithParentAndManualCollection(s1.Meta())) @@ -237,8 +244,8 @@ func TestTracerInjectExtract(t *testing.T) { if err != nil { t.Fatal(err) } - if wireSpanMeta != nil { - t.Errorf("expected noop context: %v", wireSpanMeta) + if !wireSpanMeta.Empty() { + t.Errorf("expected no-op span meta: %v", wireSpanMeta) } noop2 := tr2.StartSpan("remote op", WithParentAndManualCollection(wireSpanMeta)) if !noop2.i.isNoop() {