profiler: collect profiles concurrently (#1282)
* profiler: collect profiles concurrently

The upcoming C allocation profile will work by starting a profile,
waiting for the profile period to elapse, then stopping and collecting
the output. This won't work if the profiles are collected synchronously:
the CPU profile is already collected this way, so a single profile
period doesn't leave enough time to collect both one after the other.
So the profiles need to be collected concurrently.

With concurrent profile collection, all profiles should block so that
they're collected at the end of the profile period. This preserves the
property that all profiles in a batch cover the same window of time, so
events in one profile correspond to events in another.
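
To illustrate the idea, here is a minimal standalone sketch (names are
hypothetical, not the dd-trace-go API): each profile type runs on its
own goroutine, each goroutine blocks until the period has elapsed, and
the batch is only assembled once all of them finish.

package main

import (
    "fmt"
    "sync"
    "time"
)

// collectAll gathers one result per profile type. Every collector
// blocks until the period elapses, so all profiles in the batch cover
// the same window of time.
func collectAll(period time.Duration, types []string) []string {
    var (
        mu        sync.Mutex // guards completed
        completed []string
        wg        sync.WaitGroup
    )
    for _, t := range types {
        wg.Add(1)
        go func(t string) {
            defer wg.Done()
            time.Sleep(period) // a cheap profile still waits out the period
            mu.Lock()
            defer mu.Unlock()
            completed = append(completed, t+".pprof")
        }(t)
    }
    wg.Wait() // only enqueue the upload once every profile is in
    return completed
}

func main() {
    fmt.Println(collectAll(10*time.Millisecond, []string{"cpu", "heap", "goroutine"}))
}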

Along the way, a few tests needed to be fixed so they don't take 1
minute (the default profiling period) to complete. There was also a
potential panic in TestAllUploaded: the sync.WaitGroup count could go
negative if multiple uploads went through before profiling was stopped.

Also fixed unbounded growth of the collected profiles slice. Whoops!
Made TestAllUploaded check the *second* batch of profiles to catch this
bug.
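
The WaitGroup panic fix boils down to signaling on a buffered channel
instead: a non-blocking send on a full channel is simply dropped,
whereas an extra wg.Done() panics. A minimal standalone sketch of the
pattern used in the test below (names hypothetical):

package main

import "fmt"

func main() {
    // Buffered with 2 entries: the test waits for two upload batches,
    // and any uploads beyond that fall into the default case instead
    // of driving a sync.WaitGroup counter negative.
    received := make(chan struct{}, 2)

    notifyUpload := func() {
        select {
        case received <- struct{}{}:
        default: // upload arrived after we stopped counting; ignore it
        }
    }

    for i := 0; i < 5; i++ { // simulate extra uploads racing in
        notifyUpload()
    }
    <-received
    <-received
    fmt.Println("saw two batches; extras were ignored safely")
}
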
nsrip-dd committed May 12, 2022
1 parent 46b9da9 commit 9abb5f3
Showing 4 changed files with 63 additions and 17 deletions.
16 changes: 14 additions & 2 deletions profiler/profile.go
@@ -160,7 +160,9 @@ func collectGenericProfile(name string, delta *pprofutils.Delta) func(p *profile
return func(p *profiler) ([]byte, error) {
var extra []*pprofile.Profile
// TODO: add type safety for name == "heap" check and remove redundancy with profileType.Name.
if cAlloc, ok := extensions.GetCAllocationProfiler(); ok && p.cfg.deltaProfiles && name == "heap" {
cAlloc, ok := extensions.GetCAllocationProfiler()
switch {
case ok && p.cfg.deltaProfiles && name == "heap":
// For the heap profile, we'd also like to include C
// allocations if that extension is enabled and have the
// allocations show up in the same profile. Collect them
@@ -174,6 +176,11 @@ func collectGenericProfile(name string, delta *pprofutils.Delta) func(p *profile
if err == nil {
extra = append(extra, profile)
}
default:
// In all cases, sleep until the end of the profile
// period so that all profiles cover the same period of
// time
p.interruptibleSleep(p.cfg.period)
}

var buf bytes.Buffer
@@ -272,7 +279,10 @@ func (p *profiler) deltaProfile(name string, delta *pprofutils.Delta, curData []
return nil, fmt.Errorf("delta prof parse: %v", err)
}
var deltaData []byte
if prevProf := p.prev[name]; prevProf == nil {
p.mu.Lock()
prevProf := p.prev[name]
p.mu.Unlock()
if prevProf == nil {
// First time deltaProfile gets called for a type, there is no prevProf. In
// this case we emit the current profile as a delta profile.
deltaData = curData
@@ -298,7 +308,9 @@ }
}
// Keep the most recent profiles in memory for future diffing. This needs to
// be taken into account when enforcing memory limits going forward.
p.mu.Lock()
p.prev[name] = curProf
p.mu.Unlock()
return &profile{data: deltaData}, nil
}

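The p.mu locking added above exists because profile collection now runs
on one goroutine per type, so two deltaProfile calls can touch p.prev
concurrently. A minimal standalone sketch of the guarded-map pattern
(type and method names hypothetical):

package main

import (
    "fmt"
    "sync"
)

// prevStore keeps the previous profile per type for delta computation.
// The mutex makes it safe to use from concurrently running collectors.
type prevStore struct {
    mu   sync.Mutex
    prev map[string][]byte
}

// swap stores the current profile and returns the previous one, nil on
// the first round for a given profile type.
func (s *prevStore) swap(name string, cur []byte) []byte {
    s.mu.Lock()
    defer s.mu.Unlock()
    old := s.prev[name]
    s.prev[name] = cur
    return old
}

func main() {
    s := &prevStore{prev: make(map[string][]byte)}
    fmt.Println(s.swap("heap", []byte("round-1")) == nil)  // true: no baseline yet
    fmt.Println(string(s.swap("heap", []byte("round-2")))) // round-1
}
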
3 changes: 2 additions & 1 deletion profiler/profile_test.go
@@ -117,6 +117,7 @@ main;bar 0 0 8 16
// followed by prof2 when calling runProfile().
deltaProfiler := func(prof1, prof2 []byte, opts ...Option) (*profiler, func()) {
returnProfs := [][]byte{prof1, prof2}
opts = append(opts, WithPeriod(5*time.Millisecond))
p, err := unstartedProfiler(opts...)
p.testHooks.lookupProfile = func(_ string, w io.Writer, _ int) error {
_, err := w.Write(returnProfs[0])
@@ -189,7 +190,7 @@ main;bar 0 0 8 16
})

t.Run("goroutine", func(t *testing.T) {
p, err := unstartedProfiler()
p, err := unstartedProfiler(WithPeriod(time.Millisecond))
p.testHooks.lookupProfile = func(name string, w io.Writer, _ int) error {
_, err := w.Write([]byte(name))
return err
33 changes: 24 additions & 9 deletions profiler/profiler.go
@@ -63,6 +63,7 @@ func Stop() {
// profiler collects and sends preset profiles to the Datadog API at a given frequency
// using a given configuration.
type profiler struct {
mu sync.Mutex
cfg *config // profile configuration
out chan batch // upload queue
uploadFunc func(batch) error // defaults to (*profiler).upload; replaced in tests
@@ -211,6 +212,12 @@ func (p *profiler) run() {
// an item.
func (p *profiler) collect(ticker <-chan time.Time) {
defer close(p.out)
var (
// mu guards completed
mu sync.Mutex
completed []*profile
wg sync.WaitGroup
)
for {
select {
case <-ticker:
@@ -225,16 +232,24 @@
end: now.Add(p.cfg.cpuDuration),
}

completed = completed[:0]
for _, t := range p.enabledProfileTypes() {
profs, err := p.runProfile(t)
if err != nil {
log.Error("Error getting %s profile: %v; skipping.", t, err)
p.cfg.statsd.Count("datadog.profiler.go.collect_error", 1, append(p.cfg.tags, t.Tag()), 1)
continue
}
for _, prof := range profs {
bat.addProfile(prof)
}
wg.Add(1)
go func(t ProfileType) {
defer wg.Done()
profs, err := p.runProfile(t)
if err != nil {
log.Error("Error getting %s profile: %v; skipping.", t, err)
p.cfg.statsd.Count("datadog.profiler.go.collect_error", 1, append(p.cfg.tags, t.Tag()), 1)
}
mu.Lock()
defer mu.Unlock()
completed = append(completed, profs...)
}(t)
}
wg.Wait()
for _, prof := range completed {
bat.addProfile(prof)
}
p.enqueueUpload(bat)
case <-p.exit:
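The completed = completed[:0] line above is the fix for the unbounded
slice growth mentioned in the commit message: truncating to length zero
reuses the backing array each period instead of appending new profiles
after the previous period's. A tiny standalone illustration:

package main

import "fmt"

func main() {
    batch := make([]string, 0, 4)
    for round := 1; round <= 3; round++ {
        // Reset length but keep capacity; without this, batch would
        // keep growing with every period's profiles.
        batch = batch[:0]
        batch = append(batch, fmt.Sprintf("cpu-%d", round), fmt.Sprintf("heap-%d", round))
        fmt.Println(round, batch)
    }
}
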
28 changes: 23 additions & 5 deletions profiler/profiler_test.go
@@ -213,7 +213,10 @@ func TestStopLatency(t *testing.T) {
stopped <- struct{}{}
}()

timeout := 20 * time.Millisecond
// CPU profiling polls in 100 millisecond intervals and this can't be
// interrupted by pprof.StopCPUProfile, so we can't guarantee profiling
// will stop faster than that.
timeout := 200 * time.Millisecond
select {
case <-stopped:
case <-time.After(timeout):
@@ -227,6 +230,7 @@
func TestProfilerInternal(t *testing.T) {
t.Run("collect", func(t *testing.T) {
p, err := unstartedProfiler(
WithPeriod(1*time.Millisecond),
CPUDuration(1*time.Millisecond),
WithProfileTypes(HeapProfile, CPUProfile),
)
@@ -344,17 +348,31 @@ func TestAllUploaded(t *testing.T) {
//
// TODO: Further check that the uploaded profiles are all valid
var (
wg sync.WaitGroup
profiles []string
)
// received indicates that the server has received a profile upload.
// This is used similarly to a sync.WaitGroup but avoids a potential
// panic if too many requests are received before profiling is stopped
// and the WaitGroup count goes negative.
//
// The channel is buffered with 2 entries so we can check that the
// second batch of profiles is correct in case the profiler gets in a
// bad state after the first round of profiling.
received := make(chan struct{}, 2)
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
defer wg.Done()
defer func() {
select {
case received <- struct{}{}:
default:
}
}()
_, params, err := mime.ParseMediaType(r.Header.Get("Content-Type"))
if err != nil {
t.Fatalf("bad media type: %s", err)
return
}
mr := multipart.NewReader(r.Body, params["boundary"])
profiles = profiles[:0]
for {
p, err := mr.NextPart()
if err == io.EOF {
@@ -369,7 +387,6 @@
}
}))
defer server.Close()
wg.Add(1)

// re-implemented testing.T.Setenv since that function requires Go 1.17
old, ok := os.LookupEnv("DD_PROFILING_WAIT_PROFILE")
@@ -392,7 +409,8 @@
CPUDuration(1*time.Millisecond),
)
defer Stop()
wg.Wait()
<-received
<-received

expected := []string{
"data[cpu.pprof]",