Skip to content

Commit

Permalink
profiler: add extension hook for C allocation profiling (#1264)
Browse files Browse the repository at this point in the history
This is a first pass at adding an internal bridge for integrating C
allocation profiling with the regular heap profile. C allocation
profiling will require cgo and possibly other external dependencies and
non-standard linker flags. As such, the main profiler package should not
directly import the C allocation profiler implementation. But there
still needs to be some way to actually _use_ the allocation profiler if
it's available. This commit adds a bridge for doing so, and makes a few
changes to enable actually running the profile and merging it with the
heap profile.
  • Loading branch information
nsrip-dd committed May 5, 2022
1 parent 61462f0 commit 3de9a7d
Show file tree
Hide file tree
Showing 4 changed files with 147 additions and 71 deletions.
60 changes: 60 additions & 0 deletions profiler/internal/extensions/extensions.go
@@ -0,0 +1,60 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2022 Datadog, Inc.

// Package extensions provides an interface for using optional features.
//
// Features such as C allocation profiling might require cgo, unsafe code, and
// external non-Go dependencies which might not be desirable for typical users.
// The main profiler package should not import any package implementing such
// features directly as doing so may have undesired side effects. This package
// provides a bridge between the implementation of such optional features and
// the main profiler package.
package extensions

import (
"sync"

"github.com/google/pprof/profile"
)

// CAllocationProfiler is the interface for profiling allocations done through
// the standard malloc/calloc/realloc APIs.
//
// A CAllocationProfiler implementation is not necessarily safe to use from
// multiple goroutines concurrently; callers are expected to serialize
// Start/Stop pairs themselves.
type CAllocationProfiler interface {
	// Start begins sampling C allocations at the given rate, in bytes.
	// There will be an average of one sample for every rate bytes
	// allocated, so a lower rate means more frequent (and more costly)
	// sampling.
	Start(rate int)
	// Stop cancels ongoing C allocation profiling and returns the resulting
	// profile. The profile will have the correct sample types such that it
	// can be merged with the Go heap profile. Returns a non-nil error if
	// any part of the profiling failed.
	Stop() (*profile.Profile, error)
}

var (
	// mu guards reads and writes of cAllocationProfiler.
	mu sync.Mutex
	// cAllocationProfiler is the currently registered implementation, or
	// nil if none has been registered via SetCAllocationProfiler.
	cAllocationProfiler CAllocationProfiler
)

// GetCAllocationProfiler returns the currently registered C allocation
// profiler, if one is registered. The second return value reports whether a
// profiler was registered; when it is false, impl is nil.
func GetCAllocationProfiler() (impl CAllocationProfiler, registered bool) {
	mu.Lock()
	defer mu.Unlock()
	impl = cAllocationProfiler
	return impl, impl != nil
}

// SetCAllocationProfiler registers a C allocation profiler implementation,
// replacing any previously registered one.
func SetCAllocationProfiler(c CAllocationProfiler) {
	mu.Lock()
	cAllocationProfiler = c
	mu.Unlock()
}
10 changes: 8 additions & 2 deletions profiler/internal/pprofutils/delta.go
Expand Up @@ -30,7 +30,10 @@ type Delta struct {
// profile. Samples that end up with a delta of 0 are dropped. WARNING: Profile
// a will be mutated by this function. You should pass a copy if that's
// undesirable.
func (d Delta) Convert(a, b *profile.Profile) (*profile.Profile, error) {
//
// Other profiles that should be merged into the resulting profile can be passed
// through the extra parameter.
func (d Delta) Convert(a, b *profile.Profile, extra ...*profile.Profile) (*profile.Profile, error) {
ratios := make([]float64, len(a.SampleType))

found := 0
Expand Down Expand Up @@ -58,7 +61,10 @@ func (d Delta) Convert(a, b *profile.Profile) (*profile.Profile, error) {

a.ScaleN(ratios)

delta, err := profile.Merge([]*profile.Profile{a, b})
profiles := make([]*profile.Profile, 0, 2+len(extra))
profiles = append(profiles, a, b)
profiles = append(profiles, extra...)
delta, err := profile.Merge(profiles)
if err != nil {
return nil, err
}
Expand Down
130 changes: 70 additions & 60 deletions profiler/profile.go
Expand Up @@ -13,6 +13,7 @@ import (
"runtime"
"time"

"gopkg.in/DataDog/dd-trace-go.v1/profiler/internal/extensions"
"gopkg.in/DataDog/dd-trace-go.v1/profiler/internal/pprofutils"

"github.com/DataDog/gostackparse"
Expand Down Expand Up @@ -63,22 +64,20 @@ type profileType struct {
// this isn't done due to idiosyncratic filename used by the
// GoroutineProfile.
Filename string
// Delta controls if this profile should be generated as a delta profile.
// This is useful for profiles that represent samples collected over the
// lifetime of the process (i.e. heap, block, mutex). If nil, no delta
// profile is generated.
Delta *pprofutils.Delta
// SupportsDelta indicates whether delta profiles can be computed for
// this profile type, which is used to determine the final filename
SupportsDelta bool
// Collect collects the given profile and returns the data for it. Most
// profiles will be in pprof format, i.e. gzip compressed proto buf data.
Collect func(profileType, *profiler) ([]byte, error)
Collect func(p *profiler) ([]byte, error)
}

// profileTypes maps every ProfileType to its implementation.
var profileTypes = map[ProfileType]profileType{
CPUProfile: {
Name: "cpu",
Filename: "cpu.pprof",
Collect: func(_ profileType, p *profiler) ([]byte, error) {
Collect: func(p *profiler) ([]byte, error) {
var buf bytes.Buffer
if p.cfg.cpuProfileRate != 0 {
// The profile has to be set each time before
Expand All @@ -103,33 +102,33 @@ var profileTypes = map[ProfileType]profileType{
HeapProfile: {
Name: "heap",
Filename: "heap.pprof",
Delta: &pprofutils.Delta{SampleTypes: []pprofutils.ValueType{
Collect: collectGenericProfile("heap", &pprofutils.Delta{SampleTypes: []pprofutils.ValueType{
{Type: "alloc_objects", Unit: "count"},
{Type: "alloc_space", Unit: "bytes"},
}},
Collect: collectGenericProfile,
}}),
SupportsDelta: true,
},
MutexProfile: {
Name: "mutex",
Filename: "mutex.pprof",
Delta: &pprofutils.Delta{},
Collect: collectGenericProfile,
Name: "mutex",
Filename: "mutex.pprof",
Collect: collectGenericProfile("mutex", &pprofutils.Delta{}),
SupportsDelta: true,
},
BlockProfile: {
Name: "block",
Filename: "block.pprof",
Delta: &pprofutils.Delta{},
Collect: collectGenericProfile,
Name: "block",
Filename: "block.pprof",
Collect: collectGenericProfile("block", &pprofutils.Delta{}),
SupportsDelta: true,
},
GoroutineProfile: {
Name: "goroutine",
Filename: "goroutines.pprof",
Collect: collectGenericProfile,
Collect: collectGenericProfile("goroutine", nil),
},
expGoroutineWaitProfile: {
Name: "goroutinewait",
Filename: "goroutineswait.pprof",
Collect: func(t profileType, p *profiler) ([]byte, error) {
Collect: func(p *profiler) ([]byte, error) {
if n := runtime.NumGoroutine(); n > p.cfg.maxGoroutinesWait {
return nil, fmt.Errorf("skipping goroutines wait profile: %d goroutines exceeds DD_PROFILING_WAIT_PROFILE_MAX_GOROUTINES limit of %d", n, p.cfg.maxGoroutinesWait)
}
Expand All @@ -149,18 +148,50 @@ var profileTypes = map[ProfileType]profileType{
MetricsProfile: {
Name: "metrics",
Filename: "metrics.json",
Collect: func(_ profileType, p *profiler) ([]byte, error) {
Collect: func(p *profiler) ([]byte, error) {
var buf bytes.Buffer
err := p.met.report(now(), &buf)
return buf.Bytes(), err
},
},
}

func collectGenericProfile(t profileType, p *profiler) ([]byte, error) {
var buf bytes.Buffer
err := p.lookupProfile(t.Name, &buf, 0)
return buf.Bytes(), err
// collectGenericProfile returns a collection function for the named runtime
// profile. If delta is non-nil and delta profiling is enabled, the collected
// data is converted into a delta against the previous collection; otherwise
// the raw profile bytes are returned as-is.
func collectGenericProfile(name string, delta *pprofutils.Delta) func(p *profiler) ([]byte, error) {
	return func(p *profiler) ([]byte, error) {
		var extra []*pprofile.Profile
		// TODO: add type safety for name == "heap" check and remove redundancy with profileType.Name.
		if cAlloc, ok := extensions.GetCAllocationProfiler(); ok && p.cfg.deltaProfiles && name == "heap" {
			// For the heap profile, we'd also like to include C
			// allocations if that extension is enabled and have the
			// allocations show up in the same profile. Collect them
			// first before getting the regular heap snapshot so
			// that all allocations cover the same time period.
			//
			// The sampling rate is one sample per ~2 MiB allocated,
			// on average.
			//
			// TODO: Support non-delta profiles for C allocations?
			cAlloc.Start(2 * 1024 * 1024)
			p.interruptibleSleep(p.cfg.period)
			// C allocation profiling is best-effort: if it fails we
			// still report the regular Go heap profile.
			profile, err := cAlloc.Stop()
			if err == nil {
				extra = append(extra, profile)
			}
		}

		var buf bytes.Buffer
		err := p.lookupProfile(name, &buf, 0)
		data := buf.Bytes()
		if delta == nil || !p.cfg.deltaProfiles {
			return data, err
		}
		if err != nil {
			// Don't attempt the delta computation on a failed
			// lookup: data would be empty or partial, and the
			// resulting parse error would mask the root cause.
			return nil, err
		}

		start := time.Now()
		// Use a distinct name to avoid shadowing the delta parameter.
		deltaProf, err := p.deltaProfile(name, delta, data, extra...)
		tags := append(p.cfg.tags, fmt.Sprintf("profile_type:%s", name))
		p.cfg.statsd.Timing("datadog.profiler.go.delta_time", time.Since(start), tags, 1)
		if err != nil {
			return nil, fmt.Errorf("delta profile error: %s", err)
		}
		return deltaProf.data, nil
	}
}

// lookup returns t's profileType implementation.
Expand All @@ -174,7 +205,7 @@ func (t ProfileType) lookup() profileType {
Type: t,
Name: "unknown",
Filename: "unknown",
Collect: func(_ profileType, _ *profiler) ([]byte, error) {
Collect: func(_ *profiler) ([]byte, error) {
return nil, errors.New("profile type not implemented")
},
}
Expand Down Expand Up @@ -217,49 +248,31 @@ func (b *batch) addProfile(p *profile) {
func (p *profiler) runProfile(pt ProfileType) ([]*profile, error) {
start := now()
t := pt.lookup()
// Collect the original profile as-is.
data, err := t.Collect(t, p)
data, err := t.Collect(p)
if err != nil {
return nil, err
}
// Compute the deltaProf (will be nil if not enabled for this profile type).
deltaStart := time.Now()
deltaProf, err := p.deltaProfile(t, data)
if err != nil {
return nil, fmt.Errorf("delta profile error: %s", err)
}
end := now()
tags := append(p.cfg.tags, pt.Tag())
var profs []*profile
if deltaProf != nil {
profs = append(profs, deltaProf)
p.cfg.statsd.Timing("datadog.profiler.go.delta_time", end.Sub(deltaStart), tags, 1)
} else {
// If the user has disabled delta profiles, or the profile type
// doesn't support delta profiles (like the CPU profile) then
// send the original profile unchanged.
profs = append(profs, &profile{
name: t.Filename,
data: data,
})
filename := t.Filename
// TODO(fg): Consider making Collect() return the filename.
if p.cfg.deltaProfiles && t.SupportsDelta {
filename = "delta-" + filename
}
p.cfg.statsd.Timing("datadog.profiler.go.collect_time", end.Sub(start), tags, 1)
return profs, nil
return []*profile{{name: filename, data: data}}, nil
}

// deltaProfile derives the delta profile between curData and the previous
// profile. For profile types that don't have delta profiling enabled, or
// WithDeltaProfiles(false), it simply returns nil, nil.
func (p *profiler) deltaProfile(t profileType, curData []byte) (*profile, error) {
if !p.cfg.deltaProfiles || t.Delta == nil {
return nil, nil
}
// profile. If extra profiles are provided, they will be merged into the final
// profile after computing the delta profile.
func (p *profiler) deltaProfile(name string, delta *pprofutils.Delta, curData []byte, extra ...*pprofile.Profile) (*profile, error) {
curProf, err := pprofile.ParseData(curData)
if err != nil {
return nil, fmt.Errorf("delta prof parse: %v", err)
}
var deltaData []byte
if prevProf := p.prev[t.Type]; prevProf == nil {
if prevProf := p.prev[name]; prevProf == nil {
// First time deltaProfile gets called for a type, there is no prevProf. In
// this case we emit the current profile as a delta profile.
deltaData = curData
Expand All @@ -268,7 +281,7 @@ func (p *profiler) deltaProfile(t profileType, curData []byte) (*profile, error)
// Unfortunately the core implementation isn't reusable via an API, so we do
// our own delta calculation below.
// https://github.com/golang/go/commit/2ff1e3ebf5de77325c0e96a6c2a229656fc7be50#diff-94594f8f13448da956b02997e50ca5a156b65085993e23bbfdda222da6508258R303-R304
deltaProf, err := t.Delta.Convert(prevProf, curProf)
deltaProf, err := delta.Convert(prevProf, curProf, extra...)
if err != nil {
return nil, fmt.Errorf("delta prof merge: %v", err)
}
Expand All @@ -285,11 +298,8 @@ func (p *profiler) deltaProfile(t profileType, curData []byte) (*profile, error)
}
// Keep the most recent profiles in memory for future diffing. This needs to
// be taken into account when enforcing memory limits going forward.
p.prev[t.Type] = curProf
return &profile{
name: "delta-" + t.Filename,
data: deltaData,
}, nil
p.prev[name] = curProf
return &profile{data: deltaData}, nil
}

func goroutineDebug2ToPprof(r io.Reader, w io.Writer, t time.Time) (err error) {
Expand Down
18 changes: 9 additions & 9 deletions profiler/profiler.go
Expand Up @@ -63,14 +63,14 @@ func Stop() {
// profiler collects and sends preset profiles to the Datadog API at a given frequency
// using a given configuration.
type profiler struct {
cfg *config // profile configuration
out chan batch // upload queue
uploadFunc func(batch) error // defaults to (*profiler).upload; replaced in tests
exit chan struct{} // exit signals the profiler to stop; it is closed after stopping
stopOnce sync.Once // stopOnce ensures the profiler is stopped exactly once.
wg sync.WaitGroup // wg waits for all goroutines to exit when stopping.
met *metrics // metric collector state
prev map[ProfileType]*pprofile.Profile // previous collection results for delta profiling
cfg *config // profile configuration
out chan batch // upload queue
uploadFunc func(batch) error // defaults to (*profiler).upload; replaced in tests
exit chan struct{} // exit signals the profiler to stop; it is closed after stopping
stopOnce sync.Once // stopOnce ensures the profiler is stopped exactly once.
wg sync.WaitGroup // wg waits for all goroutines to exit when stopping.
met *metrics // metric collector state
prev map[string]*pprofile.Profile // previous collection results for delta profiling

testHooks testHooks
}
Expand Down Expand Up @@ -178,7 +178,7 @@ func newProfiler(opts ...Option) (*profiler, error) {
out: make(chan batch, outChannelSize),
exit: make(chan struct{}),
met: newMetrics(),
prev: make(map[ProfileType]*pprofile.Profile),
prev: make(map[string]*pprofile.Profile),
}
p.uploadFunc = p.upload
return &p, nil
Expand Down

0 comments on commit 3de9a7d

Please sign in to comment.