Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use simpler locking in the Go 1.17 collector #975

Merged
merged 1 commit into the base branch on Jan 25, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
33 changes: 17 additions & 16 deletions prometheus/go_collector_go117.go
Expand Up @@ -31,8 +31,11 @@ import (
type goCollector struct {
base baseGoCollector

// mu protects updates to all fields ensuring a consistent
// snapshot is always produced by Collect.
mu sync.Mutex

// rm... fields all pertain to the runtime/metrics package.
rmSampleMu sync.Mutex
rmSampleBuf []metrics.Sample
rmSampleMap map[string]*metrics.Sample
rmMetrics []collectorMetric
Expand Down Expand Up @@ -135,10 +138,16 @@ func (c *goCollector) Collect(ch chan<- Metric) {
// rmSampleBuf. Just read into rmSampleBuf but write all the data
// we get into our Metrics or MemStats.
//
// Note that we cannot simply read and then clone rmSampleBuf
// because we'd need to perform a deep clone of it, which is likely
// not worth it.
c.rmSampleMu.Lock()
// This lock also ensures that the Metrics we send out are all from
// the same updates, ensuring their mutual consistency insofar as
// is guaranteed by the runtime/metrics package.
//
// N.B. This locking is heavy-handed, but Collect is expected to be called
// relatively infrequently. Also the core operation here, metrics.Read,
// is fast (O(tens of microseconds)) so contention should certainly be
// low, though channel operations and any allocations may add to that.
c.mu.Lock()
defer c.mu.Unlock()

// Populate runtime/metrics sample buffer.
metrics.Read(c.rmSampleBuf)
Expand All @@ -157,10 +166,13 @@ func (c *goCollector) Collect(ch chan<- Metric) {
if v1 > v0 {
m.Add(unwrapScalarRMValue(sample.Value) - m.get())
}
m.Collect(ch)
case *gauge:
m.Set(unwrapScalarRMValue(sample.Value))
m.Collect(ch)
case *batchHistogram:
m.update(sample.Value.Float64Histogram(), c.exactSumFor(sample.Name))
m.Collect(ch)
default:
panic("unexpected metric type")
}
Expand All @@ -169,17 +181,6 @@ func (c *goCollector) Collect(ch chan<- Metric) {
// populate the old metrics from it.
var ms runtime.MemStats
memStatsFromRM(&ms, c.rmSampleMap)

c.rmSampleMu.Unlock()

// Export all the metrics to ch.
// At this point we must not access rmSampleBuf or rmSampleMap, because
// a concurrent caller could use it. It's safe to Collect all our Metrics,
// however, because they're updated in a thread-safe way while MemStats
// is local to this call of Collect.
for _, m := range c.rmMetrics {
m.Collect(ch)
}
for _, i := range c.msMetrics {
ch <- MustNewConstMetric(i.desc, i.valType, i.eval(&ms))
}
Expand Down
2 changes: 1 addition & 1 deletion prometheus/go_collector_go117_test.go
Expand Up @@ -292,7 +292,7 @@ func TestGoCollectorConcurrency(t *testing.T) {
go func() {
ch := make(chan Metric)
go func() {
// Drain all metrics recieved until the
// Drain all metrics received until the
// channel is closed.
for range ch {
}
Expand Down
17 changes: 17 additions & 0 deletions prometheus/go_collector_test.go
Expand Up @@ -154,3 +154,20 @@ func TestGoCollectorGC(t *testing.T) {
break
}
}

// BenchmarkGoCollector measures the cost of a single Collect pass over
// the Go runtime collector, including the runtime/metrics read and the
// channel sends.
func BenchmarkGoCollector(b *testing.B) {
	c := NewGoCollector().(*goCollector)

	b.ResetTimer()
	for n := 0; n < b.N; n++ {
		metricCh := make(chan Metric, 8)
		// Consume everything Collect sends until the channel is
		// closed, so Collect never blocks on a full buffer.
		go func() {
			for range metricCh {
			}
		}()
		c.Collect(metricCh)
		close(metricCh)
	}
}