profiler: collect profiles concurrently #1282

Merged
merged 3 commits on May 12, 2022
Changes from 1 commit
7 changes: 6 additions & 1 deletion profiler/profile.go
@@ -272,7 +272,10 @@ func (p *profiler) deltaProfile(name string, delta *pprofutils.Delta, curData []
 		return nil, fmt.Errorf("delta prof parse: %v", err)
 	}
 	var deltaData []byte
-	if prevProf := p.prev[name]; prevProf == nil {
+	p.mu.Lock()
+	prevProf := p.prev[name]
+	p.mu.Unlock()
+	if prevProf == nil {
 		// First time deltaProfile gets called for a type, there is no prevProf. In
 		// this case we emit the current profile as a delta profile.
 		deltaData = curData
@@ -298,7 +301,9 @@
 	}
 	// Keep the most recent profiles in memory for future diffing. This needs to
 	// be taken into account when enforcing memory limits going forward.
+	p.mu.Lock()
 	p.prev[name] = curProf
+	p.mu.Unlock()
 	return &profile{data: deltaData}, nil
 }

32 changes: 23 additions & 9 deletions profiler/profiler.go
@@ -63,6 +63,7 @@ func Stop() {
 // profiler collects and sends preset profiles to the Datadog API at a given frequency
 // using a given configuration.
 type profiler struct {
+	mu         sync.Mutex
 	cfg        *config           // profile configuration
 	out        chan batch        // upload queue
 	uploadFunc func(batch) error // defaults to (*profiler).upload; replaced in tests
@@ -211,6 +212,12 @@ func (p *profiler) run() {
 // an item.
 func (p *profiler) collect(ticker <-chan time.Time) {
 	defer close(p.out)
+	var (
+		// mu guards completed
+		mu        sync.Mutex
+		completed []*profile
+		wg        sync.WaitGroup
+	)
 	for {
 		select {
 		case <-ticker:
@@ -226,15 +233,22 @@
 			}

 			for _, t := range p.enabledProfileTypes() {
-				profs, err := p.runProfile(t)
-				if err != nil {
-					log.Error("Error getting %s profile: %v; skipping.", t, err)
-					p.cfg.statsd.Count("datadog.profiler.go.collect_error", 1, append(p.cfg.tags, t.Tag()), 1)
-					continue
-				}
-				for _, prof := range profs {
-					bat.addProfile(prof)
-				}
+				wg.Add(1)
+				go func(t ProfileType) {
+					defer wg.Done()
+					profs, err := p.runProfile(t)
+					if err != nil {
+						log.Error("Error getting %s profile: %v; skipping.", t, err)
+						p.cfg.statsd.Count("datadog.profiler.go.collect_error", 1, append(p.cfg.tags, t.Tag()), 1)
+					}
+					mu.Lock()
+					defer mu.Unlock()
+					completed = append(completed, profs...)
+				}(t)
 			}
+			wg.Wait()
+			for _, prof := range completed {
+				bat.addProfile(prof)
+			}
 			p.enqueueUpload(bat)
 		case <-p.exit:

Review comment on `go func(t ProfileType) {` (Member):

NIT/🐞: enabledProfileTypes() goes out of its way to return the profiles in a useful order (see comment). Collecting the profiles concurrently essentially circumvents that.

Idea for quick-fix (from Nick): Put interruptibleSleep(profilingPeriod) in all the non-blocking profile types.

Reply (Contributor, Author):

Done, and I had to change a few of the tests since they would now block for the default profiling period (1 minute!)