From 9abb5f35f7af13f485fd3b4f550102c79972c5de Mon Sep 17 00:00:00 2001
From: Nick Ripley <97066770+nsrip-dd@users.noreply.github.com>
Date: Thu, 12 May 2022 11:17:47 -0400
Subject: [PATCH] profiler: collect profiles concurrently (#1282)

* profiler: collect profiles concurrently

The upcoming C allocation profile will work by starting a profile,
waiting for the profile period to elapse, then stopping and collecting
the output. This won't work if the profiles are collected synchronously,
since the CPU profile is also collected this way, meaning there won't be
enough time in a single profile period to collect both. So the profiles
need to be collected concurrently.

With concurrent profile collection, all profiles should block so that
they're collected at the end of the profile period. This preserves the
property that all profiles in a batch cover the same time span, so
events in one profile correspond to events in another.

Along the way, a few tests needed to be fixed so that they don't take 1
minute (the default profiling period) to complete. There was also a
potential panic in TestAllUploaded: if multiple uploads went through
before profiling was stopped, the sync.WaitGroup count could go
negative.

Also fixed unbounded growth of the collected profiles slice, and made
TestAllUploaded check the *second* batch of profiles to catch this bug.
---
 profiler/profile.go       | 16 ++++++++++++++--
 profiler/profile_test.go  |  3 ++-
 profiler/profiler.go      | 33 ++++++++++++++++++++++++---------
 profiler/profiler_test.go | 28 +++++++++++++++++++++++-----
 4 files changed, 63 insertions(+), 17 deletions(-)

diff --git a/profiler/profile.go b/profiler/profile.go
index 5c6eaa1cf1..1e7ca048e0 100644
--- a/profiler/profile.go
+++ b/profiler/profile.go
@@ -160,7 +160,9 @@ func collectGenericProfile(name string, delta *pprofutils.Delta) func(p *profile
     return func(p *profiler) ([]byte, error) {
         var extra []*pprofile.Profile
         // TODO: add type safety for name == "heap" check and remove redundancy with profileType.Name.
-        if cAlloc, ok := extensions.GetCAllocationProfiler(); ok && p.cfg.deltaProfiles && name == "heap" {
+        cAlloc, ok := extensions.GetCAllocationProfiler()
+        switch {
+        case ok && p.cfg.deltaProfiles && name == "heap":
             // For the heap profile, we'd also like to include C
             // allocations if that extension is enabled and have the
             // allocations show up in the same profile. Collect them
@@ -174,6 +176,11 @@
             if err == nil {
                 extra = append(extra, profile)
             }
+        default:
+            // In all other cases, sleep until the end of the profile
+            // period so that all profiles cover the same period of
+            // time.
+            p.interruptibleSleep(p.cfg.period)
         }

         var buf bytes.Buffer
@@ -272,7 +279,10 @@ func (p *profiler) deltaProfile(name string, delta *pprofutils.Delta, curData []
         return nil, fmt.Errorf("delta prof parse: %v", err)
     }
     var deltaData []byte
-    if prevProf := p.prev[name]; prevProf == nil {
+    p.mu.Lock()
+    prevProf := p.prev[name]
+    p.mu.Unlock()
+    if prevProf == nil {
         // First time deltaProfile gets called for a type, there is no prevProf. In
         // this case we emit the current profile as a delta profile.
         deltaData = curData
@@ -298,7 +308,9 @@
     }
     // Keep the most recent profiles in memory for future diffing. This needs to
     // be taken into account when enforcing memory limits going forward.
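+    // collect() now runs each profile type on its own goroutine, so
+    // p.prev can be accessed concurrently; guard the write below with
+    // p.mu, matching the locked read above.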
+    p.mu.Lock()
     p.prev[name] = curProf
+    p.mu.Unlock()
     return &profile{data: deltaData}, nil
 }

diff --git a/profiler/profile_test.go b/profiler/profile_test.go
index cd9d23f86f..a921e1b885 100644
--- a/profiler/profile_test.go
+++ b/profiler/profile_test.go
@@ -117,6 +117,7 @@ main;bar 0 0 8 16
     // followed by prof2 when calling runProfile().
     deltaProfiler := func(prof1, prof2 []byte, opts ...Option) (*profiler, func()) {
         returnProfs := [][]byte{prof1, prof2}
+        opts = append(opts, WithPeriod(5*time.Millisecond))
         p, err := unstartedProfiler(opts...)
         p.testHooks.lookupProfile = func(_ string, w io.Writer, _ int) error {
             _, err := w.Write(returnProfs[0])
@@ -189,7 +190,7 @@
     })

     t.Run("goroutine", func(t *testing.T) {
-        p, err := unstartedProfiler()
+        p, err := unstartedProfiler(WithPeriod(time.Millisecond))
         p.testHooks.lookupProfile = func(name string, w io.Writer, _ int) error {
             _, err := w.Write([]byte(name))
             return err
diff --git a/profiler/profiler.go b/profiler/profiler.go
index d5d6f4bbee..1830b49705 100644
--- a/profiler/profiler.go
+++ b/profiler/profiler.go
@@ -63,6 +63,7 @@ func Stop() {
 // profiler collects and sends preset profiles to the Datadog API at a given frequency
 // using a given configuration.
 type profiler struct {
+    mu         sync.Mutex
     cfg        *config           // profile configuration
     out        chan batch        // upload queue
     uploadFunc func(batch) error // defaults to (*profiler).upload; replaced in tests
@@ -211,6 +212,12 @@ func (p *profiler) run() {
 // an item.
 func (p *profiler) collect(ticker <-chan time.Time) {
     defer close(p.out)
+    var (
+        // mu guards completed
+        mu        sync.Mutex
+        completed []*profile
+        wg        sync.WaitGroup
+    )
     for {
         select {
         case <-ticker:
@@ -225,16 +232,24 @@
                 end:   now.Add(p.cfg.cpuDuration),
             }

+            completed = completed[:0]
             for _, t := range p.enabledProfileTypes() {
-                profs, err := p.runProfile(t)
-                if err != nil {
-                    log.Error("Error getting %s profile: %v; skipping.", t, err)
-                    p.cfg.statsd.Count("datadog.profiler.go.collect_error", 1, append(p.cfg.tags, t.Tag()), 1)
-                    continue
-                }
-                for _, prof := range profs {
-                    bat.addProfile(prof)
-                }
+                wg.Add(1)
+                go func(t ProfileType) {
+                    defer wg.Done()
+                    profs, err := p.runProfile(t)
+                    if err != nil {
+                        log.Error("Error getting %s profile: %v; skipping.", t, err)
+                        p.cfg.statsd.Count("datadog.profiler.go.collect_error", 1, append(p.cfg.tags, t.Tag()), 1)
+                    }
+                    mu.Lock()
+                    defer mu.Unlock()
+                    completed = append(completed, profs...)
+                }(t)
+            }
+            wg.Wait()
+            for _, prof := range completed {
+                bat.addProfile(prof)
             }
             p.enqueueUpload(bat)
         case <-p.exit:
diff --git a/profiler/profiler_test.go b/profiler/profiler_test.go
index 7573eac4f9..d4a8e993d2 100644
--- a/profiler/profiler_test.go
+++ b/profiler/profiler_test.go
@@ -213,7 +213,10 @@ func TestStopLatency(t *testing.T) {
         stopped <- struct{}{}
     }()

-    timeout := 20 * time.Millisecond
+    // CPU profiling polls in 100 millisecond intervals and this can't be
+    // interrupted by pprof.StopCPUProfile, so we can't guarantee profiling
+    // will stop faster than that.
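+    // A 200ms timeout allows one full polling interval to complete, plus
+    // headroom for scheduling delays.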
+    timeout := 200 * time.Millisecond
     select {
     case <-stopped:
     case <-time.After(timeout):
@@ -227,6 +230,7 @@
 func TestProfilerInternal(t *testing.T) {
     t.Run("collect", func(t *testing.T) {
         p, err := unstartedProfiler(
+            WithPeriod(1*time.Millisecond),
             CPUDuration(1*time.Millisecond),
             WithProfileTypes(HeapProfile, CPUProfile),
         )
@@ -344,17 +348,31 @@ func TestAllUploaded(t *testing.T) {
     //
     // TODO: Further check that the uploaded profiles are all valid
     var (
-        wg       sync.WaitGroup
         profiles []string
     )
+    // received indicates that the server has received a profile upload.
+    // This is used similarly to a sync.WaitGroup but avoids a potential
+    // panic if too many requests are received before profiling is stopped
+    // and the WaitGroup count goes negative.
+    //
+    // The channel is buffered with 2 entries so we can check that the
+    // second batch of profiles is correct in case the profiler gets in a
+    // bad state after the first round of profiling.
+    received := make(chan struct{}, 2)
     server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
-        defer wg.Done()
+        defer func() {
+            select {
+            case received <- struct{}{}:
+            default:
+            }
+        }()
         _, params, err := mime.ParseMediaType(r.Header.Get("Content-Type"))
         if err != nil {
             t.Fatalf("bad media type: %s", err)
             return
         }
         mr := multipart.NewReader(r.Body, params["boundary"])
+        profiles = profiles[:0]
         for {
             p, err := mr.NextPart()
             if err == io.EOF {
@@ -369,7 +387,6 @@
         }
     }))
     defer server.Close()
-    wg.Add(1)

     // re-implemented testing.T.Setenv since that function requires Go 1.17
     old, ok := os.LookupEnv("DD_PROFILING_WAIT_PROFILE")
@@ -392,7 +409,8 @@
         CPUDuration(1*time.Millisecond),
     )
     defer Stop()
-    wg.Wait()
+    <-received
+    <-received

     expected := []string{
         "data[cpu.pprof]",