Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds Shutdown to Metrics API #132

Merged
merged 3 commits into from Apr 26, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
9 changes: 9 additions & 0 deletions circonus/circonus.go
Expand Up @@ -97,6 +97,15 @@ func (s *CirconusSink) AddSampleWithLabels(key []string, val float32, labels []m
s.metrics.RecordValue(flatKey, float64(val))
}

// Shutdown blocks while flushing metrics to the backend.
func (s *CirconusSink) Shutdown() {
// The version of circonus metrics in go.mod (v2.3.1), and the current
// version (v3.4.6) do not support a shutdown operation. Instead we call
// Flush which blocks until metrics are submitted to storage, and then exit
// as the README examples do.
s.metrics.Flush()
}

// Flattens key to Circonus metric name
func (s *CirconusSink) flattenKey(parts []string) string {
joined := strings.Join(parts, "`")
Expand Down
7 changes: 7 additions & 0 deletions circonus/circonus_test.go
Expand Up @@ -8,6 +8,8 @@ import (
"net/http/httptest"
"strings"
"testing"

"github.com/armon/go-metrics"
)

func TestNewCirconusSink(t *testing.T) {
Expand Down Expand Up @@ -152,3 +154,8 @@ func TestAddSample(t *testing.T) {

}
}

func TestMetricSinkInterface(t *testing.T) {
var cs *CirconusSink
_ = metrics.MetricSink(cs)
}
5 changes: 5 additions & 0 deletions datadog/dogstatsd.go
Expand Up @@ -120,6 +120,11 @@ func (s *DogStatsdSink) AddSampleWithLabels(key []string, val float32, labels []
s.client.TimeInMilliseconds(flatKey, float64(val), tags, rate)
}

// Shutdown disables further metric collection, blocks to flush data, and tears down the sink.
func (s *DogStatsdSink) Shutdown() {
s.client.Close()
ggambetti marked this conversation as resolved.
Show resolved Hide resolved
}

func (s *DogStatsdSink) getFlatkeyAndCombinedLabels(key []string, labels []metrics.Label) (string, []string) {
key, parsedLabels := s.parseKey(key)
flatKey := s.flattenKey(key)
Expand Down
5 changes: 5 additions & 0 deletions datadog/dogstatsd_test.go
Expand Up @@ -151,3 +151,8 @@ func assertServerMatchesExpected(t *testing.T, server *net.UDPConn, buf []byte,
t.Fatalf("Line %s does not match expected: %s", string(msg), expected)
}
}

func TestMetricSinkInterface(t *testing.T) {
var dd *DogStatsdSink
_ = metrics.MetricSink(dd)
}
4 changes: 4 additions & 0 deletions inmem.go
Expand Up @@ -230,6 +230,10 @@ func (i *InmemSink) AddSampleWithLabels(key []string, val float32, labels []Labe
agg.Ingest(float64(val), i.rateDenom)
}

func (i *InmemSink) Shutdown() {
// Do nothing. InmemSink does not have cleanup associated with shutdown.
}

// Data is used to retrieve all the aggregated metrics
// Intervals may be in use, and a read lock should be acquired
func (i *InmemSink) Data() []*IntervalMetrics {
Expand Down
4 changes: 4 additions & 0 deletions metrics.go
Expand Up @@ -172,6 +172,10 @@ func (m *Metrics) UpdateFilterAndLabels(allow, block, allowedLabels, blockedLabe
}
}

func (m *Metrics) Shutdown() {
m.sink.Shutdown()
}

// labelIsAllowed return true if a should be included in metric
// the caller should lock m.filterLock while calling this method
func (m *Metrics) labelIsAllowed(label *Label) bool {
Expand Down
11 changes: 10 additions & 1 deletion prometheus/prometheus.go
@@ -1,3 +1,4 @@
//go:build go1.9
// +build go1.9

package prometheus
Expand All @@ -20,7 +21,7 @@ var (
// PrometheusSink.
DefaultPrometheusOpts = PrometheusOpts{
Expiration: 60 * time.Second,
Name: "default_prometheus_sink",
Name: "default_prometheus_sink",
}
)

Expand Down Expand Up @@ -393,6 +394,10 @@ func (p *PrometheusSink) IncrCounterWithLabels(parts []string, val float32, labe
}
}

// Shutdown is not implemented. PrometheusSink is in memory storage.
func (p *PrometheusSink) Shutdown() {
}

// PrometheusPushSink wraps a normal prometheus sink and provides an address and facilities to export it to an address
// on an interval.
type PrometheusPushSink struct {
Expand Down Expand Up @@ -446,6 +451,10 @@ func (s *PrometheusPushSink) flushMetrics() {
}()
}

// Shutdown tears down the PrometheusPushSink, and blocks while flushing metrics to the backend.
func (s *PrometheusPushSink) Shutdown() {
close(s.stopChan)
// Closing the channel only stops the running goroutine that pushes metrics.
// To minimize the chance of data loss pusher.Push is called one last time.
s.pusher.Push()
}
12 changes: 10 additions & 2 deletions prometheus/prometheus_test.go
Expand Up @@ -54,6 +54,7 @@ func TestNewPrometheusSink(t *testing.T) {
t.Fatalf("Unregister(sink) = false, want true")
}
}

