Skip to content

Commit

Permalink
Merge #64233 #65378
Browse files Browse the repository at this point in the history
64233: tracing: reduce memory pressure throughout r=abarganier a=irfansharif

This commit attempts to reduce the memory overhead of tracing by doing a
few things, guided mostly by BenchmarkTracing and kv95 (see results
below). In decreasing order of impact:

- We no longer materialize recordings when the trace is "empty" or
  non-verbose. When traces straddle RPC boundaries, we serialize the
  recording and to send it over the wire. For "empty" traces (ones with
  no structured recordings or log messages), this is needlessly
  allocation-heavy. We end up shipping the trace skeleton (parent-child
  span relationships, and the operation names + metadata that identify
  each span). Not only does this allocate within pkg/util/tracing, it
  also incurs allocations within gRPC where all this serialization is
  taking place. This commit takes the (opinionated) stance that we can
  avoid materializing these recordings altogether.
  We do this by having each span hold onto a reference to the rootspan,
  and updating an atomic value on the rootspan whenever anything is
  recorded. When materializing the recording, from any span in the tree,
  we can consult the root span to see if the trace was non-empty, and
  allocate if so.

- We pre-allocate a small list of child pointer slots for spans to refer
  to, instead of allocating every time a child span is created.

- Span tags aren't rendered unless the span is verbose, but we were
  still collecting them previously in order to render them if verbosity
  was toggled. Toggling an active span's verbosity rarely every happens,
  so now we don't allocate for tags unless the span is already verbose.
  This also lets us avoid the WithTags option, which allocates even if
  the span will end up dropping tags.

