Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make the Go 1.17 collector thread-safe #969

Merged
merged 1 commit into from Jan 21, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
8 changes: 8 additions & 0 deletions prometheus/collector.go
Expand Up @@ -118,3 +118,11 @@ func (c *selfCollector) Describe(ch chan<- *Desc) {
func (c *selfCollector) Collect(ch chan<- Metric) {
ch <- c.self
}

// collectorMetric is a metric that is also a collector.
// Because of selfCollector, most (if not all) Metrics in
// this package are also collectors.
type collectorMetric interface {
Metric
Collector
}
32 changes: 25 additions & 7 deletions prometheus/go_collector_go117.go
Expand Up @@ -32,9 +32,10 @@ type goCollector struct {
base baseGoCollector

// rm... fields all pertain to the runtime/metrics package.
rmSampleMu sync.Mutex
rmSampleBuf []metrics.Sample
rmSampleMap map[string]*metrics.Sample
rmMetrics []Metric
rmMetrics []collectorMetric

// With Go 1.17, the runtime/metrics package was introduced.
// From that point on, metric names produced by the runtime/metrics
Expand All @@ -58,7 +59,7 @@ func NewGoCollector() Collector {
}

// Generate a Desc and ValueType for each runtime/metrics metric.
metricSet := make([]Metric, 0, len(descriptions))
metricSet := make([]collectorMetric, 0, len(descriptions))
sampleBuf := make([]metrics.Sample, 0, len(descriptions))
sampleMap := make(map[string]*metrics.Sample, len(descriptions))
for i := range descriptions {
Expand All @@ -76,7 +77,7 @@ func NewGoCollector() Collector {
sampleBuf = append(sampleBuf, metrics.Sample{Name: d.Name})
sampleMap[d.Name] = &sampleBuf[len(sampleBuf)-1]

var m Metric
var m collectorMetric
if d.Kind == metrics.KindFloat64Histogram {
_, hasSum := rmExactSumMap[d.Name]
m = newBatchHistogram(
Expand Down Expand Up @@ -130,9 +131,19 @@ func (c *goCollector) Collect(ch chan<- Metric) {
// Collect base non-memory metrics.
c.base.Collect(ch)

// Collect must be thread-safe, so prevent concurrent use of
// rmSampleBuf. Just read into rmSampleBuf but write all the data
// we get into our Metrics or MemStats.
//
// Note that we cannot simply read and then clone rmSampleBuf
// because we'd need to perform a deep clone of it, which is likely
// not worth it.
c.rmSampleMu.Lock()

// Populate runtime/metrics sample buffer.
metrics.Read(c.rmSampleBuf)

// Update all our metrics from rmSampleBuf.
for i, sample := range c.rmSampleBuf {
// N.B. switch on concrete type because it's significantly more efficient
// than checking for the Counter and Gauge interface implementations. In
Expand All @@ -146,22 +157,29 @@ func (c *goCollector) Collect(ch chan<- Metric) {
if v1 > v0 {
m.Add(unwrapScalarRMValue(sample.Value) - m.get())
}
m.Collect(ch)
case *gauge:
m.Set(unwrapScalarRMValue(sample.Value))
m.Collect(ch)
case *batchHistogram:
m.update(sample.Value.Float64Histogram(), c.exactSumFor(sample.Name))
m.Collect(ch)
default:
panic("unexpected metric type")
}
}

// ms is a dummy MemStats that we populate ourselves so that we can
// populate the old metrics from it.
var ms runtime.MemStats
memStatsFromRM(&ms, c.rmSampleMap)

c.rmSampleMu.Unlock()

// Export all the metrics to ch.
// At this point we must not access rmSampleBuf or rmSampleMap, because
// a concurrent caller could use it. It's safe to Collect all our Metrics,
// however, because they're updated in a thread-safe way while MemStats
// is local to this call of Collect.
mknyszek marked this conversation as resolved.
Show resolved Hide resolved
for _, m := range c.rmMetrics {
m.Collect(ch)
}
for _, i := range c.msMetrics {
ch <- MustNewConstMetric(i.desc, i.valType, i.eval(&ms))
}
Expand Down
22 changes: 22 additions & 0 deletions prometheus/go_collector_go117_test.go
Expand Up @@ -280,3 +280,25 @@ func TestExpectedRuntimeMetrics(t *testing.T) {
t.Log("where X is the Go version you are currently using")
}
}

func TestGoCollectorConcurrency(t *testing.T) {
c := NewGoCollector().(*goCollector)

// Set up multiple goroutines to Collect from the
// same GoCollector. In race mode with GOMAXPROCS > 1,
// this test should fail often if Collect is not
// concurrent-safe.
for i := 0; i < 4; i++ {
go func() {
ch := make(chan Metric)
go func() {
// Drain all metrics recieved until the
mknyszek marked this conversation as resolved.
Show resolved Hide resolved
// channel is closed.
for range ch {
}
}()
c.Collect(ch)
close(ch)
}()
}
}