// TestMultiplePrometheusSink tests registering multiple sinks on the same registerer with different descriptors
func TestMultiplePrometheusSink(t *testing.T) {
gaugeDef := GaugeDefinition{
Expand All @@ -66,14 +67,14 @@ func TestMultiplePrometheusSink(t *testing.T) {
GaugeDefinitions: append([]GaugeDefinition{}, gaugeDef),
SummaryDefinitions: append([]SummaryDefinition{}),
CounterDefinitions: append([]CounterDefinition{}),
Name: "sink1",
Name: "sink1",
}

sink1, err := NewPrometheusSinkFrom(cfg)
if err != nil {
t.Fatalf("err = %v, want nil", err)
}

reg := prometheus.DefaultRegisterer
if reg == nil {
t.Fatalf("Expected default register to be non nil, got nil.")
Expand Down Expand Up @@ -359,3 +360,10 @@ func TestDefinitionsWithLabels(t *testing.T) {
return true
})
}

func TestMetricSinkInterface(t *testing.T) {
var ps *PrometheusSink
_ = metrics.MetricSink(ps)
var pps *PrometheusPushSink
_ = metrics.MetricSink(pps)
}
12 changes: 12 additions & 0 deletions sink.go
Expand Up @@ -22,6 +22,11 @@ type MetricSink interface {
// Samples are for timing information, where quantiles are used
AddSample(key []string, val float32)
AddSampleWithLabels(key []string, val float32, labels []Label)

// Shutdown the metric sink, flush metrics to storage, and cleanup resources.
// Called immediately prior to application exit. Implementations must block
// until metrics are flushed to storage.
Shutdown()
}

// BlackholeSink is used to just blackhole messages
Expand All @@ -34,6 +39,7 @@ func (*BlackholeSink) IncrCounter(key []string, val float32)
func (*BlackholeSink) IncrCounterWithLabels(key []string, val float32, labels []Label) {}
func (*BlackholeSink) AddSample(key []string, val float32) {}
func (*BlackholeSink) AddSampleWithLabels(key []string, val float32, labels []Label) {}
func (*BlackholeSink) Shutdown() {}

// FanoutSink is used to sink to fanout values to multiple sinks
type FanoutSink []MetricSink
Expand Down Expand Up @@ -74,6 +80,12 @@ func (fh FanoutSink) AddSampleWithLabels(key []string, val float32, labels []Lab
}
}

func (fh FanoutSink) Shutdown() {
for _, s := range fh {
s.Shutdown()
}
}

