diff --git a/circonus/circonus.go b/circonus/circonus.go index eb41b99..9f1ea18 100644 --- a/circonus/circonus.go +++ b/circonus/circonus.go @@ -97,6 +97,15 @@ func (s *CirconusSink) AddSampleWithLabels(key []string, val float32, labels []m s.metrics.RecordValue(flatKey, float64(val)) } +// Shutdown blocks while flushing metrics to the backend. +func (s *CirconusSink) Shutdown() { + // The version of circonus metrics in go.mod (v2.3.1), and the current + // version (v3.4.6) do not support a shutdown operation. Instead we call + // Flush which blocks until metrics are submitted to storage, and then exit + // as the README examples do. + s.metrics.Flush() +} + // Flattens key to Circonus metric name func (s *CirconusSink) flattenKey(parts []string) string { joined := strings.Join(parts, "`") diff --git a/circonus/circonus_test.go b/circonus/circonus_test.go index 2644d57..77e9f57 100644 --- a/circonus/circonus_test.go +++ b/circonus/circonus_test.go @@ -8,6 +8,8 @@ import ( "net/http/httptest" "strings" "testing" + + "github.com/armon/go-metrics" ) func TestNewCirconusSink(t *testing.T) { @@ -152,3 +154,8 @@ func TestAddSample(t *testing.T) { } } + +func TestMetricSinkInterface(t *testing.T) { + var cs *CirconusSink + _ = metrics.MetricSink(cs) +} diff --git a/datadog/dogstatsd.go b/datadog/dogstatsd.go index fe021d0..c980004 100644 --- a/datadog/dogstatsd.go +++ b/datadog/dogstatsd.go @@ -120,6 +120,11 @@ func (s *DogStatsdSink) AddSampleWithLabels(key []string, val float32, labels [] s.client.TimeInMilliseconds(flatKey, float64(val), tags, rate) } +// Shutdown disables further metric collection, blocks to flush data, and tears down the sink. +func (s *DogStatsdSink) Shutdown() { + s.client.Close() +} + func (s *DogStatsdSink) getFlatkeyAndCombinedLabels(key []string, labels []metrics.Label) (string, []string) { key, parsedLabels := s.parseKey(key) flatKey := s.flattenKey(key) diff --git a/datadog/dogstatsd_test.go b/datadog/dogstatsd_test.go index cd3f833..3545e8d 100644 --- a/datadog/dogstatsd_test.go +++ b/datadog/dogstatsd_test.go @@ -151,3 +151,8 @@ func assertServerMatchesExpected(t *testing.T, server *net.UDPConn, buf []byte, t.Fatalf("Line %s does not match expected: %s", string(msg), expected) } } + +func TestMetricSinkInterface(t *testing.T) { + var dd *DogStatsdSink + _ = metrics.MetricSink(dd) +} diff --git a/inmem.go b/inmem.go index 7c427ac..a52d0ff 100644 --- a/inmem.go +++ b/inmem.go @@ -230,6 +230,10 @@ func (i *InmemSink) AddSampleWithLabels(key []string, val float32, labels []Labe agg.Ingest(float64(val), i.rateDenom) } +func (i *InmemSink) Shutdown() { + // Do nothing. InmemSink does not have cleanup associated with shutdown. +} + // Data is used to retrieve all the aggregated metrics // Intervals may be in use, and a read lock should be acquired func (i *InmemSink) Data() []*IntervalMetrics { diff --git a/metrics.go b/metrics.go index 6753b13..047657b 100644 --- a/metrics.go +++ b/metrics.go @@ -172,6 +172,10 @@ func (m *Metrics) UpdateFilterAndLabels(allow, block, allowedLabels, blockedLabe } } +func (m *Metrics) Shutdown() { + m.sink.Shutdown() +} + // labelIsAllowed return true if a should be included in metric // the caller should lock m.filterLock while calling this method func (m *Metrics) labelIsAllowed(label *Label) bool { diff --git a/prometheus/prometheus.go b/prometheus/prometheus.go index 5a8282f..f89cfd9 100644 --- a/prometheus/prometheus.go +++ b/prometheus/prometheus.go @@ -1,3 +1,4 @@ +//go:build go1.9 // +build go1.9 package prometheus @@ -20,7 +21,7 @@ var ( // PrometheusSink. DefaultPrometheusOpts = PrometheusOpts{ Expiration: 60 * time.Second, - Name: "default_prometheus_sink", + Name: "default_prometheus_sink", } ) @@ -393,6 +394,10 @@ func (p *PrometheusSink) IncrCounterWithLabels(parts []string, val float32, labe } } +// Shutdown is not implemented. PrometheusSink is in memory storage. +func (p *PrometheusSink) Shutdown() { +} + // PrometheusPushSink wraps a normal prometheus sink and provides an address and facilities to export it to an address // on an interval. type PrometheusPushSink struct { @@ -446,6 +451,10 @@ func (s *PrometheusPushSink) flushMetrics() { }() } +// Shutdown tears down the PrometheusPushSink, and blocks while flushing metrics to the backend. func (s *PrometheusPushSink) Shutdown() { close(s.stopChan) + // Closing the channel only stops the running goroutine that pushes metrics. + // To minimize the chance of data loss pusher.Push is called one last time. + s.pusher.Push() } diff --git a/prometheus/prometheus_test.go b/prometheus/prometheus_test.go index 6190874..321a7b9 100644 --- a/prometheus/prometheus_test.go +++ b/prometheus/prometheus_test.go @@ -54,6 +54,7 @@ func TestNewPrometheusSink(t *testing.T) { t.Fatalf("Unregister(sink) = false, want true") } } + // TestMultiplePrometheusSink tests registering multiple sinks on the same registerer with different descriptors func TestMultiplePrometheusSink(t *testing.T) { gaugeDef := GaugeDefinition{ @@ -66,14 +67,14 @@ func TestMultiplePrometheusSink(t *testing.T) { GaugeDefinitions: append([]GaugeDefinition{}, gaugeDef), SummaryDefinitions: append([]SummaryDefinition{}), CounterDefinitions: append([]CounterDefinition{}), - Name: "sink1", + Name: "sink1", } sink1, err := NewPrometheusSinkFrom(cfg) if err != nil { t.Fatalf("err = %v, want nil", err) } - + reg := prometheus.DefaultRegisterer if reg == nil { t.Fatalf("Expected default register to be non nil, got nil.") @@ -359,3 +360,10 @@ func TestDefinitionsWithLabels(t *testing.T) { return true }) } + +func TestMetricSinkInterface(t *testing.T) { + var ps *PrometheusSink + _ = metrics.MetricSink(ps) + var pps *PrometheusPushSink + _ = metrics.MetricSink(pps) +} diff --git a/sink.go b/sink.go index 0b7d6e4..b839844 100644 --- a/sink.go +++ b/sink.go @@ -22,6 +22,11 @@ type MetricSink interface { // Samples are for timing information, where quantiles are used AddSample(key []string, val float32) AddSampleWithLabels(key []string, val float32, labels []Label) + + // Shutdown the metric sink, flush metrics to storage, and cleanup resources. + // Called immediately prior to application exit. Implementations must block + // until metrics are flushed to storage. + Shutdown() } // BlackholeSink is used to just blackhole messages @@ -34,6 +39,7 @@ func (*BlackholeSink) IncrCounter(key []string, val float32) func (*BlackholeSink) IncrCounterWithLabels(key []string, val float32, labels []Label) {} func (*BlackholeSink) AddSample(key []string, val float32) {} func (*BlackholeSink) AddSampleWithLabels(key []string, val float32, labels []Label) {} +func (*BlackholeSink) Shutdown() {} // FanoutSink is used to sink to fanout values to multiple sinks type FanoutSink []MetricSink @@ -74,6 +80,12 @@ func (fh FanoutSink) AddSampleWithLabels(key []string, val float32, labels []Lab } } +func (fh FanoutSink) Shutdown() { + for _, s := range fh { + s.Shutdown() + } +} + // sinkURLFactoryFunc is an generic interface around the *SinkFromURL() function provided // by each sink type type sinkURLFactoryFunc func(*url.URL) (MetricSink, error) diff --git a/sink_test.go b/sink_test.go index 36da370..dbaab7d 100644 --- a/sink_test.go +++ b/sink_test.go @@ -10,9 +10,10 @@ import ( type MockSink struct { lock sync.Mutex - keys [][]string - vals []float32 - labels [][]Label + shutdown bool + keys [][]string + vals []float32 + labels [][]Label } func (m *MockSink) getKeys() [][]string { @@ -63,6 +64,12 @@ func (m *MockSink) AddSampleWithLabels(key []string, val float32, labels []Label m.vals = append(m.vals, val) m.labels = append(m.labels, labels) } +func (m *MockSink) Shutdown() { + m.lock.Lock() + defer m.lock.Unlock() + + m.shutdown = true +} func TestFanoutSink_Gauge(t *testing.T) { m1 := &MockSink{} diff --git a/start.go b/start.go index 6aa0bd3..38976f8 100644 --- a/start.go +++ b/start.go @@ -144,3 +144,15 @@ func UpdateFilter(allow, block []string) { func UpdateFilterAndLabels(allow, block, allowedLabels, blockedLabels []string) { globalMetrics.Load().(*Metrics).UpdateFilterAndLabels(allow, block, allowedLabels, blockedLabels) } + +// Shutdown disables metric collection, then blocks while attempting to flush metrics to storage. +// WARNING: Not all MetricSink backends support this functionality, and calling this will cause them to leak resources. +// This is intended for use immediately prior to application exit. +func Shutdown() { + m := globalMetrics.Load().(*Metrics) + // Swap whatever MetricSink is currently active with a BlackholeSink. Callers must not have a + // reason to expect that calls to the library will successfully collect metrics after Shutdown + // has been called. + globalMetrics.Store(&Metrics{sink: &BlackholeSink{}}) + m.Shutdown() +} diff --git a/start_test.go b/start_test.go index 8ff5ca0..38a7b26 100644 --- a/start_test.go +++ b/start_test.go @@ -192,6 +192,26 @@ func Test_GlobalMetrics_UpdateFilter(t *testing.T) { } } +func Test_GlobalMetrics_Shutdown(t *testing.T) { + s := &MockSink{} + m := &Metrics{sink: s} + globalMetrics.Store(m) + + Shutdown() + + loaded := globalMetrics.Load() + metrics, ok := loaded.(*Metrics) + if !ok { + t.Fatalf("Expected globalMetrics to contain a Metrics pointer, but found: %v", loaded) + } + if metrics == m { + t.Errorf("Calling shutdown should have replaced the Metrics struct stored in globalMetrics") + } + if !s.shutdown { + t.Errorf("Expected Shutdown to have been called on MockSink") + } +} + // Benchmark_GlobalMetrics_Direct/direct-8 5000000 278 ns/op // Benchmark_GlobalMetrics_Direct/atomic.Value-8 5000000 235 ns/op func Benchmark_GlobalMetrics_Direct(b *testing.B) {