Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds Shutdown to Metrics API #132

Merged
merged 3 commits into from Apr 26, 2022
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
7 changes: 7 additions & 0 deletions circonus/circonus.go
Expand Up @@ -97,6 +97,13 @@ func (s *CirconusSink) AddSampleWithLabels(key []string, val float32, labels []m
s.metrics.RecordValue(flatKey, float64(val))
}

func (s *CirconusSink) Shutdown() {
// The used version of the circonus metrics library does not support a shutdown operation.
// Instead we call Flush which blocks until metrics are submitted to storage, and then exit
ggambetti marked this conversation as resolved.
Show resolved Hide resolved
// as the README examples do.
s.metrics.Flush()
}

// Flattens key to Circonus metric name
func (s *CirconusSink) flattenKey(parts []string) string {
joined := strings.Join(parts, "`")
Expand Down
7 changes: 7 additions & 0 deletions circonus/circonus_test.go
Expand Up @@ -8,6 +8,8 @@ import (
"net/http/httptest"
"strings"
"testing"

"github.com/armon/go-metrics"
)

func TestNewCirconusSink(t *testing.T) {
Expand Down Expand Up @@ -152,3 +154,8 @@ func TestAddSample(t *testing.T) {

}
}

func TestMetricSinkInterface(t *testing.T) {
var cs *CirconusSink
_ = metrics.MetricSink(cs)
}
1 change: 1 addition & 0 deletions const_unix.go
@@ -1,3 +1,4 @@
//go:build !windows
ggambetti marked this conversation as resolved.
Show resolved Hide resolved
// +build !windows

package metrics
Expand Down
1 change: 1 addition & 0 deletions const_windows.go
@@ -1,3 +1,4 @@
//go:build windows
// +build windows

package metrics
Expand Down
4 changes: 4 additions & 0 deletions datadog/dogstatsd.go
Expand Up @@ -120,6 +120,10 @@ func (s *DogStatsdSink) AddSampleWithLabels(key []string, val float32, labels []
s.client.TimeInMilliseconds(flatKey, float64(val), tags, rate)
}

func (s *DogStatsdSink) Shutdown() {
s.client.Close()
ggambetti marked this conversation as resolved.
Show resolved Hide resolved
}

func (s *DogStatsdSink) getFlatkeyAndCombinedLabels(key []string, labels []metrics.Label) (string, []string) {
key, parsedLabels := s.parseKey(key)
flatKey := s.flattenKey(key)
Expand Down
5 changes: 5 additions & 0 deletions datadog/dogstatsd_test.go
Expand Up @@ -151,3 +151,8 @@ func assertServerMatchesExpected(t *testing.T, server *net.UDPConn, buf []byte,
t.Fatalf("Line %s does not match expected: %s", string(msg), expected)
}
}

func TestMetricSinkInterface(t *testing.T) {
var dd *DogStatsdSink
_ = metrics.MetricSink(dd)
}
4 changes: 4 additions & 0 deletions inmem.go
Expand Up @@ -230,6 +230,10 @@ func (i *InmemSink) AddSampleWithLabels(key []string, val float32, labels []Labe
agg.Ingest(float64(val), i.rateDenom)
}

func (i *InmemSink) Shutdown() {
// Do nothing. InmemSink does not have cleanup associated with shutdown.
}

// Data is used to retrieve all the aggregated metrics
// Intervals may be in use, and a read lock should be acquired
func (i *InmemSink) Data() []*IntervalMetrics {
Expand Down
4 changes: 4 additions & 0 deletions metrics.go
Expand Up @@ -172,6 +172,10 @@ func (m *Metrics) UpdateFilterAndLabels(allow, block, allowedLabels, blockedLabe
}
}

func (m *Metrics) Shutdown() {
m.sink.Shutdown()
}

// labelIsAllowed return true if a should be included in metric
// the caller should lock m.filterLock while calling this method
func (m *Metrics) labelIsAllowed(label *Label) bool {
Expand Down
7 changes: 6 additions & 1 deletion prometheus/prometheus.go
@@ -1,3 +1,4 @@
//go:build go1.9
// +build go1.9

package prometheus
Expand All @@ -20,7 +21,7 @@ var (
// PrometheusSink.
DefaultPrometheusOpts = PrometheusOpts{
Expiration: 60 * time.Second,
Name: "default_prometheus_sink",
Name: "default_prometheus_sink",
}
)

Expand Down Expand Up @@ -393,6 +394,10 @@ func (p *PrometheusSink) IncrCounterWithLabels(parts []string, val float32, labe
}
}

// Shutdown is not implemented. PrometheusSink is in memory storage.
func (p *PrometheusSink) Shutdown() {
}

// PrometheusPushSink wraps a normal prometheus sink and provides an address and facilities to export it to an address
// on an interval.
type PrometheusPushSink struct {
Expand Down
12 changes: 10 additions & 2 deletions prometheus/prometheus_test.go
Expand Up @@ -54,6 +54,7 @@ func TestNewPrometheusSink(t *testing.T) {
t.Fatalf("Unregister(sink) = false, want true")
}
}

// TestMultiplePrometheusSink tests registering multiple sinks on the same registerer with different descriptors
func TestMultiplePrometheusSink(t *testing.T) {
gaugeDef := GaugeDefinition{
Expand All @@ -66,14 +67,14 @@ func TestMultiplePrometheusSink(t *testing.T) {
GaugeDefinitions: append([]GaugeDefinition{}, gaugeDef),
SummaryDefinitions: append([]SummaryDefinition{}),
CounterDefinitions: append([]CounterDefinition{}),
Name: "sink1",
Name: "sink1",
}

sink1, err := NewPrometheusSinkFrom(cfg)
if err != nil {
t.Fatalf("err = %v, want nil", err)
}

reg := prometheus.DefaultRegisterer
if reg == nil {
t.Fatalf("Expected default register to be non nil, got nil.")
Expand Down Expand Up @@ -359,3 +360,10 @@ func TestDefinitionsWithLabels(t *testing.T) {
return true
})
}

func TestMetricSinkInterface(t *testing.T) {
var ps *PrometheusSink
_ = metrics.MetricSink(ps)
var pps *PrometheusPushSink
_ = metrics.MetricSink(pps)
}
10 changes: 10 additions & 0 deletions sink.go
Expand Up @@ -22,6 +22,9 @@ type MetricSink interface {
// Samples are for timing information, where quantiles are used
AddSample(key []string, val float32)
AddSampleWithLabels(key []string, val float32, labels []Label)

// Shutdown the sink, flushing data, and performing cleanup as necessary.
Shutdown()
}

// BlackholeSink is used to just blackhole messages
Expand All @@ -34,6 +37,7 @@ func (*BlackholeSink) IncrCounter(key []string, val float32)
func (*BlackholeSink) IncrCounterWithLabels(key []string, val float32, labels []Label) {}
func (*BlackholeSink) AddSample(key []string, val float32) {}
func (*BlackholeSink) AddSampleWithLabels(key []string, val float32, labels []Label) {}
func (*BlackholeSink) Shutdown() {}

// FanoutSink is used to sink to fanout values to multiple sinks
type FanoutSink []MetricSink
Expand Down Expand Up @@ -74,6 +78,12 @@ func (fh FanoutSink) AddSampleWithLabels(key []string, val float32, labels []Lab
}
}

func (fh FanoutSink) Shutdown() {
for _, s := range fh {
s.Shutdown()
}
}

// sinkURLFactoryFunc is an generic interface around the *SinkFromURL() function provided
// by each sink type
type sinkURLFactoryFunc func(*url.URL) (MetricSink, error)
Expand Down
1 change: 1 addition & 0 deletions sink_test.go
Expand Up @@ -63,6 +63,7 @@ func (m *MockSink) AddSampleWithLabels(key []string, val float32, labels []Label
m.vals = append(m.vals, val)
m.labels = append(m.labels, labels)
}
func (m *MockSink) Shutdown() {}

func TestFanoutSink_Gauge(t *testing.T) {
m1 := &MockSink{}
Expand Down
10 changes: 10 additions & 0 deletions start.go
Expand Up @@ -144,3 +144,13 @@ func UpdateFilter(allow, block []string) {
func UpdateFilterAndLabels(allow, block, allowedLabels, blockedLabels []string) {
globalMetrics.Load().(*Metrics).UpdateFilterAndLabels(allow, block, allowedLabels, blockedLabels)
}

// Shutdown flushes and disables metric collection, blocking while waiting for this to complete.
// WARNING: Not all MetricSink backends support this functionality, and calling this will cause resource leaks.
// This is intended for use immediately prior to application exit.
func Shutdown() {
ggambetti marked this conversation as resolved.
Show resolved Hide resolved
m := globalMetrics.Load().(*Metrics)
// Replace global metrics with the BlackholeSink like how init setup the library.
globalMetrics.Store(&Metrics{sink: &BlackholeSink{}})
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't understand this line 100% <-- would you mind helping me understand why we need to do this?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The goal is to explicitly break the following example, so that library consumers build their applications knowing that nothing is collected post Shutdown, and don't accidentally become dependant on collecting metrics post-Shutdown.

import "github.com/armon/go-metrics"

func main() {
  // The implementation of the metrics library means
  // that {"hello", "world"} will only ever have the value 1.
  metrics.New(...)
  metrics.IncrCounter([]string{"hello", "world"}, 1)
  metrics.Shutdown()
  metrics.IncrCounter([]string{"hello", "world"}, 1)
}

Resetting the library to the BlackholeSink isn't strictly necessary. But between applications not cleaning up goroutines (that generate calls to the library), and MetricSink implementations not supporting shutdown/close, it's possible that applications collect metrics and then upload them in the small window between Shutdown and exit. This makes it pretty explicit that this isn't intended or supported.

There is a race condition here: callers getting a pointer to the underlying Metrics struct and then completing their call into the MetricSink after Shutdown finishes. MetricSink must be thread-safe, so this is likely fine.

Copy link
Contributor

@acpana acpana Apr 26, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for laying out for me here 💯 I see now the issue that could happen;

m.Shutdown()
}