tracing: reduce memory pressure throughout
This commit attempts to reduce the memory overhead of tracing by doing a
few things, guided mostly by BenchmarkTracing and kv95 (see results
below). In decreasing order of impact:

- We no longer materialize recordings when the trace is "empty" or
  non-verbose. When traces straddle RPC boundaries, we serialize the
  recording to send it over the wire. For "empty" traces (ones with
  no structured recordings or log messages), this is needlessly
  allocation-heavy. We end up shipping the trace skeleton (parent-child
  span relationships, and the operation names + metadata that identify
  each span). Not only does this allocate within pkg/util/tracing, it
  also incurs allocations within gRPC where all this serialization is
  taking place. This commit takes the (opinionated) stance that we can
  avoid materializing these recordings altogether.
  We do this by having each span hold onto a reference to the rootspan,
  and updating an atomic value on the rootspan whenever anything is
  recorded. When materializing the recording, from any span in the tree,
  we can consult the root span to see if the trace was non-empty, and
  allocate if so.

- We pre-allocate a small list of child pointer slots for spans to refer
  to, instead of allocating every time a child span is created.

- Span tags aren't rendered unless the span is verbose, but we were
  still collecting them previously in order to render them if verbosity
  was toggled. Toggling an active span's verbosity rarely ever happens,
  so now we don't allocate for tags unless the span is already verbose.
  This also lets us avoid the WithTags option, which allocates even if
  the span will end up dropping tags.

- We were previously allocating SpanMeta, despite the objects being
  extremely short-lived in practice (they existed primarily to pass on
  the parent span's metadata to the newly created child spans). We now
  pass SpanMetas by value.

- We can make use of explicit carriers when extracting SpanMeta from
  remote spans, instead of accessing data through the Carrier interface.
  This helps reduce SpanMeta allocations, at the cost of duplicating some
  code.

- metadata.MD, used to carry span metadata across gRPC, is also
  relatively short-lived (existing only for the duration of the RPC).
  Its API is also relatively allocation-heavy (improved with
  grpc/grpc-go#4360), allocating for every key
  being written. Tracing has a very specific usage pattern (adding only
  the trace and span IDs to the metadata.MD), so we can pool our
  allocations here (see the first sketch after this list).

- We can slightly reduce lock contention around the tracing registry by
  locking only when we're dealing with root spans. We also compute the
  span duration outside the critical section (see the second sketch
  after this list).
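
A rough sketch of the gRPC-facing items above (by-value SpanMeta,
explicit carriers, and a pooled metadata.MD carrier), since none of them
appear in the hunks below. The names spanMeta, carrierPool,
injectSpanMeta, extractSpanMeta and the crdb-* keys are illustrative
assumptions, not this commit's actual identifiers:

    package main

    import (
        "fmt"
        "strconv"
        "sync"

        "google.golang.org/grpc/metadata"
    )

    // spanMeta is a small value type; passing it by value avoids a heap
    // allocation per RPC-crossing span.
    type spanMeta struct {
        traceID uint64
        spanID  uint64
    }

    // carrierPool recycles the short-lived metadata.MD maps used to ship
    // the two tracing keys on outgoing RPCs.
    var carrierPool = sync.Pool{
        New: func() interface{} { return make(metadata.MD, 2) },
    }

    // injectSpanMeta writes the trace/span IDs into a pooled metadata.MD.
    // The returned release func must be called once the RPC is done with
    // the metadata.
    func injectSpanMeta(sm spanMeta) (metadata.MD, func()) {
        md := carrierPool.Get().(metadata.MD)
        md["crdb-traceid"] = append(md["crdb-traceid"][:0], strconv.FormatUint(sm.traceID, 10))
        md["crdb-spanid"] = append(md["crdb-spanid"][:0], strconv.FormatUint(sm.spanID, 10))
        return md, func() { carrierPool.Put(md) }
    }

    // extractSpanMeta is the "explicit carrier" side: it reads the
    // concrete metadata.MD directly instead of going through a generic
    // Carrier interface, and returns the spanMeta by value.
    func extractSpanMeta(md metadata.MD) (spanMeta, bool) {
        traceVals, spanVals := md.Get("crdb-traceid"), md.Get("crdb-spanid")
        if len(traceVals) == 0 || len(spanVals) == 0 {
            return spanMeta{}, false
        }
        traceID, err1 := strconv.ParseUint(traceVals[0], 10, 64)
        spanID, err2 := strconv.ParseUint(spanVals[0], 10, 64)
        if err1 != nil || err2 != nil {
            return spanMeta{}, false
        }
        return spanMeta{traceID: traceID, spanID: spanID}, true
    }

    func main() {
        md, release := injectSpanMeta(spanMeta{traceID: 42, spanID: 7})
        defer release()
        if sm, ok := extractSpanMeta(md); ok {
            fmt.Printf("trace=%d span=%d\n", sm.traceID, sm.spanID)
        }
    }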

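A second sketch, for the registry item: compute the span duration before
taking the lock so the critical section is just the map mutation, and
skip the registry entirely for non-root spans. The spanRegistry/span
types and the finish method here are hypothetical stand-ins, not this
commit's actual code:

    package main

    import (
        "sync"
        "time"
    )

    // span is a pared-down stand-in for a tracing span.
    type span struct {
        spanID    uint64
        rootSpan  bool
        startTime time.Time
        duration  time.Duration
    }

    // spanRegistry tracks in-flight root spans by span ID. Child spans
    // are never registered, so finishing them doesn't touch the lock.
    type spanRegistry struct {
        mu    sync.Mutex
        spans map[uint64]*span
    }

    // finish computes the duration before acquiring the registry lock,
    // keeping the critical section down to a single map deletion.
    func (r *spanRegistry) finish(s *span) {
        s.duration = time.Since(s.startTime) // outside the critical section
        if !s.rootSpan {
            return
        }
        r.mu.Lock()
        delete(r.spans, s.spanID)
        r.mu.Unlock()
    }

    func main() {
        r := &spanRegistry{spans: make(map[uint64]*span)}
        s := &span{spanID: 1, rootSpan: true, startTime: time.Now()}
        r.spans[s.spanID] = s
        r.finish(s)
    }
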
---

Before this PR, comparing traced scans vs. not:

    name                                 old time/op    new time/op    delta
    Tracing/Cockroach/Scan1-24              403µs ± 3%     415µs ± 1%   +2.82%  (p=0.000 n=10+9)
    Tracing/MultinodeCockroach/Scan1-24     878µs ± 1%     975µs ± 6%  +11.07%  (p=0.000 n=10+10)

    name                                 old alloc/op   new alloc/op   delta
    Tracing/Cockroach/Scan1-24             23.2kB ± 2%    29.8kB ± 2%  +28.64%  (p=0.000 n=10+10)
    Tracing/MultinodeCockroach/Scan1-24    76.5kB ± 5%    95.1kB ± 1%  +24.31%  (p=0.000 n=10+10)

    name                                 old allocs/op  new allocs/op  delta
    Tracing/Cockroach/Scan1-24                235 ± 2%       258 ± 1%   +9.50%  (p=0.000 n=10+9)
    Tracing/MultinodeCockroach/Scan1-24       760 ± 1%       891 ± 1%  +17.20%  (p=0.000 n=9+10)

After this PR:

    name                                 old time/op    new time/op    delta
    Tracing/Cockroach/Scan1-24              437µs ± 4%     443µs ± 3%     ~     (p=0.315 n=10+10)
    Tracing/MultinodeCockroach/Scan1-24     925µs ± 2%     968µs ± 1%   +4.62%  (p=0.000 n=10+10)

    name                                 old alloc/op   new alloc/op   delta
    Tracing/Cockroach/Scan1-24             23.3kB ± 3%    26.3kB ± 2%  +13.24%  (p=0.000 n=10+10)
    Tracing/MultinodeCockroach/Scan1-24    77.0kB ± 4%    81.7kB ± 3%   +6.08%  (p=0.000 n=10+10)

    name                                 old allocs/op  new allocs/op  delta
    Tracing/Cockroach/Scan1-24                236 ± 1%       241 ± 1%   +2.45%  (p=0.000 n=9+9)
    Tracing/MultinodeCockroach/Scan1-24       758 ± 1%       793 ± 2%   +4.67%  (p=0.000 n=10+10)

---

kv95/enc=false/nodes=1/cpu=32 also shows a modest improvement across a
few runs when comparing before and after this PR, using a sampling rate
of 10%:

    36929.02 v. 37415.52  (reads/s)   +1.30%
     1944.38 v.  1968.94  (writes/s)  +1.24%

Release note: None
irfansharif committed May 5, 2021
1 parent cc35207 commit be4c1aa
Showing 13 changed files with 339 additions and 167 deletions.
2 changes: 1 addition & 1 deletion pkg/rpc/context.go
@@ -728,7 +728,7 @@ func (ctx *Context) grpcDialOptions(
// is in setupSpanForIncomingRPC().
//
tagger := func(span *tracing.Span) {
span.SetTag("node", ctx.NodeID.String())
span.SetTag("node", ctx.NodeID.Get().String())
}
unaryInterceptors = append(unaryInterceptors,
tracing.ClientInterceptor(tracer, tagger))
32 changes: 9 additions & 23 deletions pkg/sql/sem/builtins/builtins.go
@@ -59,7 +59,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/sessiondatapb"
"github.com/cockroachdb/cockroach/pkg/sql/sqlliveness"
"github.com/cockroachdb/cockroach/pkg/sql/sqltelemetry"
"github.com/cockroachdb/cockroach/pkg/sql/sqlutil"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/streaming"
"github.com/cockroachdb/cockroach/pkg/util/duration"
@@ -3732,30 +3731,17 @@ may increase either contention or retry errors, or both.`,
traceID := uint64(*(args[0].(*tree.DInt)))
verbosity := bool(*(args[1].(*tree.DBool)))

const query = `SELECT span_id
FROM crdb_internal.node_inflight_trace_spans
WHERE trace_id = $1
AND parent_span_id = 0`

ie := ctx.InternalExecutor.(sqlutil.InternalExecutor)
row, err := ie.QueryRowEx(
ctx.Ctx(),
"crdb_internal.set_trace_verbose",
ctx.Txn,
sessiondata.NoSessionDataOverride,
query,
traceID,
)
if err != nil {
var rootSpan *tracing.Span
if err := ctx.Settings.Tracer.VisitSpans(func(span *tracing.Span) error {
if span.TraceID() == traceID && rootSpan == nil {
rootSpan = span
}

return nil
}); err != nil {
return nil, err
}
if row == nil {
return tree.DBoolFalse, nil
}
rootSpanID := uint64(*row[0].(*tree.DInt))

rootSpan, found := ctx.Settings.Tracer.GetActiveSpanFromID(rootSpanID)
if !found {
if rootSpan == nil { // not found
return tree.DBoolFalse, nil
}

8 changes: 0 additions & 8 deletions pkg/util/tracing/alloc_test.go
@@ -17,7 +17,6 @@ import (

"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/logtags"
"github.com/opentracing/opentracing-go"
)

// BenchmarkTracer_StartSpanCtx primarily helps keep
@@ -35,10 +34,6 @@ func BenchmarkTracer_StartSpanCtx(b *testing.B) {
staticLogTags := logtags.Buffer{}
staticLogTags.Add("foo", "bar")

staticTag := opentracing.Tag{
Key: "statictag",
Value: "staticvalue",
}
b.ResetTimer()

parSp := tr.StartSpan("one-off", WithForceRealSpan())
@@ -55,9 +50,6 @@ func BenchmarkTracer_StartSpanCtx(b *testing.B) {
{"real,logtag", []SpanOption{
WithForceRealSpan(), WithLogTags(&staticLogTags),
}},
{"real,tag", []SpanOption{
WithForceRealSpan(), WithTags(staticTag),
}},
{"real,autoparent", []SpanOption{
WithForceRealSpan(), WithParentAndAutoCollection(parSp),
}},
129 changes: 108 additions & 21 deletions pkg/util/tracing/crdbspan.go
@@ -29,6 +29,13 @@ import (
// crdbSpan is a span for internal crdb usage. This is used to power SQL session
// tracing.
type crdbSpan struct {
rootSpan *crdbSpan // root span of the containing trace; could be itself
// traceEmpty indicates whether or not the trace rooted at this span
// (provided it is a root span) contains any recordings or baggage. All
// spans hold a reference to the rootSpan; this field is accessed
// through that reference.
traceEmpty int32 // accessed atomically, through markTraceAsNonEmpty and inAnEmptyTrace

traceID uint64 // probabilistically unique
spanID uint64 // probabilistically unique
parentSpanID uint64
@@ -76,16 +83,16 @@ type crdbSpanMu struct {

// children contains the list of child spans started after this Span
// started recording.
children []*crdbSpan
children childSpanRefs
// remoteSpan contains the list of remote child span recordings that
// were manually imported.
remoteSpans []tracingpb.RecordedSpan
}

// tags are only set when recording. These are tags that have been added to
// this Span, and will be appended to the tags in logTags when someone
// needs to actually observe the total set of tags that is a part of this
// Span.
// tags are only captured when recording. These are tags that have been
// added to this Span, and will be appended to the tags in logTags when
// someone needs to actually observe the total set of tags that is a part of
// this Span.
// TODO(radu): perhaps we want a recording to capture all the tags (even
// those that were set before recording started)?
tags opentracing.Tags
@@ -94,6 +101,55 @@ type crdbSpanMu struct {
baggage map[string]string
}

type childSpanRefs struct {
preAllocated [4]*crdbSpan
overflow []*crdbSpan
}

func (c *childSpanRefs) len() int {
l := 0
for i := 0; i < len(c.preAllocated); i++ {
if c.preAllocated[i] == nil {
return l
}
l++
}

return len(c.overflow) + l
}

func (c *childSpanRefs) add(ref *crdbSpan) {
for i := 0; i < len(c.preAllocated); i++ {
if c.preAllocated[i] == nil {
c.preAllocated[i] = ref
return
}
}

// Only record the child if the parent still has room.
if len(c.overflow) < maxChildrenPerSpan-len(c.preAllocated) {
c.overflow = append(c.overflow, ref)
}
}

func (c *childSpanRefs) get(idx int) *crdbSpan {
if idx < len(c.preAllocated) {
ref := c.preAllocated[idx]
if ref == nil {
panic(fmt.Sprintf("idx %d out of bounds", idx))
}
return ref
}
return c.overflow[idx-len(c.preAllocated)]
}

func (c *childSpanRefs) reset() {
for i := 0; i < len(c.preAllocated); i++ {
c.preAllocated[i] = nil
}
c.overflow = nil
}

func newSizeLimitedBuffer(limit int64) sizeLimitedBuffer {
return sizeLimitedBuffer{
limit: limit,
@@ -151,8 +207,7 @@ func (s *crdbSpan) resetRecording() {
s.mu.recording.logs.Reset()
s.mu.recording.structured.Reset()
s.mu.recording.dropped = false

s.mu.recording.children = nil
s.mu.recording.children.reset()
s.mu.recording.remoteSpans = nil
}

@@ -178,8 +233,6 @@ func (s *crdbSpan) getRecording(everyoneIsV211 bool, wantTags bool) Recording {
return nil // noop span
}

s.mu.Lock()

if !everyoneIsV211 {
// The cluster may contain nodes that are running v20.2. Unfortunately that
// version can easily crash when a peer returns a recording that that node
@@ -189,16 +242,27 @@ func (s *crdbSpan) getRecording(everyoneIsV211 bool, wantTags bool) Recording {
//
// TODO(tbg): remove this in the v21.2 cycle.
if s.recordingType() == RecordingOff {
s.mu.Unlock()
return nil
}
}

// The capacity here is approximate since we don't know how many grandchildren
// there are.
result := make(Recording, 0, 1+len(s.mu.recording.children)+len(s.mu.recording.remoteSpans))
// Return early (without allocating) if the trace is empty, i.e. there are
// no recordings or baggage. If the trace is verbose, we'll still recurse in
// order to pick up all the operations that were part of the trace, despite
// nothing having any actual data in them.
if s.recordingType() != RecordingVerbose && s.inAnEmptyTrace() {
return nil
}

s.mu.Lock()
// The capacity here is approximate since we don't know how many
// grandchildren there are.
result := make(Recording, 0, 1+s.mu.recording.children.len()+len(s.mu.recording.remoteSpans))
// Shallow-copy the children so we can process them without the lock.
children := s.mu.recording.children
var children []*crdbSpan
for i := 0; i < s.mu.recording.children.len(); i++ {
children = append(children, s.mu.recording.children.get(i))
}
result = append(result, s.getRecordingLocked(wantTags))
result = append(result, s.mu.recording.remoteSpans...)
s.mu.Unlock()
@@ -218,6 +282,11 @@ func (s *crdbSpan) getRecording(everyoneIsV211 bool, wantTags bool) Recording {
}

func (s *crdbSpan) importRemoteSpans(remoteSpans []tracingpb.RecordedSpan) {
if len(remoteSpans) == 0 {
return
}

s.markTraceAsNonEmpty()
// Change the root of the remote recording to be a child of this Span. This is
// usually already the case, except with DistSQL traces where remote
// processors run in spans that FollowFrom an RPC Span that we don't collect.
@@ -229,6 +298,11 @@ func (s *crdbSpan) importRemoteSpans(remoteSpans []tracingpb.RecordedSpan) {
}

func (s *crdbSpan) setTagLocked(key string, value interface{}) {
if s.recordingType() != RecordingVerbose {
// Don't bother storing tags if we're unlikely to retrieve them.
return
}

if s.mu.tags == nil {
s.mu.tags = make(opentracing.Tags)
}
@@ -266,10 +340,21 @@ type sizable interface {
Size() int
}

// inAnEmptyTrace indicates whether or not the containing trace is "empty" (i.e.
// has any recordings or baggage).
func (s *crdbSpan) inAnEmptyTrace() bool {
val := atomic.LoadInt32(&s.rootSpan.traceEmpty)
return val == 0
}

func (s *crdbSpan) markTraceAsNonEmpty() {
atomic.StoreInt32(&s.rootSpan.traceEmpty, 1)
}

func (s *crdbSpan) recordInternal(payload sizable, buffer *sizeLimitedBuffer) {
s.markTraceAsNonEmpty()
s.mu.Lock()
defer s.mu.Unlock()

size := int64(payload.Size())
if size > buffer.limit {
// The incoming payload alone blows past the memory limit. Let's just
@@ -291,6 +376,7 @@ func (s *crdbSpan) recordInternal(payload sizable, buffer *sizeLimitedBuffer) {
}

func (s *crdbSpan) setBaggageItemAndTag(restrictedKey, value string) {
s.markTraceAsNonEmpty()
s.mu.Lock()
defer s.mu.Unlock()
s.setBaggageItemLocked(restrictedKey, value)
@@ -405,11 +491,9 @@ func (s *crdbSpan) getRecordingLocked(wantTags bool) tracingpb.RecordedSpan {

func (s *crdbSpan) addChild(child *crdbSpan) {
s.mu.Lock()
// Only record the child if the parent still has room.
if len(s.mu.recording.children) < maxChildrenPerSpan {
s.mu.recording.children = append(s.mu.recording.children, child)
}
s.mu.Unlock()
defer s.mu.Unlock()

s.mu.recording.children.add(child)
}

// setVerboseRecursively sets the verbosity of the crdbSpan appropriately and
@@ -422,7 +506,10 @@ func (s *crdbSpan) setVerboseRecursively(to bool) {
}

s.mu.Lock()
children := s.mu.recording.children
var children []*crdbSpan
for i := 0; i < s.mu.recording.children.len(); i++ {
children = append(children, s.mu.recording.children.get(i))
}
s.mu.Unlock()

for _, child := range children {