Skip to content

Commit

Permalink
Make the Go 1.17 collector thread-safe
Browse files Browse the repository at this point in the history
Currently the Go 1.17 collector does not protect access to rmSampleBuf,
and Collect may be invoked concurrently. This change adds a mutex around
all uses of rmSampleBuf and pulls out operations that may block on
channels, so concurrent calls to Collect can at least pipeline those two
operations.

Signed-off-by: Michael Anthony Knyszek <mknyszek@google.com>
  • Loading branch information
mknyszek committed Jan 21, 2022
1 parent 0108796 commit aba7051
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 7 deletions.
8 changes: 8 additions & 0 deletions prometheus/collector.go
Expand Up @@ -118,3 +118,11 @@ func (c *selfCollector) Describe(ch chan<- *Desc) {
func (c *selfCollector) Collect(ch chan<- Metric) {
	// A selfCollector wraps exactly one metric, so collecting
	// amounts to sending that metric on the channel once.
	self := c.self
	ch <- self
}

// collectorMetric is a metric that is also a collector.
// Because of selfCollector, most (if not all) Metrics in
// this package are also collectors.
type collectorMetric interface {
Metric
Collector
}
32 changes: 25 additions & 7 deletions prometheus/go_collector_go117.go
Expand Up @@ -32,9 +32,10 @@ type goCollector struct {
base baseGoCollector

// rm... fields all pertain to the runtime/metrics package.
rmSampleMu sync.Mutex
rmSampleBuf []metrics.Sample
rmSampleMap map[string]*metrics.Sample
rmMetrics []Metric
rmMetrics []collectorMetric

// With Go 1.17, the runtime/metrics package was introduced.
// From that point on, metric names produced by the runtime/metrics
Expand All @@ -58,7 +59,7 @@ func NewGoCollector() Collector {
}

// Generate a Desc and ValueType for each runtime/metrics metric.
metricSet := make([]Metric, 0, len(descriptions))
metricSet := make([]collectorMetric, 0, len(descriptions))
sampleBuf := make([]metrics.Sample, 0, len(descriptions))
sampleMap := make(map[string]*metrics.Sample, len(descriptions))
for i := range descriptions {
Expand All @@ -76,7 +77,7 @@ func NewGoCollector() Collector {
sampleBuf = append(sampleBuf, metrics.Sample{Name: d.Name})
sampleMap[d.Name] = &sampleBuf[len(sampleBuf)-1]

var m Metric
var m collectorMetric
if d.Kind == metrics.KindFloat64Histogram {
_, hasSum := rmExactSumMap[d.Name]
m = newBatchHistogram(
Expand Down Expand Up @@ -130,9 +131,19 @@ func (c *goCollector) Collect(ch chan<- Metric) {
// Collect base non-memory metrics.
c.base.Collect(ch)

// Collect must be thread-safe, so prevent concurrent use of
// rmSampleBuf. Just read into rmSampleBuf but write all the data
// we get into our Metrics or MemStats.
//
// Note that we cannot simply read and then clone rmSampleBuf
// because we'd need to perform a deep clone of it, which is likely
// not worth it.
c.rmSampleMu.Lock()

// Populate runtime/metrics sample buffer.
metrics.Read(c.rmSampleBuf)

// Update all our metrics from rmSampleBuf.
for i, sample := range c.rmSampleBuf {
// N.B. switch on concrete type because it's significantly more efficient
// than checking for the Counter and Gauge interface implementations. In
Expand All @@ -146,22 +157,29 @@ func (c *goCollector) Collect(ch chan<- Metric) {
if v1 > v0 {
m.Add(unwrapScalarRMValue(sample.Value) - m.get())
}
m.Collect(ch)
case *gauge:
m.Set(unwrapScalarRMValue(sample.Value))
m.Collect(ch)
case *batchHistogram:
m.update(sample.Value.Float64Histogram(), c.exactSumFor(sample.Name))
m.Collect(ch)
default:
panic("unexpected metric type")
}
}

// ms is a dummy MemStats that we populate ourselves so that we can
// populate the old metrics from it.
var ms runtime.MemStats
memStatsFromRM(&ms, c.rmSampleMap)

c.rmSampleMu.Unlock()

// Export all the metrics to ch.
// At this point we must not access rmSampleBuf or rmSampleMap, because
// a concurrent caller could be using them. It's safe to Collect all our
// Metrics, however, because they're updated in a thread-safe way, while
// MemStats is local to this call of Collect.
for _, m := range c.rmMetrics {
m.Collect(ch)
}
for _, i := range c.msMetrics {
ch <- MustNewConstMetric(i.desc, i.valType, i.eval(&ms))
}
Expand Down
22 changes: 22 additions & 0 deletions prometheus/go_collector_go117_test.go
Expand Up @@ -280,3 +280,25 @@ func TestExpectedRuntimeMetrics(t *testing.T) {
t.Log("where X is the Go version you are currently using")
}
}

// TestGoCollectorConcurrency calls Collect on one goCollector from
// several goroutines at the same time and waits for every call to
// finish. It makes no assertions of its own; it is meant to be run
// under the race detector, which reports unsynchronized access.
func TestGoCollectorConcurrency(t *testing.T) {
	c := NewGoCollector().(*goCollector)

	const numCollectors = 4

	// done receives one value per collecting goroutine once its
	// Collect call has returned and its drain goroutine has exited.
	// Without this, the test could return before any Collect call
	// had a chance to run, and the goroutines would outlive the test.
	done := make(chan struct{})

	// Set up multiple goroutines to Collect from the
	// same GoCollector. In race mode with GOMAXPROCS > 1,
	// this test should fail often if Collect is not
	// concurrent-safe.
	for i := 0; i < numCollectors; i++ {
		go func() {
			ch := make(chan Metric)
			drained := make(chan struct{})
			go func() {
				defer close(drained)
				// Drain all metrics received until the
				// channel is closed.
				for range ch {
				}
			}()
			c.Collect(ch)
			close(ch)
			// Make sure the drainer is gone before reporting completion.
			<-drained
			done <- struct{}{}
		}()
	}

	// Block until every collecting goroutine has finished.
	for i := 0; i < numCollectors; i++ {
		<-done
	}
}

0 comments on commit aba7051

Please sign in to comment.