Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tracing: reduce memory pressure throughout #64233

Merged
merged 1 commit into from May 18, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/rpc/context.go
Expand Up @@ -728,7 +728,7 @@ func (ctx *Context) grpcDialOptions(
// is in setupSpanForIncomingRPC().
//
tagger := func(span *tracing.Span) {
span.SetTag("node", ctx.NodeID.String())
span.SetTag("node", ctx.NodeID.Get().String())
}
unaryInterceptors = append(unaryInterceptors,
tracing.ClientInterceptor(tracer, tagger))
Expand Down
32 changes: 9 additions & 23 deletions pkg/sql/sem/builtins/builtins.go
Expand Up @@ -59,7 +59,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/sessiondatapb"
"github.com/cockroachdb/cockroach/pkg/sql/sqlliveness"
"github.com/cockroachdb/cockroach/pkg/sql/sqltelemetry"
"github.com/cockroachdb/cockroach/pkg/sql/sqlutil"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/streaming"
"github.com/cockroachdb/cockroach/pkg/util/duration"
Expand Down Expand Up @@ -3732,30 +3731,17 @@ may increase either contention or retry errors, or both.`,
traceID := uint64(*(args[0].(*tree.DInt)))
verbosity := bool(*(args[1].(*tree.DBool)))

const query = `SELECT span_id
FROM crdb_internal.node_inflight_trace_spans
WHERE trace_id = $1
AND parent_span_id = 0`

ie := ctx.InternalExecutor.(sqlutil.InternalExecutor)
row, err := ie.QueryRowEx(
ctx.Ctx(),
"crdb_internal.set_trace_verbose",
ctx.Txn,
sessiondata.NoSessionDataOverride,
query,
traceID,
)
if err != nil {
var rootSpan *tracing.Span
if err := ctx.Settings.Tracer.VisitSpans(func(span *tracing.Span) error {
if span.TraceID() == traceID && rootSpan == nil {
rootSpan = span
}

return nil
}); err != nil {
return nil, err
}
if row == nil {
return tree.DBoolFalse, nil
}
rootSpanID := uint64(*row[0].(*tree.DInt))

rootSpan, found := ctx.Settings.Tracer.GetActiveSpanFromID(rootSpanID)
if !found {
if rootSpan == nil { // not found
return tree.DBoolFalse, nil
}

Expand Down
8 changes: 0 additions & 8 deletions pkg/util/tracing/alloc_test.go
Expand Up @@ -17,7 +17,6 @@ import (

"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/logtags"
"github.com/opentracing/opentracing-go"
)

// BenchmarkTracer_StartSpanCtx primarily helps keep
Expand All @@ -35,10 +34,6 @@ func BenchmarkTracer_StartSpanCtx(b *testing.B) {
staticLogTags := logtags.Buffer{}
staticLogTags.Add("foo", "bar")

staticTag := opentracing.Tag{
Key: "statictag",
Value: "staticvalue",
}
b.ResetTimer()

parSp := tr.StartSpan("one-off", WithForceRealSpan())
Expand All @@ -55,9 +50,6 @@ func BenchmarkTracer_StartSpanCtx(b *testing.B) {
{"real,logtag", []SpanOption{
WithForceRealSpan(), WithLogTags(&staticLogTags),
}},
{"real,tag", []SpanOption{
WithForceRealSpan(), WithTags(staticTag),
}},
{"real,autoparent", []SpanOption{
WithForceRealSpan(), WithParentAndAutoCollection(parSp),
}},
Expand Down
123 changes: 102 additions & 21 deletions pkg/util/tracing/crdbspan.go
Expand Up @@ -29,6 +29,13 @@ import (
// crdbSpan is a span for internal crdb usage. This is used to power SQL session
// tracing.
type crdbSpan struct {
rootSpan *crdbSpan // root span of the containing trace; could be itself
// traceEmpty indicates whether or not the trace rooted at this span
// (provided it is a root span) contains any recordings or baggage. All
// spans hold a reference to the rootSpan; this field is accessed
// through that reference.
traceEmpty int32 // accessed atomically, through markTraceAsNonEmpty and inAnEmptyTrace

traceID uint64 // probabilistically unique
spanID uint64 // probabilistically unique
parentSpanID uint64
Expand Down Expand Up @@ -76,16 +83,16 @@ type crdbSpanMu struct {

// children contains the list of child spans started after this Span
// started recording.
children []*crdbSpan
children childSpanRefs
// remoteSpan contains the list of remote child span recordings that
// were manually imported.
remoteSpans []tracingpb.RecordedSpan
}

// tags are only set when recording. These are tags that have been added to
// this Span, and will be appended to the tags in logTags when someone
// needs to actually observe the total set of tags that is a part of this
// Span.
// tags are only captured when recording. These are tags that have been
// added to this Span, and will be appended to the tags in logTags when
// someone needs to actually observe the total set of tags that is a part of
// this Span.
// TODO(radu): perhaps we want a recording to capture all the tags (even
// those that were set before recording started)?
tags opentracing.Tags
Expand All @@ -94,6 +101,49 @@ type crdbSpanMu struct {
baggage map[string]string
}

type childSpanRefs struct {
refCount int
preAllocated [4]*crdbSpan
overflow []*crdbSpan
}

func (c *childSpanRefs) len() int {
return c.refCount
}

func (c *childSpanRefs) add(ref *crdbSpan) {
if c.refCount < len(c.preAllocated) {
c.preAllocated[c.refCount] = ref
c.refCount++
return
}

// Only record the child if the parent still has room.
if c.refCount < maxChildrenPerSpan {
c.overflow = append(c.overflow, ref)
c.refCount++
}
}

func (c *childSpanRefs) get(idx int) *crdbSpan {
if idx < len(c.preAllocated) {
ref := c.preAllocated[idx]
if ref == nil {
panic(fmt.Sprintf("idx %d out of bounds", idx))
}
return ref
}
return c.overflow[idx-len(c.preAllocated)]
}

func (c *childSpanRefs) reset() {
for i := 0; i < len(c.preAllocated); i++ {
c.preAllocated[i] = nil
}
c.overflow = nil
c.refCount = 0
}

func newSizeLimitedBuffer(limit int64) sizeLimitedBuffer {
return sizeLimitedBuffer{
limit: limit,
Expand Down Expand Up @@ -151,8 +201,7 @@ func (s *crdbSpan) resetRecording() {
s.mu.recording.logs.Reset()
s.mu.recording.structured.Reset()
s.mu.recording.dropped = false

s.mu.recording.children = nil
s.mu.recording.children.reset()
s.mu.recording.remoteSpans = nil
}

Expand All @@ -178,8 +227,6 @@ func (s *crdbSpan) getRecording(everyoneIsV211 bool, wantTags bool) Recording {
return nil // noop span
}

s.mu.Lock()

if !everyoneIsV211 {
// The cluster may contain nodes that are running v20.2. Unfortunately that
// version can easily crash when a peer returns a recording that that node
Expand All @@ -189,16 +236,27 @@ func (s *crdbSpan) getRecording(everyoneIsV211 bool, wantTags bool) Recording {
//
// TODO(tbg): remove this in the v21.2 cycle.
if s.recordingType() == RecordingOff {
s.mu.Unlock()
return nil
}
}

// The capacity here is approximate since we don't know how many grandchildren
// there are.
result := make(Recording, 0, 1+len(s.mu.recording.children)+len(s.mu.recording.remoteSpans))
// Return early (without allocating) if the trace is empty, i.e. there are
// no recordings or baggage. If the trace is verbose, we'll still recurse in
// order to pick up all the operations that were part of the trace, despite
// nothing having any actual data in them.
if s.recordingType() != RecordingVerbose && s.inAnEmptyTrace() {
return nil
}

s.mu.Lock()
// The capacity here is approximate since we don't know how many
// grandchildren there are.
result := make(Recording, 0, 1+s.mu.recording.children.len()+len(s.mu.recording.remoteSpans))
// Shallow-copy the children so we can process them without the lock.
children := s.mu.recording.children
var children []*crdbSpan
for i := 0; i < s.mu.recording.children.len(); i++ {
children = append(children, s.mu.recording.children.get(i))
}
result = append(result, s.getRecordingLocked(wantTags))
result = append(result, s.mu.recording.remoteSpans...)
s.mu.Unlock()
Expand All @@ -218,6 +276,11 @@ func (s *crdbSpan) getRecording(everyoneIsV211 bool, wantTags bool) Recording {
}

func (s *crdbSpan) importRemoteSpans(remoteSpans []tracingpb.RecordedSpan) {
if len(remoteSpans) == 0 {
return
}

s.markTraceAsNonEmpty()
// Change the root of the remote recording to be a child of this Span. This is
// usually already the case, except with DistSQL traces where remote
// processors run in spans that FollowFrom an RPC Span that we don't collect.
Expand All @@ -229,6 +292,11 @@ func (s *crdbSpan) importRemoteSpans(remoteSpans []tracingpb.RecordedSpan) {
}

func (s *crdbSpan) setTagLocked(key string, value interface{}) {
if s.recordingType() != RecordingVerbose {
// Don't bother storing tags if we're unlikely to retrieve them.
return
}

if s.mu.tags == nil {
s.mu.tags = make(opentracing.Tags)
}
Expand Down Expand Up @@ -266,10 +334,21 @@ type sizable interface {
Size() int
}

// inAnEmptyTrace indicates whether or not the containing trace is "empty" (i.e.
// has any recordings or baggage).
func (s *crdbSpan) inAnEmptyTrace() bool {
val := atomic.LoadInt32(&s.rootSpan.traceEmpty)
return val == 0
}

func (s *crdbSpan) markTraceAsNonEmpty() {
atomic.StoreInt32(&s.rootSpan.traceEmpty, 1)
}

func (s *crdbSpan) recordInternal(payload sizable, buffer *sizeLimitedBuffer) {
s.markTraceAsNonEmpty()
s.mu.Lock()
defer s.mu.Unlock()

size := int64(payload.Size())
if size > buffer.limit {
// The incoming payload alone blows past the memory limit. Let's just
Expand All @@ -291,6 +370,7 @@ func (s *crdbSpan) recordInternal(payload sizable, buffer *sizeLimitedBuffer) {
}

func (s *crdbSpan) setBaggageItemAndTag(restrictedKey, value string) {
s.markTraceAsNonEmpty()
s.mu.Lock()
defer s.mu.Unlock()
s.setBaggageItemLocked(restrictedKey, value)
Expand Down Expand Up @@ -405,11 +485,9 @@ func (s *crdbSpan) getRecordingLocked(wantTags bool) tracingpb.RecordedSpan {

func (s *crdbSpan) addChild(child *crdbSpan) {
s.mu.Lock()
// Only record the child if the parent still has room.
if len(s.mu.recording.children) < maxChildrenPerSpan {
s.mu.recording.children = append(s.mu.recording.children, child)
}
s.mu.Unlock()
defer s.mu.Unlock()

s.mu.recording.children.add(child)
}

// setVerboseRecursively sets the verbosity of the crdbSpan appropriately and
Expand All @@ -422,7 +500,10 @@ func (s *crdbSpan) setVerboseRecursively(to bool) {
}

s.mu.Lock()
children := s.mu.recording.children
var children []*crdbSpan
for i := 0; i < s.mu.recording.children.len(); i++ {
children = append(children, s.mu.recording.children.get(i))
}
s.mu.Unlock()

for _, child := range children {
Expand Down
18 changes: 11 additions & 7 deletions pkg/util/tracing/grpc_interceptor.go
Expand Up @@ -57,7 +57,7 @@ func (w metadataCarrier) ForEach(fn func(key, val string) error) error {
return nil
}

func extractSpanMeta(ctx context.Context, tracer *Tracer) (*SpanMeta, error) {
func extractSpanMeta(ctx context.Context, tracer *Tracer) (SpanMeta, error) {
md, ok := metadata.FromIncomingContext(ctx)
if !ok {
md = metadata.New(nil)
Expand All @@ -68,12 +68,12 @@ func extractSpanMeta(ctx context.Context, tracer *Tracer) (*SpanMeta, error) {
// spanInclusionFuncForServer is used as a SpanInclusionFunc for the server-side
// of RPCs, deciding for which operations the gRPC opentracing interceptor should
// create a span.
func spanInclusionFuncForServer(t *Tracer, spanMeta *SpanMeta) bool {
func spanInclusionFuncForServer(t *Tracer, spanMeta SpanMeta) bool {
// If there is an incoming trace on the RPC (spanMeta) or the tracer is
// configured to always trace, return true. The second part is particularly
// useful for calls coming through the HTTP->RPC gateway (i.e. the AdminUI),
// where client is never tracing.
return spanMeta != nil || t.AlwaysTrace()
return !spanMeta.Empty() || t.AlwaysTrace()
}

// setSpanTags sets one or more tags on the given span according to the
Expand Down Expand Up @@ -129,9 +129,10 @@ func ServerInterceptor(tracer *Tracer) grpc.UnaryServerInterceptor {
ctx, serverSpan := tracer.StartSpanCtx(
ctx,
info.FullMethod,
WithTags(gRPCComponentTag, ext.SpanKindRPCServer),
WithParentAndManualCollection(spanMeta),
)
serverSpan.SetTag(gRPCComponentTag.Key, gRPCComponentTag.Value)
serverSpan.SetTag(ext.SpanKindRPCServer.Key, ext.SpanKindRPCServer.Value)
defer serverSpan.Finish()

resp, err = handler(ctx, req)
Expand Down Expand Up @@ -172,9 +173,10 @@ func StreamServerInterceptor(tracer *Tracer) grpc.StreamServerInterceptor {
ctx, serverSpan := tracer.StartSpanCtx(
ss.Context(),
info.FullMethod,
WithTags(gRPCComponentTag, ext.SpanKindRPCServer),
WithParentAndManualCollection(spanMeta),
)
serverSpan.SetTag(gRPCComponentTag.Key, gRPCComponentTag.Value)
serverSpan.SetTag(ext.SpanKindRPCServer.Key, ext.SpanKindRPCServer.Value)
defer serverSpan.Finish()
ss = &tracingServerStream{
ServerStream: ss,
Expand Down Expand Up @@ -259,8 +261,9 @@ func ClientInterceptor(tracer *Tracer, init func(*Span)) grpc.UnaryClientInterce
clientSpan := tracer.StartSpan(
method,
WithParentAndAutoCollection(parent),
WithTags(gRPCComponentTag, ext.SpanKindRPCClient),
)
clientSpan.SetTag(gRPCComponentTag.Key, gRPCComponentTag.Value)
clientSpan.SetTag(ext.SpanKindRPCClient.Key, ext.SpanKindRPCClient.Value)
init(clientSpan)
defer clientSpan.Finish()
ctx = injectSpanMeta(ctx, tracer, clientSpan)
Expand Down Expand Up @@ -308,8 +311,9 @@ func StreamClientInterceptor(tracer *Tracer, init func(*Span)) grpc.StreamClient
clientSpan := tracer.StartSpan(
method,
WithParentAndAutoCollection(parent),
WithTags(gRPCComponentTag, ext.SpanKindRPCClient),
)
clientSpan.SetTag(gRPCComponentTag.Key, gRPCComponentTag.Value)
clientSpan.SetTag(ext.SpanKindRPCClient.Key, ext.SpanKindRPCClient.Value)
init(clientSpan)
ctx = injectSpanMeta(ctx, tracer, clientSpan)
cs, err := streamer(ctx, desc, cc, method, opts...)
Expand Down