Skip to content

Commit

Permalink
Updated documentation and adds tests
Browse files Browse the repository at this point in the history
  • Loading branch information
ggambetti committed Apr 19, 2022
1 parent 3e16282 commit 4fbf9d3
Show file tree
Hide file tree
Showing 9 changed files with 47 additions and 12 deletions.
6 changes: 4 additions & 2 deletions circonus/circonus.go
Expand Up @@ -97,9 +97,11 @@ func (s *CirconusSink) AddSampleWithLabels(key []string, val float32, labels []m
s.metrics.RecordValue(flatKey, float64(val))
}

// Shutdown blocks while flushing metrics to the backend.
func (s *CirconusSink) Shutdown() {
// The used version of the circonus metrics library does not support a shutdown operation.
// Instead we call Flush which blocks until metrics are submitted to storage, and then exit
// The version of circonus metrics in go.mod (v2.3.1), and the current
// version (v3.4.6) do not support a shutdown operation. Instead we call
// Flush which blocks until metrics are submitted to storage, and then exit
// as the README examples do.
s.metrics.Flush()
}
Expand Down
1 change: 0 additions & 1 deletion const_unix.go
@@ -1,4 +1,3 @@
//go:build !windows
// +build !windows

package metrics
Expand Down
1 change: 0 additions & 1 deletion const_windows.go
@@ -1,4 +1,3 @@
//go:build windows
// +build windows

package metrics
Expand Down
1 change: 1 addition & 0 deletions datadog/dogstatsd.go
Expand Up @@ -120,6 +120,7 @@ func (s *DogStatsdSink) AddSampleWithLabels(key []string, val float32, labels []
s.client.TimeInMilliseconds(flatKey, float64(val), tags, rate)
}

// Shutdown disables further metric collection, blocks to flush data, and tears down the sink.
func (s *DogStatsdSink) Shutdown() {
s.client.Close()
}
Expand Down
4 changes: 4 additions & 0 deletions prometheus/prometheus.go
Expand Up @@ -451,6 +451,10 @@ func (s *PrometheusPushSink) flushMetrics() {
}()
}

// Shutdown tears down the PrometheusPushSink, and blocks while flushing metrics to the backend.
func (s *PrometheusPushSink) Shutdown() {
close(s.stopChan)
// Closing the channel only stops the running goroutine that pushes metrics.
// To minimize the chance of data loss pusher.Push is called one last time.
s.pusher.Push()
}
4 changes: 3 additions & 1 deletion sink.go
Expand Up @@ -23,7 +23,9 @@ type MetricSink interface {
AddSample(key []string, val float32)
AddSampleWithLabels(key []string, val float32, labels []Label)

// Shutdown the sink, flushing data, and performing cleanup as necessary.
// Shutdown the metric sink, flush metrics to storage, and cleanup resources.
// Called immediately prior to application exit. Implementations must block
// until metrics are flushed to storage.
Shutdown()
}

Expand Down
14 changes: 10 additions & 4 deletions sink_test.go
Expand Up @@ -10,9 +10,10 @@ import (
type MockSink struct {
lock sync.Mutex

keys [][]string
vals []float32
labels [][]Label
shutdown bool
keys [][]string
vals []float32
labels [][]Label
}

func (m *MockSink) getKeys() [][]string {
Expand Down Expand Up @@ -63,7 +64,12 @@ func (m *MockSink) AddSampleWithLabels(key []string, val float32, labels []Label
m.vals = append(m.vals, val)
m.labels = append(m.labels, labels)
}
func (m *MockSink) Shutdown() {}
func (m *MockSink) Shutdown() {
m.lock.Lock()
defer m.lock.Unlock()

m.shutdown = true
}

func TestFanoutSink_Gauge(t *testing.T) {
m1 := &MockSink{}
Expand Down
8 changes: 5 additions & 3 deletions start.go
Expand Up @@ -145,12 +145,14 @@ func UpdateFilterAndLabels(allow, block, allowedLabels, blockedLabels []string)
globalMetrics.Load().(*Metrics).UpdateFilterAndLabels(allow, block, allowedLabels, blockedLabels)
}

// Shutdown flushes and disables metric collection, blocking while waiting for this to complete.
// WARNING: Not all MetricSink backends support this functionality, and calling this will cause resource leaks.
// Shutdown disables metric collection, then blocks while attempting to flush metrics to storage.
// WARNING: Not all MetricSink backends support this functionality, and calling this will cause them to leak resources.
// This is intended for use immediately prior to application exit.
func Shutdown() {
m := globalMetrics.Load().(*Metrics)
// Replace global metrics with the BlackholeSink like how init setup the library.
// Swap whatever MetricSink is currently active with a BlackholeSink. Callers must not have a
// reason to expect that calls to the library will successfully collect metrics after Shutdown
// has been called.
globalMetrics.Store(&Metrics{sink: &BlackholeSink{}})
m.Shutdown()
}
20 changes: 20 additions & 0 deletions start_test.go
Expand Up @@ -192,6 +192,26 @@ func Test_GlobalMetrics_UpdateFilter(t *testing.T) {
}
}

func Test_GlobalMetrics_Shutdown(t *testing.T) {
s := &MockSink{}
m := &Metrics{sink: s}
globalMetrics.Store(m)

Shutdown()

loaded := globalMetrics.Load()
metrics, ok := loaded.(*Metrics)
if !ok {
t.Fatalf("Expected globalMetrics to contain a Metrics pointer, but found: %v", loaded)
}
if metrics == m {
t.Errorf("Calling shutdown should have replaced the Metrics struct stored in globalMetrics")
}
if !s.shutdown {
t.Errorf("Expected Shutdown to have been called on MockSink")
}
}

// Benchmark_GlobalMetrics_Direct/direct-8 5000000 278 ns/op
// Benchmark_GlobalMetrics_Direct/atomic.Value-8 5000000 235 ns/op
func Benchmark_GlobalMetrics_Direct(b *testing.B) {
Expand Down

0 comments on commit 4fbf9d3

Please sign in to comment.