From f961078d449132c65af66eed37800e8df87694ab Mon Sep 17 00:00:00 2001
From: Nick Ripley
Date: Wed, 4 May 2022 14:14:06 -0400
Subject: [PATCH 1/2] profiler: collect profiles concurrently

The upcoming C allocation profile will work by starting a profile,
waiting for the profile period to elapse, then stopping and collecting
the output. This won't work if the profiles are collected synchronously,
since the CPU profile is also collected this way, meaning there won't be
enough time in a single profile period to collect both. So the profiles
need to be collected concurrently.
---
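Note for reviewers: the new collect loop is a standard fan-out/fan-in:
one goroutine per profile type, a sync.WaitGroup to wait for them all,
and a mutex-guarded slice to gather the results. A minimal,
self-contained sketch of the same pattern follows; profileType,
runProfile, and collectAll are illustrative stand-ins here, not the
real profiler API:

package main

import (
	"fmt"
	"sync"
)

type profileType string

// runProfile stands in for (*profiler).runProfile.
func runProfile(t profileType) ([]string, error) {
	return []string{string(t) + ".pprof"}, nil
}

// collectAll starts one goroutine per profile type and gathers the
// results once every goroutine has finished.
func collectAll(types []profileType) []string {
	var (
		mu        sync.Mutex // guards completed
		completed []string
		wg        sync.WaitGroup
	)
	for _, t := range types {
		wg.Add(1)
		go func(t profileType) { // pass t: the loop variable is reused across iterations
			defer wg.Done()
			profs, err := runProfile(t)
			if err != nil {
				return // the real code logs the error and bumps a statsd counter
			}
			mu.Lock()
			defer mu.Unlock()
			completed = append(completed, profs...)
		}(t)
	}
	// Once Wait returns, no goroutine touches completed anymore, so the
	// caller can read it without holding the lock.
	wg.Wait()
	return completed
}

func main() {
	fmt.Println(collectAll([]profileType{"cpu", "heap", "goroutine"}))
}

Passing t as an argument gives each goroutine its own copy of the loop
variable, which matters on Go versions before 1.22.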
 profiler/profile.go  |  7 ++++++-
 profiler/profiler.go | 32 +++++++++++++++++++++++---------
 2 files changed, 29 insertions(+), 10 deletions(-)

diff --git a/profiler/profile.go b/profiler/profile.go
index 5c6eaa1cf1..403e47d225 100644
--- a/profiler/profile.go
+++ b/profiler/profile.go
@@ -272,7 +272,10 @@ func (p *profiler) deltaProfile(name string, delta *pprofutils.Delta, curData []
 		return nil, fmt.Errorf("delta prof parse: %v", err)
 	}
 	var deltaData []byte
-	if prevProf := p.prev[name]; prevProf == nil {
+	p.mu.Lock()
+	prevProf := p.prev[name]
+	p.mu.Unlock()
+	if prevProf == nil {
 		// First time deltaProfile gets called for a type, there is no prevProf. In
 		// this case we emit the current profile as a delta profile.
 		deltaData = curData
@@ -298,7 +301,9 @@ func (p *profiler) deltaProfile(name string, delta *pprofutils.Delta, curData []
 	}
 	// Keep the most recent profiles in memory for future diffing. This needs to
 	// be taken into account when enforcing memory limits going forward.
+	p.mu.Lock()
 	p.prev[name] = curProf
+	p.mu.Unlock()
 	return &profile{data: deltaData}, nil
 }
 
diff --git a/profiler/profiler.go b/profiler/profiler.go
index d5d6f4bbee..0f9135ec5c 100644
--- a/profiler/profiler.go
+++ b/profiler/profiler.go
@@ -63,6 +63,7 @@ func Stop() {
 // profiler collects and sends preset profiles to the Datadog API at a given frequency
 // using a given configuration.
 type profiler struct {
+	mu          sync.Mutex
 	cfg         *config           // profile configuration
 	out         chan batch        // upload queue
 	uploadFunc  func(batch) error // defaults to (*profiler).upload; replaced in tests
@@ -211,6 +212,12 @@ func (p *profiler) run() {
 // an item.
 func (p *profiler) collect(ticker <-chan time.Time) {
 	defer close(p.out)
+	var (
+		// mu guards completed
+		mu        sync.Mutex
+		completed []*profile
+		wg        sync.WaitGroup
+	)
 	for {
 		select {
 		case <-ticker:
@@ -226,15 +233,22 @@ func (p *profiler) collect(ticker <-chan time.Time) {
 			}
 
 			for _, t := range p.enabledProfileTypes() {
-				profs, err := p.runProfile(t)
-				if err != nil {
-					log.Error("Error getting %s profile: %v; skipping.", t, err)
-					p.cfg.statsd.Count("datadog.profiler.go.collect_error", 1, append(p.cfg.tags, t.Tag()), 1)
-					continue
-				}
-				for _, prof := range profs {
-					bat.addProfile(prof)
-				}
+				wg.Add(1)
+				go func(t ProfileType) {
+					defer wg.Done()
+					profs, err := p.runProfile(t)
+					if err != nil {
+						log.Error("Error getting %s profile: %v; skipping.", t, err)
+						p.cfg.statsd.Count("datadog.profiler.go.collect_error", 1, append(p.cfg.tags, t.Tag()), 1)
+					}
+					mu.Lock()
+					defer mu.Unlock()
+					completed = append(completed, profs...)
+				}(t)
+			}
+			wg.Wait()
+			for _, prof := range completed {
+				bat.addProfile(prof)
 			}
 			p.enqueueUpload(bat)
 		case <-p.exit:

From 7b6ceae920a03cce7661a654510c0096f1c0101f Mon Sep 17 00:00:00 2001
From: Nick Ripley
Date: Thu, 12 May 2022 09:21:31 -0400
Subject: [PATCH 2/2] profiler: make all profiles blocking

With concurrent profile collection, all profiles should block so that
they're collected at the end of the profile period. This preserves the
property that all profiles in a batch cover the same time window, so
events in one profile correspond to events in another.

Along the way, a few tests needed to be fixed so that they don't take 1
minute (the default profiling period) to complete. There was also a
potential panic in TestAllUploaded if the sync.WaitGroup count went
negative due to multiple uploads going through before profiling was
stopped.

Also fixed unbounded growth of the collected profiles slice. Whoops!
Made TestAllUploaded check the *second* batch of profiles to catch this
bug.
---
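Note for reviewers: the new default branch in collectGenericProfile
relies on p.interruptibleSleep, which doesn't appear in this diff.
Presumably it blocks for the requested duration unless the profiler is
stopped first; a sketch of that behavior, assuming the profiler's exit
channel is what interrupts the sleep:

func (p *profiler) interruptibleSleep(d time.Duration) {
	// Sleep for d, but wake up early if the profiler is stopped so
	// that Stop doesn't have to wait out a full profile period.
	select {
	case <-p.exit:
	case <-time.After(d):
	}
}

Without something like this, making every profile block for the full
period would add up to a full period of latency to Stop.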
 profiler/profile.go       |  9 ++++++++-
 profiler/profile_test.go  |  3 ++-
 profiler/profiler.go      |  1 +
 profiler/profiler_test.go | 28 +++++++++++++++++++++++-----
 4 files changed, 34 insertions(+), 7 deletions(-)

diff --git a/profiler/profile.go b/profiler/profile.go
index 403e47d225..1e7ca048e0 100644
--- a/profiler/profile.go
+++ b/profiler/profile.go
@@ -160,7 +160,9 @@ func collectGenericProfile(name string, delta *pprofutils.Delta) func(p *profile
 	return func(p *profiler) ([]byte, error) {
 		var extra []*pprofile.Profile
 		// TODO: add type safety for name == "heap" check and remove redundancy with profileType.Name.
-		if cAlloc, ok := extensions.GetCAllocationProfiler(); ok && p.cfg.deltaProfiles && name == "heap" {
+		cAlloc, ok := extensions.GetCAllocationProfiler()
+		switch {
+		case ok && p.cfg.deltaProfiles && name == "heap":
 			// For the heap profile, we'd also like to include C
 			// allocations if that extension is enabled and have the
 			// allocations show up in the same profile. Collect them
@@ -174,6 +176,11 @@ func collectGenericProfile(name string, delta *pprofutils.Delta) func(p *profile
 			if err == nil {
 				extra = append(extra, profile)
 			}
+		default:
+			// In all cases, sleep until the end of the profile
+			// period so that all profiles cover the same period of
+			// time.
+			p.interruptibleSleep(p.cfg.period)
 		}
 
 		var buf bytes.Buffer
diff --git a/profiler/profile_test.go b/profiler/profile_test.go
index cd9d23f86f..a921e1b885 100644
--- a/profiler/profile_test.go
+++ b/profiler/profile_test.go
@@ -117,6 +117,7 @@ main;bar 0 0 8 16
 	// followed by prof2 when calling runProfile().
 	deltaProfiler := func(prof1, prof2 []byte, opts ...Option) (*profiler, func()) {
 		returnProfs := [][]byte{prof1, prof2}
+		opts = append(opts, WithPeriod(5*time.Millisecond))
 		p, err := unstartedProfiler(opts...)
 		p.testHooks.lookupProfile = func(_ string, w io.Writer, _ int) error {
 			_, err := w.Write(returnProfs[0])
@@ -189,7 +190,7 @@
 	})
 
 	t.Run("goroutine", func(t *testing.T) {
-		p, err := unstartedProfiler()
+		p, err := unstartedProfiler(WithPeriod(time.Millisecond))
 		p.testHooks.lookupProfile = func(name string, w io.Writer, _ int) error {
 			_, err := w.Write([]byte(name))
 			return err
diff --git a/profiler/profiler.go b/profiler/profiler.go
index 0f9135ec5c..1830b49705 100644
--- a/profiler/profiler.go
+++ b/profiler/profiler.go
@@ -232,6 +232,7 @@ func (p *profiler) collect(ticker <-chan time.Time) {
 				end: now.Add(p.cfg.cpuDuration),
 			}
 
+			completed = completed[:0]
 			for _, t := range p.enabledProfileTypes() {
 				wg.Add(1)
 				go func(t ProfileType) {
diff --git a/profiler/profiler_test.go b/profiler/profiler_test.go
index 7573eac4f9..d4a8e993d2 100644
--- a/profiler/profiler_test.go
+++ b/profiler/profiler_test.go
@@ -213,7 +213,10 @@ func TestStopLatency(t *testing.T) {
 		stopped <- struct{}{}
 	}()
 
-	timeout := 20 * time.Millisecond
+	// CPU profiling polls in 100 millisecond intervals and this can't be
+	// interrupted by pprof.StopCPUProfile, so we can't guarantee profiling
+	// will stop faster than that.
+	timeout := 200 * time.Millisecond
 	select {
 	case <-stopped:
 	case <-time.After(timeout):
@@ -227,6 +230,7 @@
 func TestProfilerInternal(t *testing.T) {
 	t.Run("collect", func(t *testing.T) {
 		p, err := unstartedProfiler(
+			WithPeriod(1*time.Millisecond),
 			CPUDuration(1*time.Millisecond),
 			WithProfileTypes(HeapProfile, CPUProfile),
 		)
@@ -344,17 +348,31 @@ func TestAllUploaded(t *testing.T) {
 	//
 	// TODO: Further check that the uploaded profiles are all valid
 	var (
-		wg       sync.WaitGroup
 		profiles []string
 	)
+	// received indicates that the server has received a profile upload.
+	// This is used similarly to a sync.WaitGroup but avoids a potential
+	// panic if too many requests are received before profiling is stopped
+	// and the WaitGroup count goes negative.
+	//
+	// The channel is buffered with 2 entries so we can check that the
+	// second batch of profiles is correct in case the profiler gets in a
+	// bad state after the first round of profiling.
+	received := make(chan struct{}, 2)
 	server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
-		defer wg.Done()
+		defer func() {
+			select {
+			case received <- struct{}{}:
+			default:
+			}
+		}()
 		_, params, err := mime.ParseMediaType(r.Header.Get("Content-Type"))
 		if err != nil {
 			t.Fatalf("bad media type: %s", err)
 			return
 		}
 		mr := multipart.NewReader(r.Body, params["boundary"])
+		profiles = profiles[:0]
 		for {
 			p, err := mr.NextPart()
 			if err == io.EOF {
@@ -369,7 +387,6 @@ func TestAllUploaded(t *testing.T) {
 		}
 	}))
 	defer server.Close()
-	wg.Add(1)
 
 	// re-implemented testing.T.Setenv since that function requires Go 1.17
 	old, ok := os.LookupEnv("DD_PROFILING_WAIT_PROFILE")
@@ -392,7 +409,8 @@ func TestAllUploaded(t *testing.T) {
 		CPUDuration(1*time.Millisecond),
 	)
 	defer Stop()
-	wg.Wait()
+	<-received
+	<-received
 
 	expected := []string{
 		"data[cpu.pprof]",