diff --git a/profiler/profile.go b/profiler/profile.go
index 5c6eaa1cf1..1e7ca048e0 100644
--- a/profiler/profile.go
+++ b/profiler/profile.go
@@ -160,7 +160,9 @@ func collectGenericProfile(name string, delta *pprofutils.Delta) func(p *profile
 	return func(p *profiler) ([]byte, error) {
 		var extra []*pprofile.Profile
 		// TODO: add type safety for name == "heap" check and remove redunancy with profileType.Name.
-		if cAlloc, ok := extensions.GetCAllocationProfiler(); ok && p.cfg.deltaProfiles && name == "heap" {
+		cAlloc, ok := extensions.GetCAllocationProfiler()
+		switch {
+		case ok && p.cfg.deltaProfiles && name == "heap":
 			// For the heap profile, we'd also like to include C
 			// allocations if that extension is enabled and have the
 			// allocations show up in the same profile. Collect them
@@ -174,6 +176,11 @@
 			if err == nil {
 				extra = append(extra, profile)
 			}
+		default:
+			// In all cases, sleep until the end of the profile
+			// period so that all profiles cover the same period of
+			// time
+			p.interruptibleSleep(p.cfg.period)
 		}
 
 		var buf bytes.Buffer
@@ -272,7 +279,10 @@ func (p *profiler) deltaProfile(name string, delta *pprofutils.Delta, curData []
 		return nil, fmt.Errorf("delta prof parse: %v", err)
 	}
 	var deltaData []byte
-	if prevProf := p.prev[name]; prevProf == nil {
+	p.mu.Lock()
+	prevProf := p.prev[name]
+	p.mu.Unlock()
+	if prevProf == nil {
 		// First time deltaProfile gets called for a type, there is no prevProf. In
 		// this case we emit the current profile as a delta profile.
 		deltaData = curData
@@ -298,7 +308,9 @@ func (p *profiler) deltaProfile(name string, delta *pprofutils.Delta, curData []
 	}
 	// Keep the most recent profiles in memory for future diffing. This needs to
 	// be taken into account when enforcing memory limits going forward.
+	p.mu.Lock()
 	p.prev[name] = curProf
+	p.mu.Unlock()
 	return &profile{data: deltaData}, nil
 }
diff --git a/profiler/profile_test.go b/profiler/profile_test.go
index cd9d23f86f..a921e1b885 100644
--- a/profiler/profile_test.go
+++ b/profiler/profile_test.go
@@ -117,6 +117,7 @@ main;bar 0 0 8 16
 	// followed by prof2 when calling runProfile().
 	deltaProfiler := func(prof1, prof2 []byte, opts ...Option) (*profiler, func()) {
 		returnProfs := [][]byte{prof1, prof2}
+		opts = append(opts, WithPeriod(5*time.Millisecond))
 		p, err := unstartedProfiler(opts...)
 		p.testHooks.lookupProfile = func(_ string, w io.Writer, _ int) error {
 			_, err := w.Write(returnProfs[0])
@@ -189,7 +190,7 @@ main;bar 0 0 8 16
 	})
 
 	t.Run("goroutine", func(t *testing.T) {
-		p, err := unstartedProfiler()
+		p, err := unstartedProfiler(WithPeriod(time.Millisecond))
 		p.testHooks.lookupProfile = func(name string, w io.Writer, _ int) error {
 			_, err := w.Write([]byte(name))
 			return err
diff --git a/profiler/profiler.go b/profiler/profiler.go
index d5d6f4bbee..1830b49705 100644
--- a/profiler/profiler.go
+++ b/profiler/profiler.go
@@ -63,6 +63,7 @@ func Stop() {
 // profiler collects and sends preset profiles to the Datadog API at a given frequency
 // using a given configuration.
 type profiler struct {
+	mu         sync.Mutex
 	cfg        *config           // profile configuration
 	out        chan batch        // upload queue
 	uploadFunc func(batch) error // defaults to (*profiler).upload; replaced in tests
@@ -211,6 +212,12 @@ func (p *profiler) run() {
 // an item.
 func (p *profiler) collect(ticker <-chan time.Time) {
 	defer close(p.out)
+	var (
+		// mu guards completed
+		mu        sync.Mutex
+		completed []*profile
+		wg        sync.WaitGroup
+	)
 	for {
 		select {
 		case <-ticker:
@@ -225,16 +232,24 @@ func (p *profiler) collect(ticker <-chan time.Time) {
 				end:   now.Add(p.cfg.cpuDuration),
 			}
 
+			completed = completed[:0]
 			for _, t := range p.enabledProfileTypes() {
-				profs, err := p.runProfile(t)
-				if err != nil {
-					log.Error("Error getting %s profile: %v; skipping.", t, err)
-					p.cfg.statsd.Count("datadog.profiler.go.collect_error", 1, append(p.cfg.tags, t.Tag()), 1)
-					continue
-				}
-				for _, prof := range profs {
-					bat.addProfile(prof)
-				}
+				wg.Add(1)
+				go func(t ProfileType) {
+					defer wg.Done()
+					profs, err := p.runProfile(t)
+					if err != nil {
+						log.Error("Error getting %s profile: %v; skipping.", t, err)
+						p.cfg.statsd.Count("datadog.profiler.go.collect_error", 1, append(p.cfg.tags, t.Tag()), 1)
+					}
+					mu.Lock()
+					defer mu.Unlock()
+					completed = append(completed, profs...)
+				}(t)
+			}
+			wg.Wait()
+			for _, prof := range completed {
+				bat.addProfile(prof)
 			}
 			p.enqueueUpload(bat)
 		case <-p.exit:
diff --git a/profiler/profiler_test.go b/profiler/profiler_test.go
index 7573eac4f9..d4a8e993d2 100644
--- a/profiler/profiler_test.go
+++ b/profiler/profiler_test.go
@@ -213,7 +213,10 @@ func TestStopLatency(t *testing.T) {
 		stopped <- struct{}{}
 	}()
 
-	timeout := 20 * time.Millisecond
+	// CPU profiling polls in 100 millisecond intervals and this can't be
+	// interrupted by pprof.StopCPUProfile, so we can't guarantee profiling
+	// will stop faster than that.
+	timeout := 200 * time.Millisecond
 	select {
 	case <-stopped:
 	case <-time.After(timeout):
@@ -227,6 +230,7 @@ func TestStopLatency(t *testing.T) {
 func TestProfilerInternal(t *testing.T) {
 	t.Run("collect", func(t *testing.T) {
 		p, err := unstartedProfiler(
+			WithPeriod(1*time.Millisecond),
 			CPUDuration(1*time.Millisecond),
 			WithProfileTypes(HeapProfile, CPUProfile),
 		)
@@ -344,17 +348,31 @@ func TestAllUploaded(t *testing.T) {
 	//
 	// TODO: Further check that the uploaded profiles are all valid
 	var (
-		wg       sync.WaitGroup
 		profiles []string
 	)
+	// received indicates that the server has received a profile upload.
+	// This is used similarly to a sync.WaitGroup but avoids a potential
+	// panic if too many requests are received before profiling is stopped
+	// and the WaitGroup count goes negative.
+	//
+	// The channel is buffered with 2 entries so we can check that the
+	// second batch of profiles is correct in case the profiler gets in a
+	// bad state after the first round of profiling.
+	received := make(chan struct{}, 2)
 	server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
-		defer wg.Done()
+		defer func() {
+			select {
+			case received <- struct{}{}:
+			default:
+			}
+		}()
 		_, params, err := mime.ParseMediaType(r.Header.Get("Content-Type"))
 		if err != nil {
 			t.Fatalf("bad media type: %s", err)
 			return
 		}
 		mr := multipart.NewReader(r.Body, params["boundary"])
+		profiles = profiles[:0]
 		for {
 			p, err := mr.NextPart()
 			if err == io.EOF {
@@ -369,7 +387,6 @@ func TestAllUploaded(t *testing.T) {
 		}
 	}))
 	defer server.Close()
-	wg.Add(1)
 
 	// re-implemented testing.T.Setenv since that function requires Go 1.17
 	old, ok := os.LookupEnv("DD_PROFILING_WAIT_PROFILE")
@@ -392,7 +409,8 @@ func TestAllUploaded(t *testing.T) {
 		CPUDuration(1*time.Millisecond),
 	)
 	defer Stop()
-	wg.Wait()
+	<-received
+	<-received
 
 	expected := []string{
 		"data[cpu.pprof]",
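The default branch added to collectGenericProfile depends on the profiler's interruptibleSleep helper, which the patch itself does not show. Below is a minimal sketch of the idea, assuming only that the profiler exposes some stop signal: sleep for the profile period, but return early if profiling stops. The standalone function and the stop channel are hypothetical stand-ins for the real unexported method on *profiler and its exit channel.

package main

import (
	"fmt"
	"time"
)

// interruptibleSleep sleeps for d, but returns as soon as stop is closed.
// Hypothetical stand-in for the unexported helper the patch calls.
func interruptibleSleep(d time.Duration, stop <-chan struct{}) {
	select {
	case <-time.After(d):
	case <-stop:
	}
}

func main() {
	stop := make(chan struct{})
	close(stop)                         // pretend Stop() was already called
	interruptibleSleep(time.Hour, stop) // returns immediately instead of blocking
	fmt.Println("woke up early")
}

This is what keeps all profile types covering the same window: a type that finishes early sleeps out the remainder of the period instead of returning immediately.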
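The rewrite of (*profiler).collect is a standard fan-out/fan-in: one goroutine per enabled profile type, a mutex guarding the shared completed slice, and a WaitGroup to join all collectors before the batch is uploaded. Here is a self-contained sketch of the same pattern, with runProfile reduced to a hypothetical stub.

package main

import (
	"fmt"
	"sync"
)

// runProfile is a stub standing in for (*profiler).runProfile.
func runProfile(t string) ([]string, error) {
	return []string{t + ".pprof"}, nil
}

func main() {
	var (
		mu        sync.Mutex // guards completed, appended to by all goroutines
		completed []string
		wg        sync.WaitGroup
	)
	for _, t := range []string{"cpu", "heap", "goroutine"} {
		wg.Add(1)
		go func(t string) {
			defer wg.Done()
			profs, err := runProfile(t)
			if err != nil {
				return // one failing type is skipped; the others still upload
			}
			mu.Lock()
			defer mu.Unlock()
			completed = append(completed, profs...)
		}(t)
	}
	wg.Wait() // every type has finished (or failed) before the upload
	fmt.Println(completed)
}

Combined with the sleep-to-end-of-period behavior above, one slow profile type no longer delays the others, yet all collected profiles still describe the same time window.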
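Finally, TestAllUploaded trades its sync.WaitGroup for a buffered channel because the server may receive more uploads than expected before Stop() takes effect, and calling wg.Done() more often than wg.Add() accounted for panics with "sync: negative WaitGroup counter". A non-blocking send into a fixed-size buffer simply drops the extra events. A minimal sketch of that signaling pattern:

package main

import "fmt"

func main() {
	// The buffer bounds how many uploads the test waits for.
	received := make(chan struct{}, 2)

	for i := 0; i < 5; i++ { // simulate five upload requests arriving
		select {
		case received <- struct{}{}: // buffer had room: event recorded
		default: // buffer full: extra event dropped instead of panicking
		}
	}

	<-received // first batch observed
	<-received // second batch observed
	fmt.Println("saw two uploads; the three extras were ignored")
}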