- We were previously allocating SpanMeta, despite the objects being
  extremely shortlived in practice (they existed primarily to pass on
  the parent span's metadata to the newly created child spans). We now
  pass SpanMetas by value.

- We can make use of explicit carriers when extracting SpanMeta from
  remote spans, instead of accessing data through the Carrier interface.
  This helps reduce SpanMeta allocations, at the cost of duplicating some
  code.

- metadata.MD, used to carry span metadata across gRPC, is also
  relatively short-lived (existing only for the duration of the RPC).
  Its API is also relatively allocation-heavy (improved with
  grpc/grpc-go#4360), allocating for every key
  being written. Tracing has a very specific usage pattern (adding to
  the metadata.MD only the trace and span ID), so we can pool our
  allocations here.

- We can slightly improve lock contention around the tracing registry by
  locking only when we're dealing with rootspans. We can also avoid
  computing the span duration outside the critical area.

---

Before this PR, comparing traced scans vs. not:

    name                                 old time/op    new time/op    delta
    Tracing/Cockroach/Scan1-24              403µs ± 3%     415µs ± 1%   +2.82%  (p=0.000 n=10+9)
    Tracing/MultinodeCockroach/Scan1-24     878µs ± 1%     975µs ± 6%  +11.07%  (p=0.000 n=10+10)

    name                                 old alloc/op   new alloc/op   delta
    Tracing/Cockroach/Scan1-24             23.2kB ± 2%    29.8kB ± 2%  +28.64%  (p=0.000 n=10+10)
    Tracing/MultinodeCockroach/Scan1-24    76.5kB ± 5%    95.1kB ± 1%  +24.31%  (p=0.000 n=10+10)

    name                                 old allocs/op  new allocs/op  delta
    Tracing/Cockroach/Scan1-24                235 ± 2%       258 ± 1%   +9.50%  (p=0.000 n=10+9)
    Tracing/MultinodeCockroach/Scan1-24       760 ± 1%       891 ± 1%  +17.20%  (p=0.000 n=9+10)

After this PR:

    name                                 old time/op    new time/op    delta
    Tracing/Cockroach/Scan1-24              437µs ± 4%     443µs ± 3%     ~     (p=0.315 n=10+10)
    Tracing/MultinodeCockroach/Scan1-24     925µs ± 2%     968µs ± 1%   +4.62%  (p=0.000 n=10+10)

    name                                 old alloc/op   new alloc/op   delta
    Tracing/Cockroach/Scan1-24             23.3kB ± 3%    26.3kB ± 2%  +13.24%  (p=0.000 n=10+10)
    Tracing/MultinodeCockroach/Scan1-24    77.0kB ± 4%    81.7kB ± 3%   +6.08%  (p=0.000 n=10+10)

    name                                 old allocs/op  new allocs/op  delta
    Tracing/Cockroach/Scan1-24                236 ± 1%       241 ± 1%   +2.45%  (p=0.000 n=9+9)
    Tracing/MultinodeCockroach/Scan1-24       758 ± 1%       793 ± 2%   +4.67%  (p=0.000 n=10+10)

---

kv95/enc=false/nodes=1/cpu=32 across a few runs also shows a modest
improvement before and after this PR, using a sampling rate of 10%:

    36929.02 v. 37415.52  (reads/s)   +1.30%
     1944.38 v.  1968.94  (writes/s)  +1.24%

Release note: None


65378: sem/tree: remove redundant allocation when pg-encoding chars r=yuzefovich a=yuzefovich

In order to handle chars of the specified width previously we would
always allocate a new string. This is not needed if the given string
already satisfies the width requirement, so this commit removes that
allocation.

Release note: None

Co-authored-by: irfan sharif <irfanmahmoudsharif@gmail.com>
Co-authored-by: Yahor Yuzefovich <yahor@cockroachlabs.com>
  • Loading branch information
3 people committed May 18, 2021
3 parents b4da7ff + eba03c4 + 8ae6e5a commit 73bcba9
Show file tree
Hide file tree
Showing 14 changed files with 275 additions and 158 deletions.
2 changes: 1 addition & 1 deletion pkg/rpc/context.go
Expand Up @@ -728,7 +728,7 @@ func (ctx *Context) grpcDialOptions(
// is in setupSpanForIncomingRPC().
//
tagger := func(span *tracing.Span) {
span.SetTag("node", ctx.NodeID.String())
span.SetTag("node", ctx.NodeID.Get().String())
}
unaryInterceptors = append(unaryInterceptors,
tracing.ClientInterceptor(tracer, tagger))
Expand Down
32 changes: 9 additions & 23 deletions pkg/sql/sem/builtins/builtins.go
Expand Up @@ -59,7 +59,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/sessiondatapb"
"github.com/cockroachdb/cockroach/pkg/sql/sqlliveness"
"github.com/cockroachdb/cockroach/pkg/sql/sqltelemetry"
"github.com/cockroachdb/cockroach/pkg/sql/sqlutil"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/streaming"
"github.com/cockroachdb/cockroach/pkg/util/duration"
Expand Down Expand Up @@ -3777,30 +3776,17 @@ may increase either contention or retry errors, or both.`,
traceID := uint64(*(args[0].(*tree.DInt)))
verbosity := bool(*(args[1].(*tree.DBool)))

const query = `SELECT span_id
FROM crdb_internal.node_inflight_trace_spans
WHERE trace_id = $1
AND parent_span_id = 0`

ie := ctx.InternalExecutor.(sqlutil.InternalExecutor)
row, err := ie.QueryRowEx(
ctx.Ctx(),
"crdb_internal.set_trace_verbose",
ctx.Txn,
sessiondata.NoSessionDataOverride,
query,
traceID,
)
if err != nil {
var rootSpan *tracing.Span
if err := ctx.Settings.Tracer.VisitSpans(func(span *tracing.Span) error {
if span.TraceID() == traceID && rootSpan == nil {
rootSpan = span
}

return nil
}); err != nil {
return nil, err
}
if row == nil {
return tree.DBoolFalse, nil
}
rootSpanID := uint64(*row[0].(*tree.DInt))

rootSpan, found := ctx.Settings.Tracer.GetActiveSpanFromID(rootSpanID)
if !found {
if rootSpan == nil { // not found
return tree.DBoolFalse, nil
}

Expand Down
6 changes: 3 additions & 3 deletions pkg/sql/sem/tree/pgwire_encode.go
Expand Up @@ -22,9 +22,9 @@ import (
// ResolveBlankPaddedChar pads the given string with spaces if blank padding is
// required or returns the string unmodified otherwise.
func ResolveBlankPaddedChar(s string, t *types.T) string {
if t.Oid() == oid.T_bpchar {
// Pad spaces on the right of the string to make it of length specified in
// the type t.
if t.Oid() == oid.T_bpchar && len(s) < int(t.Width()) {
// Pad spaces on the right of the string to make it of length specified
// in the type t.
return fmt.Sprintf("%-*v", t.Width(), s)
}
return s
Expand Down
8 changes: 0 additions & 8 deletions pkg/util/tracing/alloc_test.go
Expand Up @@ -17,7 +17,6 @@ import (

"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/logtags"
"github.com/opentracing/opentracing-go"
)

// BenchmarkTracer_StartSpanCtx primarily helps keep
Expand All @@ -35,10 +34,6 @@ func BenchmarkTracer_StartSpanCtx(b *testing.B) {
staticLogTags := logtags.Buffer{}
staticLogTags.Add("foo", "bar")

staticTag := opentracing.Tag{
Key: "statictag",
Value: "staticvalue",
}
b.ResetTimer()

parSp := tr.StartSpan("one-off", WithForceRealSpan())
Expand All @@ -55,9 +50,6 @@ func BenchmarkTracer_StartSpanCtx(b *testing.B) {
{"real,logtag", []SpanOption{
WithForceRealSpan(), WithLogTags(&staticLogTags),
}},
{"real,tag", []SpanOption{
WithForceRealSpan(), WithTags(staticTag),
}},
{"real,autoparent", []SpanOption{
WithForceRealSpan(), WithParentAndAutoCollection(parSp),
}},
Expand Down
123 changes: 102 additions & 21 deletions pkg/util/tracing/crdbspan.go
Expand Up @@ -29,6 +29,13 @@ import (
// crdbSpan is a span for internal crdb usage. This is used to power SQL session
// tracing.
type crdbSpan struct {
rootSpan *crdbSpan // root span of the containing trace; could be itself
// traceEmpty indicates whether or not the trace rooted at this span
// (provided it is a root span) contains any recordings or baggage. All
// spans hold a reference to the rootSpan; this field is accessed
// through that reference.
traceEmpty int32 // accessed atomically, through markTraceAsNonEmpty and inAnEmptyTrace

traceID uint64 // probabilistically unique
spanID uint64 // probabilistically unique
parentSpanID uint64
Expand Down Expand Up @@ -76,16 +83,16 @@ type crdbSpanMu struct {

// children contains the list of child spans started after this Span
// started recording.
children []*crdbSpan
children childSpanRefs
// remoteSpan contains the list of remote child span recordings that
// were manually imported.
remoteSpans []tracingpb.RecordedSpan
}

// tags are only set when recording. These are tags that have been added to
// this Span, and will be appended to the tags in logTags when someone
// needs to actually observe the total set of tags that is a part of this
// Span.
// tags are only captured when recording. These are tags that have been
// added to this Span, and will be appended to the tags in logTags when
// someone needs to actually observe the total set of tags that is a part of
// this Span.
// TODO(radu): perhaps we want a recording to capture all the tags (even
// those that were set before recording started)?
tags opentracing.Tags
Expand All @@ -94,6 +101,49 @@ type crdbSpanMu struct {
baggage map[string]string
}

type childSpanRefs struct {
refCount int
preAllocated [4]*crdbSpan
overflow []*crdbSpan
}

func (c *childSpanRefs) len() int {
return c.refCount
}

func (c *childSpanRefs) add(ref *crdbSpan) {
if c.refCount < len(c.preAllocated) {
c.preAllocated[c.refCount] = ref
c.refCount++
return
}

// Only record the child if the parent still has room.
if c.refCount < maxChildrenPerSpan {
c.overflow = append(c.overflow, ref)
c.refCount++
}
}

func (c *childSpanRefs) get(idx int) *crdbSpan {
if idx < len(c.preAllocated) {
ref := c.preAllocated[idx]
if ref == nil {
panic(fmt.Sprintf("idx %d out of bounds", idx))
}
return ref
}
return c.overflow[idx-len(c.preAllocated)]
}

func (c *childSpanRefs) reset() {
for i := 0; i < len(c.preAllocated); i++ {
c.preAllocated[i] = nil
}
c.overflow = nil
c.refCount = 0
}

func newSizeLimitedBuffer(limit int64) sizeLimitedBuffer {
return sizeLimitedBuffer{
limit: limit,
Expand Down Expand Up @@ -151,8 +201,7 @@ func (s *crdbSpan) resetRecording() {
s.mu.recording.logs.Reset()
s.mu.recording.structured.Reset()
s.mu.recording.dropped = false

s.mu.recording.children = nil
s.mu.recording.children.reset()
s.mu.recording.remoteSpans = nil
}

Expand All @@ -178,8 +227,6 @@ func (s *crdbSpan) getRecording(everyoneIsV211 bool, wantTags bool) Recording {
return nil // noop span
}

s.mu.Lock()

if !everyoneIsV211 {
// The cluster may contain nodes that are running v20.2. Unfortunately that
// version can easily crash when a peer returns a recording that that node
Expand All @@ -189,16 +236,27 @@ func (s *crdbSpan) getRecording(everyoneIsV211 bool, wantTags bool) Recording {
//
// TODO(tbg): remove this in the v21.2 cycle.
if s.recordingType() == RecordingOff {
s.mu.Unlock()
return nil
}
}

// The capacity here is approximate since we don't know how many grandchildren
// there are.
result := make(Recording, 0, 1+len(s.mu.recording.children)+len(s.mu.recording.remoteSpans))
// Return early (without allocating) if the trace is empty, i.e. there are
// no recordings or baggage. If the trace is verbose, we'll still recurse in
// order to pick up all the operations that were part of the trace, despite
// nothing having any actual data in them.
if s.recordingType() != RecordingVerbose && s.inAnEmptyTrace() {
return nil
}

s.mu.Lock()
// The capacity here is approximate since we don't know how many
// grandchildren there are.
result := make(Recording, 0, 1+s.mu.recording.children.len()+len(s.mu.recording.remoteSpans))
// Shallow-copy the children so we can process them without the lock.
children := s.mu.recording.children
var children []*crdbSpan
for i := 0; i < s.mu.recording.children.len(); i++ {
children = append(children, s.mu.recording.children.get(i))
}
result = append(result, s.getRecordingLocked(wantTags))
result = append(result, s.mu.recording.remoteSpans...)
s.mu.Unlock()
Expand All @@ -218,6 +276,11 @@ func (s *crdbSpan) getRecording(everyoneIsV211 bool, wantTags bool) Recording {
}

func (s *crdbSpan) importRemoteSpans(remoteSpans []tracingpb.RecordedSpan) {
if len(remoteSpans) == 0 {
return
}

s.markTraceAsNonEmpty()
// Change the root of the remote recording to be a child of this Span. This is
// usually already the case, except with DistSQL traces where remote
// processors run in spans that FollowFrom an RPC Span that we don't collect.
Expand All @@ -229,6 +292,11 @@ func (s *crdbSpan) importRemoteSpans(remoteSpans []tracingpb.RecordedSpan) {
}

func (s *crdbSpan) setTagLocked(key string, value interface{}) {
if s.recordingType() != RecordingVerbose {
// Don't bother storing tags if we're unlikely to retrieve them.
return
}

if s.mu.tags == nil {
s.mu.tags = make(opentracing.Tags)
}
Expand Down Expand Up @@ -266,10 +334,21 @@ type sizable interface {
Size() int
}

// inAnEmptyTrace indicates whether or not the containing trace is "empty" (i.e.
// has any recordings or baggage).
func (s *crdbSpan) inAnEmptyTrace() bool {
val := atomic.LoadInt32(&s.rootSpan.traceEmpty)
return val == 0
}

func (s *crdbSpan) markTraceAsNonEmpty() {
atomic.StoreInt32(&s.rootSpan.traceEmpty, 1)
}

func (s *crdbSpan) recordInternal(payload sizable, buffer *sizeLimitedBuffer) {
s.markTraceAsNonEmpty()
s.mu.Lock()
defer s.mu.Unlock()

size := int64(payload.Size())
if size > buffer.limit {
// The incoming payload alone blows past the memory limit. Let's just
Expand All @@ -291,6 +370,7 @@ func (s *crdbSpan) recordInternal(payload sizable, buffer *sizeLimitedBuffer) {
}

func (s *crdbSpan) setBaggageItemAndTag(restrictedKey, value string) {
s.markTraceAsNonEmpty()
s.mu.Lock()
defer s.mu.Unlock()
s.setBaggageItemLocked(restrictedKey, value)
Expand Down Expand Up @@ -405,11 +485,9 @@ func (s *crdbSpan) getRecordingLocked(wantTags bool) tracingpb.RecordedSpan {

func (s *crdbSpan) addChild(child *crdbSpan) {
s.mu.Lock()
// Only record the child if the parent still has room.
if len(s.mu.recording.children) < maxChildrenPerSpan {
s.mu.recording.children = append(s.mu.recording.children, child)
}
s.mu.Unlock()
defer s.mu.Unlock()

s.mu.recording.children.add(child)
}

// setVerboseRecursively sets the verbosity of the crdbSpan appropriately and
Expand All @@ -422,7 +500,10 @@ func (s *crdbSpan) setVerboseRecursively(to bool) {
}

s.mu.Lock()
children := s.mu.recording.children
var children []*crdbSpan
for i := 0; i < s.mu.recording.children.len(); i++ {
children = append(children, s.mu.recording.children.get(i))
}
s.mu.Unlock()

for _, child := range children {
Expand Down
18 changes: 11 additions & 7 deletions pkg/util/tracing/grpc_interceptor.go
Expand Up @@ -57,7 +57,7 @@ func (w metadataCarrier) ForEach(fn func(key, val string) error) error {
return nil
}

func extractSpanMeta(ctx context.Context, tracer *Tracer) (*SpanMeta, error) {
func extractSpanMeta(ctx context.Context, tracer *Tracer) (SpanMeta, error) {
md, ok := metadata.FromIncomingContext(ctx)
if !ok {
md = metadata.New(nil)
Expand All @@ -68,12 +68,12 @@ func extractSpanMeta(ctx context.Context, tracer *Tracer) (*SpanMeta, error) {
// spanInclusionFuncForServer is used as a SpanInclusionFunc for the server-side
// of RPCs, deciding for which operations the gRPC opentracing interceptor should
// create a span.
func spanInclusionFuncForServer(t *Tracer, spanMeta *SpanMeta) bool {
func spanInclusionFuncForServer(t *Tracer, spanMeta SpanMeta) bool {
// If there is an incoming trace on the RPC (spanMeta) or the tracer is
// configured to always trace, return true. The second part is particularly
// useful for calls coming through the HTTP->RPC gateway (i.e. the AdminUI),
// where client is never tracing.
return spanMeta != nil || t.AlwaysTrace()
return !spanMeta.Empty() || t.AlwaysTrace()
}

// setSpanTags sets one or more tags on the given span according to the
Expand Down Expand Up @@ -129,9 +129,10 @@ func ServerInterceptor(tracer *Tracer) grpc.UnaryServerInterceptor {
ctx, serverSpan := tracer.StartSpanCtx(
ctx,
info.FullMethod,
WithTags(gRPCComponentTag, ext.SpanKindRPCServer),
WithParentAndManualCollection(spanMeta),
)
serverSpan.SetTag(gRPCComponentTag.Key, gRPCComponentTag.Value)
serverSpan.SetTag(ext.SpanKindRPCServer.Key, ext.SpanKindRPCServer.Value)
defer serverSpan.Finish()

resp, err = handler(ctx, req)
Expand Down Expand Up @@ -172,9 +173,10 @@ func StreamServerInterceptor(tracer *Tracer) grpc.StreamServerInterceptor {
ctx, serverSpan := tracer.StartSpanCtx(
ss.Context(),
info.FullMethod,
WithTags(gRPCComponentTag, ext.SpanKindRPCServer),
WithParentAndManualCollection(spanMeta),
)
serverSpan.SetTag(gRPCComponentTag.Key, gRPCComponentTag.Value)
serverSpan.SetTag(ext.SpanKindRPCServer.Key, ext.SpanKindRPCServer.Value)
defer serverSpan.Finish()
ss = &tracingServerStream{
ServerStream: ss,
Expand Down Expand Up @@ -259,8 +261,9 @@ func ClientInterceptor(tracer *Tracer, init func(*Span)) grpc.UnaryClientInterce
clientSpan := tracer.StartSpan(
method,
WithParentAndAutoCollection(parent),
WithTags(gRPCComponentTag, ext.SpanKindRPCClient),
)
clientSpan.SetTag(gRPCComponentTag.Key, gRPCComponentTag.Value)
clientSpan.SetTag(ext.SpanKindRPCClient.Key, ext.SpanKindRPCClient.Value)
init(clientSpan)
defer clientSpan.Finish()
ctx = injectSpanMeta(ctx, tracer, clientSpan)
Expand Down Expand Up @@ -308,8 +311,9 @@ func StreamClientInterceptor(tracer *Tracer, init func(*Span)) grpc.StreamClient
clientSpan := tracer.StartSpan(
method,
WithParentAndAutoCollection(parent),
WithTags(gRPCComponentTag, ext.SpanKindRPCClient),
)
clientSpan.SetTag(gRPCComponentTag.Key, gRPCComponentTag.Value)
clientSpan.SetTag(ext.SpanKindRPCClient.Key, ext.SpanKindRPCClient.Value)
init(clientSpan)
ctx = injectSpanMeta(ctx, tracer, clientSpan)
cs, err := streamer(ctx, desc, cc, method, opts...)
Expand Down

0 comments on commit 73bcba9

Please sign in to comment.