// sinkURLFactoryFunc is an generic interface around the *SinkFromURL() function provided
// by each sink type
type sinkURLFactoryFunc func(*url.URL) (MetricSink, error)
Expand Down
13 changes: 10 additions & 3 deletions sink_test.go
Expand Up @@ -10,9 +10,10 @@ import (
type MockSink struct {
lock sync.Mutex

keys [][]string
vals []float32
labels [][]Label
shutdown bool
keys [][]string
vals []float32
labels [][]Label
}

func (m *MockSink) getKeys() [][]string {
Expand Down Expand Up @@ -63,6 +64,12 @@ func (m *MockSink) AddSampleWithLabels(key []string, val float32, labels []Label
m.vals = append(m.vals, val)
m.labels = append(m.labels, labels)
}
func (m *MockSink) Shutdown() {
m.lock.Lock()
defer m.lock.Unlock()

m.shutdown = true
}

func TestFanoutSink_Gauge(t *testing.T) {
m1 := &MockSink{}
Expand Down
12 changes: 12 additions & 0 deletions start.go
Expand Up @@ -144,3 +144,15 @@ func UpdateFilter(allow, block []string) {
func UpdateFilterAndLabels(allow, block, allowedLabels, blockedLabels []string) {
globalMetrics.Load().(*Metrics).UpdateFilterAndLabels(allow, block, allowedLabels, blockedLabels)
}

// Shutdown disables metric collection, then blocks while attempting to flush metrics to storage.
// WARNING: Not all MetricSink backends support this functionality, and calling this will cause them to leak resources.
// This is intended for use immediately prior to application exit.
func Shutdown() {
ggambetti marked this conversation as resolved.
Show resolved Hide resolved
m := globalMetrics.Load().(*Metrics)
// Swap whatever MetricSink is currently active with a BlackholeSink. Callers must not have a
// reason to expect that calls to the library will successfully collect metrics after Shutdown
// has been called.
globalMetrics.Store(&Metrics{sink: &BlackholeSink{}})
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't understand this line 100% <-- would you mind helping me understand why we need to do this?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The goal is to explicitly break the following example, so that library consumers build their applications knowing that nothing is collected post Shutdown, and don't accidentally become dependant on collecting metrics post-Shutdown.

import "github.com/armon/go-metrics"

func main() {
  // The implementation of the metrics library means
  // that {"hello", "world"} will only ever have the value 1.
  metrics.New(...)
  metrics.IncrCounter([]string{"hello", "world"}, 1)
  metrics.Shutdown()
  metrics.IncrCounter([]string{"hello", "world"}, 1)
}

Resetting the library to the BlackholeSink isn't strictly necessary. But between applications not cleaning up goroutines (that generate calls to the library), and MetricSink implementations not supporting shutdown/close, it's possible that applications collect metrics and then upload them in the small window between Shutdown and exit. This makes it pretty explicit that this isn't intended or supported.

There is a race condition here: callers getting a pointer to the underlying Metrics struct and then completing their call into the MetricSink after Shutdown finishes. MetricSink must be thread-safe, so this is likely fine.

Copy link
Contributor

@acpana acpana Apr 26, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for laying out for me here 💯 I see now the issue that could happen;

m.Shutdown()
}
20 changes: 20 additions & 0 deletions start_test.go
Expand Up @@ -192,6 +192,26 @@ func Test_GlobalMetrics_UpdateFilter(t *testing.T) {
}
}

func Test_GlobalMetrics_Shutdown(t *testing.T) {
s := &MockSink{}
m := &Metrics{sink: s}
globalMetrics.Store(m)

Shutdown()

loaded := globalMetrics.Load()
metrics, ok := loaded.(*Metrics)
if !ok {
t.Fatalf("Expected globalMetrics to contain a Metrics pointer, but found: %v", loaded)
}
if metrics == m {
t.Errorf("Calling shutdown should have replaced the Metrics struct stored in globalMetrics")
}
if !s.shutdown {
t.Errorf("Expected Shutdown to have been called on MockSink")
}
}

// Benchmark_GlobalMetrics_Direct/direct-8 5000000 278 ns/op
// Benchmark_GlobalMetrics_Direct/atomic.Value-8 5000000 235 ns/op
func Benchmark_GlobalMetrics_Direct(b *testing.B) {
Expand Down