diff --git a/profiler/internal/extensions/extensions.go b/profiler/internal/extensions/extensions.go new file mode 100644 index 0000000000..b449c52700 --- /dev/null +++ b/profiler/internal/extensions/extensions.go @@ -0,0 +1,60 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2022 Datadog, Inc. + +// Package extensions provides an interface for using optional features. +// +// Features such as C allocation profiling might require cgo, unsafe code, and +// external non-Go dependencies which might not be desirable for typical users. +// The main profiler package should not import any package implementing such +// features directly as doing so may have undesired side effects. This package +// provides a bridge between the implementation of such optional features and +// the main profiler package. +package extensions + +import ( + "sync" + + "github.com/google/pprof/profile" +) + +// CAllocationProfiler is the interface for profiling allocations done through +// the standard malloc/calloc/realloc APIs. +// +// A CAllocationProfiler implementation is not necessarily safe to use from +// multiple goroutines concurrently. +type CAllocationProfiler interface { + // Start begins sampling C allocations at the given rate, in bytes. + // There will be an average of one sample for every rate bytes + // allocated. + Start(rate int) + // Stop cancels ongoing C allocation profiling and returns the resulting + // profile. The profile will have the correct sample types such that it + // can be merged with the Go heap profile. Returns a non-nil error if + // any part of the profiling failed. + Stop() (*profile.Profile, error) +} + +var ( + mu sync.Mutex + cAllocationProfiler CAllocationProfiler +) + +// GetCAllocationProfiler returns the currently registered C allocation +// profiler, if one is registered. 
+func GetCAllocationProfiler() (impl CAllocationProfiler, registered bool) { + mu.Lock() + defer mu.Unlock() + if cAllocationProfiler == nil { + return nil, false + } + return cAllocationProfiler, true +} + +// SetCAllocationProfiler registers a C allocation profiler implementation. +func SetCAllocationProfiler(c CAllocationProfiler) { + mu.Lock() + defer mu.Unlock() + cAllocationProfiler = c +} diff --git a/profiler/internal/pprofutils/delta.go b/profiler/internal/pprofutils/delta.go index 1326db36b7..263668bd9c 100644 --- a/profiler/internal/pprofutils/delta.go +++ b/profiler/internal/pprofutils/delta.go @@ -30,7 +30,10 @@ type Delta struct { // profile. Samples that end up with a delta of 0 are dropped. WARNING: Profile // a will be mutated by this function. You should pass a copy if that's // undesirable. -func (d Delta) Convert(a, b *profile.Profile) (*profile.Profile, error) { +// +// Other profiles that should be merged into the resulting profile can be passed +// through the extra parameter. +func (d Delta) Convert(a, b *profile.Profile, extra ...*profile.Profile) (*profile.Profile, error) { ratios := make([]float64, len(a.SampleType)) found := 0 @@ -58,7 +61,10 @@ func (d Delta) Convert(a, b *profile.Profile) (*profile.Profile, error) { a.ScaleN(ratios) - delta, err := profile.Merge([]*profile.Profile{a, b}) + profiles := make([]*profile.Profile, 0, 2+len(extra)) + profiles = append(profiles, a, b) + profiles = append(profiles, extra...) 
+ delta, err := profile.Merge(profiles) if err != nil { return nil, err } diff --git a/profiler/profile.go b/profiler/profile.go index 038d077dc8..5c6eaa1cf1 100644 --- a/profiler/profile.go +++ b/profiler/profile.go @@ -13,6 +13,7 @@ import ( "runtime" "time" + "gopkg.in/DataDog/dd-trace-go.v1/profiler/internal/extensions" "gopkg.in/DataDog/dd-trace-go.v1/profiler/internal/pprofutils" "github.com/DataDog/gostackparse" @@ -63,14 +64,12 @@ type profileType struct { // this isn't done due to idiosyncratic filename used by the // GoroutineProfile. Filename string - // Delta controls if this profile should be generated as a delta profile. - // This is useful for profiles that represent samples collected over the - // lifetime of the process (i.e. heap, block, mutex). If nil, no delta - // profile is generated. - Delta *pprofutils.Delta + // SupportsDelta indicates whether delta profiles can be computed for + // this profile type, which is used to determine the final filename + SupportsDelta bool // Collect collects the given profile and returns the data for it. Most // profiles will be in pprof format, i.e. gzip compressed proto buf data. - Collect func(profileType, *profiler) ([]byte, error) + Collect func(p *profiler) ([]byte, error) } // profileTypes maps every ProfileType to its implementation. 
@@ -78,7 +77,7 @@ var profileTypes = map[ProfileType]profileType{ CPUProfile: { Name: "cpu", Filename: "cpu.pprof", - Collect: func(_ profileType, p *profiler) ([]byte, error) { + Collect: func(p *profiler) ([]byte, error) { var buf bytes.Buffer if p.cfg.cpuProfileRate != 0 { // The profile has to be set each time before @@ -103,33 +102,33 @@ var profileTypes = map[ProfileType]profileType{ HeapProfile: { Name: "heap", Filename: "heap.pprof", - Delta: &pprofutils.Delta{SampleTypes: []pprofutils.ValueType{ + Collect: collectGenericProfile("heap", &pprofutils.Delta{SampleTypes: []pprofutils.ValueType{ {Type: "alloc_objects", Unit: "count"}, {Type: "alloc_space", Unit: "bytes"}, - }}, - Collect: collectGenericProfile, + }}), + SupportsDelta: true, }, MutexProfile: { - Name: "mutex", - Filename: "mutex.pprof", - Delta: &pprofutils.Delta{}, - Collect: collectGenericProfile, + Name: "mutex", + Filename: "mutex.pprof", + Collect: collectGenericProfile("mutex", &pprofutils.Delta{}), + SupportsDelta: true, }, BlockProfile: { - Name: "block", - Filename: "block.pprof", - Delta: &pprofutils.Delta{}, - Collect: collectGenericProfile, + Name: "block", + Filename: "block.pprof", + Collect: collectGenericProfile("block", &pprofutils.Delta{}), + SupportsDelta: true, }, GoroutineProfile: { Name: "goroutine", Filename: "goroutines.pprof", - Collect: collectGenericProfile, + Collect: collectGenericProfile("goroutine", nil), }, expGoroutineWaitProfile: { Name: "goroutinewait", Filename: "goroutineswait.pprof", - Collect: func(t profileType, p *profiler) ([]byte, error) { + Collect: func(p *profiler) ([]byte, error) { if n := runtime.NumGoroutine(); n > p.cfg.maxGoroutinesWait { return nil, fmt.Errorf("skipping goroutines wait profile: %d goroutines exceeds DD_PROFILING_WAIT_PROFILE_MAX_GOROUTINES limit of %d", n, p.cfg.maxGoroutinesWait) } @@ -149,7 +148,7 @@ var profileTypes = map[ProfileType]profileType{ MetricsProfile: { Name: "metrics", Filename: "metrics.json", - Collect: func(_ 
profileType, p *profiler) ([]byte, error) { + Collect: func(p *profiler) ([]byte, error) { var buf bytes.Buffer err := p.met.report(now(), &buf) return buf.Bytes(), err @@ -157,10 +156,42 @@ var profileTypes = map[ProfileType]profileType{ }, } -func collectGenericProfile(t profileType, p *profiler) ([]byte, error) { - var buf bytes.Buffer - err := p.lookupProfile(t.Name, &buf, 0) - return buf.Bytes(), err +func collectGenericProfile(name string, delta *pprofutils.Delta) func(p *profiler) ([]byte, error) { + return func(p *profiler) ([]byte, error) { + var extra []*pprofile.Profile + // TODO: add type safety for name == "heap" check and remove redundancy with profileType.Name. + if cAlloc, ok := extensions.GetCAllocationProfiler(); ok && p.cfg.deltaProfiles && name == "heap" { + // For the heap profile, we'd also like to include C + // allocations if that extension is enabled and have the + // allocations show up in the same profile. Collect them + // first before getting the regular heap snapshot so + // that all allocations cover the same time period + // + // TODO: Support non-delta profiles for C allocations? + cAlloc.Start(2 * 1024 * 1024) + p.interruptibleSleep(p.cfg.period) + profile, err := cAlloc.Stop() + if err == nil { + extra = append(extra, profile) + } + } + + var buf bytes.Buffer + err := p.lookupProfile(name, &buf, 0) + data := buf.Bytes() + if delta == nil || !p.cfg.deltaProfiles { + return data, err + } + + start := time.Now() + delta, err := p.deltaProfile(name, delta, data, extra...) + tags := append(p.cfg.tags, fmt.Sprintf("profile_type:%s", name)) + p.cfg.statsd.Timing("datadog.profiler.go.delta_time", time.Since(start), tags, 1) + if err != nil { + return nil, fmt.Errorf("delta profile error: %s", err) + } + return delta.data, err + } } // lookup returns t's profileType implementation. 
@@ -174,7 +205,7 @@ func (t ProfileType) lookup() profileType { Type: t, Name: "unknown", Filename: "unknown", - Collect: func(_ profileType, _ *profiler) ([]byte, error) { + Collect: func(_ *profiler) ([]byte, error) { return nil, errors.New("profile type not implemented") }, } @@ -217,49 +248,31 @@ func (b *batch) addProfile(p *profile) { func (p *profiler) runProfile(pt ProfileType) ([]*profile, error) { start := now() t := pt.lookup() - // Collect the original profile as-is. - data, err := t.Collect(t, p) + data, err := t.Collect(p) if err != nil { return nil, err } - // Compute the deltaProf (will be nil if not enabled for this profile type). - deltaStart := time.Now() - deltaProf, err := p.deltaProfile(t, data) - if err != nil { - return nil, fmt.Errorf("delta profile error: %s", err) - } end := now() tags := append(p.cfg.tags, pt.Tag()) - var profs []*profile - if deltaProf != nil { - profs = append(profs, deltaProf) - p.cfg.statsd.Timing("datadog.profiler.go.delta_time", end.Sub(deltaStart), tags, 1) - } else { - // If the user has disabled delta profiles, or the profile type - // doesn't support delta profiles (like the CPU profile) then - // send the original profile unchanged. - profs = append(profs, &profile{ - name: t.Filename, - data: data, - }) + filename := t.Filename + // TODO(fg): Consider making Collect() return the filename. + if p.cfg.deltaProfiles && t.SupportsDelta { + filename = "delta-" + filename } p.cfg.statsd.Timing("datadog.profiler.go.collect_time", end.Sub(start), tags, 1) - return profs, nil + return []*profile{{name: filename, data: data}}, nil } // deltaProfile derives the delta profile between curData and the previous -// profile. For profile types that don't have delta profiling enabled, or -// WithDeltaProfiles(false), it simply returns nil, nil. -func (p *profiler) deltaProfile(t profileType, curData []byte) (*profile, error) { - if !p.cfg.deltaProfiles || t.Delta == nil { - return nil, nil - } +// profile. 
If extra profiles are provided, they will be merged into the final +// profile after computing the delta profile. +func (p *profiler) deltaProfile(name string, delta *pprofutils.Delta, curData []byte, extra ...*pprofile.Profile) (*profile, error) { curProf, err := pprofile.ParseData(curData) if err != nil { return nil, fmt.Errorf("delta prof parse: %v", err) } var deltaData []byte - if prevProf := p.prev[t.Type]; prevProf == nil { + if prevProf := p.prev[name]; prevProf == nil { // First time deltaProfile gets called for a type, there is no prevProf. In // this case we emit the current profile as a delta profile. deltaData = curData @@ -268,7 +281,7 @@ func (p *profiler) deltaProfile(t profileType, curData []byte) (*profile, error) // Unfortunately the core implementation isn't resuable via a API, so we do // our own delta calculation below. // https://github.com/golang/go/commit/2ff1e3ebf5de77325c0e96a6c2a229656fc7be50#diff-94594f8f13448da956b02997e50ca5a156b65085993e23bbfdda222da6508258R303-R304 - deltaProf, err := t.Delta.Convert(prevProf, curProf) + deltaProf, err := delta.Convert(prevProf, curProf, extra...) if err != nil { return nil, fmt.Errorf("delta prof merge: %v", err) } @@ -285,11 +298,8 @@ func (p *profiler) deltaProfile(t profileType, curData []byte) (*profile, error) } // Keep the most recent profiles in memory for future diffing. This needs to // be taken into account when enforcing memory limits going forward. 
- p.prev[t.Type] = curProf - return &profile{ - name: "delta-" + t.Filename, - data: deltaData, - }, nil + p.prev[name] = curProf + return &profile{data: deltaData}, nil } func goroutineDebug2ToPprof(r io.Reader, w io.Writer, t time.Time) (err error) { diff --git a/profiler/profiler.go b/profiler/profiler.go index 4e33a6d998..d5d6f4bbee 100644 --- a/profiler/profiler.go +++ b/profiler/profiler.go @@ -63,14 +63,14 @@ func Stop() { // profiler collects and sends preset profiles to the Datadog API at a given frequency // using a given configuration. type profiler struct { - cfg *config // profile configuration - out chan batch // upload queue - uploadFunc func(batch) error // defaults to (*profiler).upload; replaced in tests - exit chan struct{} // exit signals the profiler to stop; it is closed after stopping - stopOnce sync.Once // stopOnce ensures the profiler is stopped exactly once. - wg sync.WaitGroup // wg waits for all goroutines to exit when stopping. - met *metrics // metric collector state - prev map[ProfileType]*pprofile.Profile // previous collection results for delta profiling + cfg *config // profile configuration + out chan batch // upload queue + uploadFunc func(batch) error // defaults to (*profiler).upload; replaced in tests + exit chan struct{} // exit signals the profiler to stop; it is closed after stopping + stopOnce sync.Once // stopOnce ensures the profiler is stopped exactly once. + wg sync.WaitGroup // wg waits for all goroutines to exit when stopping. + met *metrics // metric collector state + prev map[string]*pprofile.Profile // previous collection results for delta profiling testHooks testHooks } @@ -178,7 +178,7 @@ func newProfiler(opts ...Option) (*profiler, error) { out: make(chan batch, outChannelSize), exit: make(chan struct{}), met: newMetrics(), - prev: make(map[ProfileType]*pprofile.Profile), + prev: make(map[string]*pprofile.Profile), } p.uploadFunc = p.upload return &p, nil