Skip to content

Commit

Permalink
Make stats collector faster
Browse files Browse the repository at this point in the history
- Make it parallel.
- Sleep is not needed in a normal loop since a condition variable is used.
  Only sleep when error happens at system level, then retry in next loop
- Move `bufReader` out of `Collector`, since it cannot be shared by multiple
  goroutines.

Signed-off-by: Xinfeng Liu <XinfengLiu@icloud.com>
  • Loading branch information
xinfengliu committed Sep 11, 2023
1 parent 6ce5aa1 commit 62759c2
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 39 deletions.
98 changes: 62 additions & 36 deletions daemon/stats/collector.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package stats // import "github.com/docker/docker/daemon/stats"

import (
"bufio"
"context"
"sync"
"time"
Expand All @@ -11,6 +10,7 @@ import (
"github.com/docker/docker/container"
"github.com/docker/docker/errdefs"
"github.com/moby/pubsub"
"golang.org/x/sync/errgroup"
)

// Collector manages and provides container resource stats
Expand All @@ -20,7 +20,6 @@ type Collector struct {
supervisor supervisor
interval time.Duration
publishers map[*container.Container]*pubsub.Publisher
bufReader *bufio.Reader
}

// NewCollector creates a stats collector that will poll the supervisor with the specified interval
Expand All @@ -29,7 +28,6 @@ func NewCollector(supervisor supervisor, interval time.Duration) *Collector {
interval: interval,
supervisor: supervisor,
publishers: make(map[*container.Container]*pubsub.Publisher),
bufReader: bufio.NewReaderSize(nil, 128),
}
s.cond = sync.NewCond(&s.m)
return s
Expand Down Expand Up @@ -111,44 +109,72 @@ func (s *Collector) Run() {
onlineCPUs, err := s.getNumberOnlineCPUs()
if err != nil {
log.G(context.TODO()).Errorf("collecting system online cpu count: %v", err)
// prevent CPU burning in case of something wrong at system level
time.Sleep(s.interval)
continue
}

for _, pair := range pairs {
stats, err := s.supervisor.GetContainerStats(pair.container)

switch err.(type) {
case nil:
// Sample system CPU usage close to container usage to avoid
// noise in metric calculations.
systemUsage, err := s.getSystemCPUUsage()
if err != nil {
log.G(context.TODO()).WithError(err).WithField("container_id", pair.container.ID).Errorf("collecting system cpu usage")
continue
}
g, ctx := errgroup.WithContext(context.Background())
pairCh := make(chan publishersPair)

// FIXME: move to containerd on Linux (not Windows)
stats.CPUStats.SystemUsage = systemUsage
stats.CPUStats.OnlineCPUs = onlineCPUs

pair.publisher.Publish(*stats)

case errdefs.ErrConflict, errdefs.ErrNotFound:
// publish empty stats containing only name and ID if not running or not found
pair.publisher.Publish(types.StatsJSON{
Name: pair.container.Name,
ID: pair.container.ID,
})

default:
log.G(context.TODO()).Errorf("collecting stats for %s: %v", pair.container.ID, err)
pair.publisher.Publish(types.StatsJSON{
Name: pair.container.Name,
ID: pair.container.ID,
})
g.Go(func() error {
defer close(pairCh)
for _, pair := range pairs {
select {
case pairCh <- pair:
case <-ctx.Done():
return ctx.Err()
}
}
return nil
})
for i := 0; i < int(onlineCPUs); i++ {
g.Go(func() error {
for pair := range pairCh {
stats, err := s.supervisor.GetContainerStats(pair.container)

switch err.(type) {
case nil:
// Sample system CPU usage close to container usage to avoid
// noise in metric calculations.
systemUsage, err := s.getSystemCPUUsage()
if err != nil {
log.G(context.TODO()).WithError(err).WithField("container_id", pair.container.ID).Errorf("collecting system cpu usage")
return err
}

// FIXME: move to containerd on Linux (not Windows)
stats.CPUStats.SystemUsage = systemUsage
stats.CPUStats.OnlineCPUs = onlineCPUs

pair.publisher.Publish(*stats)

case errdefs.ErrConflict, errdefs.ErrNotFound:
// publish empty stats containing only name and ID if not running or not found
pair.publisher.Publish(types.StatsJSON{
Name: pair.container.Name,
ID: pair.container.ID,
})

default:
log.G(context.TODO()).Errorf("collecting stats for %s: %v", pair.container.ID, err)
pair.publisher.Publish(types.StatsJSON{
Name: pair.container.Name,
ID: pair.container.ID,
})
}

select {
case <-ctx.Done():
return ctx.Err()
default:
}
}
return nil
})
}
if err := g.Wait(); err != nil {
time.Sleep(s.interval)
}

time.Sleep(s.interval)
}
}
8 changes: 5 additions & 3 deletions daemon/stats/collector_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
package stats // import "github.com/docker/docker/daemon/stats"

import (
"bufio"
"fmt"
"os"
"strconv"
Expand All @@ -29,18 +30,19 @@ const (
// provided. See `man 5 proc` for details on specific field
// information.
func (s *Collector) getSystemCPUUsage() (uint64, error) {
bufReader := bufio.NewReaderSize(nil, 128)
f, err := os.Open("/proc/stat")
if err != nil {
return 0, err
}
defer func() {
s.bufReader.Reset(nil)
bufReader.Reset(nil)
f.Close()
}()
s.bufReader.Reset(f)
bufReader.Reset(f)

for {
line, err := s.bufReader.ReadString('\n')
line, err := bufReader.ReadString('\n')
if err != nil {
break
}
Expand Down

0 comments on commit 62759c2

Please sign in to comment.