Skip to content

Commit

Permalink
Metrics wrapper that adds the cluster name as a label. (#8961)
Browse files Browse the repository at this point in the history
  • Loading branch information
Mark Gritter committed May 13, 2020
1 parent 06f131e commit d5b1d5d
Show file tree
Hide file tree
Showing 5 changed files with 201 additions and 10 deletions.
30 changes: 20 additions & 10 deletions command/server.go
Expand Up @@ -918,7 +918,7 @@ func (c *ServerCommand) Run(args []string) int {
"in a Docker container, provide the IPC_LOCK cap to the container."))
}

metricsHelper, err := c.setupTelemetry(config)
metricsHelper, metricSink, err := c.setupTelemetry(config)
if err != nil {
c.UI.Error(fmt.Sprintf("Error initializing telemetry: %s", err))
return 1
Expand Down Expand Up @@ -1111,6 +1111,7 @@ func (c *ServerCommand) Run(args []string) int {
BuiltinRegistry: builtinplugins.Registry,
DisableKeyEncodingChecks: config.DisablePrintableCheck,
MetricsHelper: metricsHelper,
MetricSink: metricSink,
SecureRandomReader: secureRandomReader,
}
if c.flagDev {
Expand Down Expand Up @@ -2352,7 +2353,7 @@ func (c *ServerCommand) detectRedirect(detect physical.RedirectDetect,
}

// setupTelemetry is used to set up the telemetry sub-systems. It returns the metrics helper (for the in-memory sink to be used in HTTP configuration) and the ClusterMetricSink wrapper that is passed to Core.
func (c *ServerCommand) setupTelemetry(config *server.Config) (*metricsutil.MetricsHelper, error) {
func (c *ServerCommand) setupTelemetry(config *server.Config) (*metricsutil.MetricsHelper, *metricsutil.ClusterMetricSink, error) {
/* Setup telemetry
Aggregate on 10 second intervals for 1 minute. Expose the
metrics over stderr when there is a SIGUSR1 received.
Expand Down Expand Up @@ -2389,7 +2390,7 @@ func (c *ServerCommand) setupTelemetry(config *server.Config) (*metricsutil.Metr

sink, err := prometheus.NewPrometheusSinkFrom(prometheusOpts)
if err != nil {
return nil, err
return nil, nil, err
}
fanout = append(fanout, sink)
}
Expand All @@ -2399,7 +2400,7 @@ func (c *ServerCommand) setupTelemetry(config *server.Config) (*metricsutil.Metr
if telConfig.StatsiteAddr != "" {
sink, err := metrics.NewStatsiteSink(telConfig.StatsiteAddr)
if err != nil {
return nil, err
return nil, nil, err
}
fanout = append(fanout, sink)
}
Expand All @@ -2408,7 +2409,7 @@ func (c *ServerCommand) setupTelemetry(config *server.Config) (*metricsutil.Metr
if telConfig.StatsdAddr != "" {
sink, err := metrics.NewStatsdSink(telConfig.StatsdAddr)
if err != nil {
return nil, err
return nil, nil, err
}
fanout = append(fanout, sink)
}
Expand Down Expand Up @@ -2444,7 +2445,7 @@ func (c *ServerCommand) setupTelemetry(config *server.Config) (*metricsutil.Metr

sink, err := circonus.NewCirconusSink(cfg)
if err != nil {
return nil, err
return nil, nil, err
}
sink.Start()
fanout = append(fanout, sink)
Expand All @@ -2459,7 +2460,7 @@ func (c *ServerCommand) setupTelemetry(config *server.Config) (*metricsutil.Metr

sink, err := datadog.NewDogStatsdSink(telConfig.DogStatsDAddr, metricsConf.HostName)
if err != nil {
return nil, errwrap.Wrapf("failed to start DogStatsD sink: {{err}}", err)
return nil, nil, errwrap.Wrapf("failed to start DogStatsD sink: {{err}}", err)
}
sink.SetTags(tags)
fanout = append(fanout, sink)
Expand All @@ -2469,7 +2470,7 @@ func (c *ServerCommand) setupTelemetry(config *server.Config) (*metricsutil.Metr
if telConfig.StackdriverProjectID != "" {
client, err := monitoring.NewMetricClient(context.Background(), option.WithUserAgent(useragent.String()))
if err != nil {
return nil, fmt.Errorf("Failed to create stackdriver client: %v", err)
return nil, nil, fmt.Errorf("Failed to create stackdriver client: %v", err)
}
sink := stackdriver.NewSink(client, &stackdriver.Config{
LabelExtractor: stackdrivervault.Extractor,
Expand All @@ -2495,10 +2496,19 @@ func (c *ServerCommand) setupTelemetry(config *server.Config) (*metricsutil.Metr
_, err := metrics.NewGlobal(metricsConf, fanout)

if err != nil {
return nil, err
return nil, nil, err
}

// Initialize a wrapper around the global sink; this will be passed to Core
// and to any backend.
wrapper := &metricsutil.ClusterMetricSink{
ClusterName: config.ClusterName,
MaxGaugeCardinality: 500,
GaugeInterval: 10 * time.Minute,
Sink: fanout,
}

return metricHelper, nil
return metricHelper, wrapper, nil
}

func (c *ServerCommand) Reload(lock *sync.RWMutex, reloadFuncs *map[string][]reloadutil.ReloadFunc, configPath []string) error {
Expand Down
59 changes: 59 additions & 0 deletions helper/metricsutil/wrapped_metrics.go
@@ -0,0 +1,59 @@
package metricsutil

import (
"time"

metrics "github.com/armon/go-metrics"
)

// ClusterMetricSink serves as a shim around go-metrics
// and inserts a "cluster" label.
//
// It also provides a mechanism to limit the cardinality of the labels on a gauge
// (at each reporting interval, which isn't sufficient if there is variability in which
// labels are the top N) and a backoff mechanism for gauge computation.
//
// NOTE(review): the methods defined alongside this type only add the "cluster"
// label; nothing in this file reads MaxGaugeCardinality or GaugeInterval, so the
// cardinality limit and backoff described above are presumably implemented
// elsewhere (or not yet) -- confirm.
type ClusterMetricSink struct {
// ClusterName is either the cluster ID, or a name provided
// in the telemetry configuration stanza.
// It is used as the value of the "cluster" label and may be filled in
// later through SetDefaultClusterName when it starts out empty.
ClusterName string

// MaxGaugeCardinality is presumably the maximum number of distinct label
// sets reported per gauge -- not read by any code visible here; TODO confirm.
MaxGaugeCardinality int
// GaugeInterval is presumably the period between gauge computations --
// not read by any code visible here; TODO confirm.
GaugeInterval time.Duration

// Sink is the go-metrics sink to send to
Sink metrics.MetricSink
}

// Label is a convenience alias for the go-metrics Label type, so that code
// in and around this package can write Label instead of metrics.Label.
type Label = metrics.Label

// SetGaugeWithLabels forwards a gauge value to the underlying sink, adding a
// "cluster" label that carries m.ClusterName after the caller's labels.
//
// The caller's labels are copied into a fresh slice before the cluster label
// is appended. Appending directly to the caller's slice would write into its
// backing array whenever that slice has spare capacity, clobbering data the
// caller (or another goroutine sharing the array) still owns.
func (m *ClusterMetricSink) SetGaugeWithLabels(key []string, val float32, labels []Label) {
	withCluster := make([]Label, 0, len(labels)+1)
	withCluster = append(withCluster, labels...)
	withCluster = append(withCluster, Label{Name: "cluster", Value: m.ClusterName})
	m.Sink.SetGaugeWithLabels(key, val, withCluster)
}

// IncrCounterWithLabels forwards a counter increment to the underlying sink,
// adding a "cluster" label that carries m.ClusterName after the caller's labels.
//
// The caller's labels are copied into a fresh slice before the cluster label
// is appended. Appending directly to the caller's slice would write into its
// backing array whenever that slice has spare capacity, clobbering data the
// caller (or another goroutine sharing the array) still owns.
func (m *ClusterMetricSink) IncrCounterWithLabels(key []string, val float32, labels []Label) {
	withCluster := make([]Label, 0, len(labels)+1)
	withCluster = append(withCluster, labels...)
	withCluster = append(withCluster, Label{Name: "cluster", Value: m.ClusterName})
	m.Sink.IncrCounterWithLabels(key, val, withCluster)
}

// AddSampleWithLabels forwards a sample to the underlying sink, adding a
// "cluster" label that carries m.ClusterName after the caller's labels.
//
// The caller's labels are copied into a fresh slice before the cluster label
// is appended. Appending directly to the caller's slice would write into its
// backing array whenever that slice has spare capacity, clobbering data the
// caller (or another goroutine sharing the array) still owns.
func (m *ClusterMetricSink) AddSampleWithLabels(key []string, val float32, labels []Label) {
	withCluster := make([]Label, 0, len(labels)+1)
	withCluster = append(withCluster, labels...)
	withCluster = append(withCluster, Label{Name: "cluster", Value: m.ClusterName})
	m.Sink.AddSampleWithLabels(key, val, withCluster)
}

// BlackholeSink builds a ClusterMetricSink with an empty cluster name whose
// underlying sink is a go-metrics BlackholeSink. It is a default suitable for
// use in unit tests.
func BlackholeSink() *ClusterMetricSink {
	sink := new(ClusterMetricSink)
	sink.ClusterName = ""
	sink.Sink = &metrics.BlackholeSink{}
	return sink
}

// SetDefaultClusterName fills in the cluster name when none has been set yet.
// If ClusterName is already non-empty the call leaves it unchanged.
func (m *ClusterMetricSink) SetDefaultClusterName(clusterName string) {
	if m.ClusterName != "" {
		// A name is already configured; keep it.
		return
	}
	m.ClusterName = clusterName
}
102 changes: 102 additions & 0 deletions helper/metricsutil/wrapped_metrics_test.go
@@ -0,0 +1,102 @@
package metricsutil

import (
"testing"
"time"

"github.com/armon/go-metrics"
)

// isLabelPresent reports whether ls contains a label equal to toFind.
func isLabelPresent(toFind Label, ls []Label) bool {
	for i := 0; i < len(ls); i++ {
		if ls[i] != toFind {
			continue
		}
		return true
	}
	return false
}

// TestClusterLabelPresent verifies that a gauge, a counter and a sample
// emitted through a ClusterMetricSink arrive at the underlying sink carrying
// both the caller-supplied label and the added "cluster" label.
func TestClusterLabelPresent(t *testing.T) {
	testClusterName := "test-cluster"

	// Use a ridiculously long time to minimize the chance
	// that we have to deal with more than one interval.
	// InMemSink rounds down to an interval boundary rather than
	// starting one at the time of initialization.
	inmemSink := metrics.NewInmemSink(
		1000000*time.Hour,
		2000000*time.Hour)
	clusterSink := &ClusterMetricSink{
		ClusterName: testClusterName,
		Sink:        inmemSink,
	}

	key1 := []string{"aaa", "bbb"}
	key2 := []string{"ccc", "ddd"}
	key3 := []string{"eee", "fff"}
	// Keyed fields: Label is a struct from another package, and unkeyed
	// literals of imported structs are flagged by go vet.
	labels1 := []Label{{Name: "dim1", Value: "val1"}}
	labels2 := []Label{{Name: "dim2", Value: "val2"}}
	labels3 := []Label{{Name: "dim3", Value: "val3"}}
	clusterLabel := Label{Name: "cluster", Value: testClusterName}
	// The in-memory sink keys each metric by its name followed by its labels,
	// so the cluster label is expected last.
	expectedKey1 := "aaa.bbb;dim1=val1;cluster=" + testClusterName
	expectedKey2 := "ccc.ddd;dim2=val2;cluster=" + testClusterName
	expectedKey3 := "eee.fff;dim3=val3;cluster=" + testClusterName

	clusterSink.SetGaugeWithLabels(key1, 1.0, labels1)
	clusterSink.IncrCounterWithLabels(key2, 2.0, labels2)
	clusterSink.AddSampleWithLabels(key3, 3.0, labels3)

	intervals := inmemSink.Data()
	// If we start very close to the end of an interval, then our metrics might be
	// split across two different buckets. We won't write the code to try to handle that.
	// 1,000,000 hours = an interval boundary at most once every ~41,667 days.
	if len(intervals) > 1 {
		t.Skip("Detected interval crossing.")
	}
	// Fail cleanly instead of panicking on intervals[0] below.
	if len(intervals) == 0 {
		t.Fatal("No intervals returned by the in-memory sink")
	}

	// Check Gauge
	g, ok := intervals[0].Gauges[expectedKey1]
	if !ok {
		t.Fatal("Key", expectedKey1, "not found in map", intervals[0].Gauges)
	}
	if g.Value != 1.0 {
		t.Error("Gauge value", g.Value, "does not match", 1.0)
	}
	if !isLabelPresent(labels1[0], g.Labels) {
		t.Error("Gauge label", g.Labels, "does not include", labels1)
	}
	if !isLabelPresent(clusterLabel, g.Labels) {
		t.Error("Gauge label", g.Labels, "does not include", clusterLabel)
	}

	// Check Counter
	c, ok := intervals[0].Counters[expectedKey2]
	if !ok {
		t.Fatal("Key", expectedKey2, "not found in map", intervals[0].Counters)
	}
	if c.Sum != 2.0 {
		t.Error("Counter value", c.Sum, "does not match", 2.0)
	}
	if !isLabelPresent(labels2[0], c.Labels) {
		t.Error("Counter label", c.Labels, "does not include", labels2)
	}
	if !isLabelPresent(clusterLabel, c.Labels) {
		t.Error("Counter label", c.Labels, "does not include", clusterLabel)
	}

	// Check Sample
	s, ok := intervals[0].Samples[expectedKey3]
	if !ok {
		t.Fatal("Key", expectedKey3, "not found in map", intervals[0].Samples)
	}
	if s.Sum != 3.0 {
		t.Error("Sample value", s.Sum, "does not match", 3.0)
	}
	if !isLabelPresent(labels3[0], s.Labels) {
		t.Error("Sample label", s.Labels, "does not include", labels3)
	}
	if !isLabelPresent(clusterLabel, s.Labels) {
		t.Error("Sample label", s.Labels, "does not include", clusterLabel)
	}
}
4 changes: 4 additions & 0 deletions vault/cluster.go
Expand Up @@ -185,6 +185,10 @@ func (c *Core) setupCluster(ctx context.Context) error {
modified = true
}

// This is the first point at which the stored (or newly generated)
// cluster name is known.
c.metricSink.SetDefaultClusterName(cluster.Name)

if cluster.ID == "" {
c.logger.Debug("cluster ID not found, generating new")
// Generate a clusterID
Expand Down
16 changes: 16 additions & 0 deletions vault/core.go
Expand Up @@ -350,6 +350,10 @@ type Core struct {
// metrics emission and sealing leading to a nil pointer
metricsMutex sync.Mutex

// metricSink is the destination for all metrics that have
// a cluster label.
metricSink *metricsutil.ClusterMetricSink

defaultLeaseTTL time.Duration
maxLeaseTTL time.Duration

Expand Down Expand Up @@ -598,6 +602,7 @@ type CoreConfig struct {

// Telemetry objects
MetricsHelper *metricsutil.MetricsHelper
MetricSink *metricsutil.ClusterMetricSink

CounterSyncInterval time.Duration

Expand Down Expand Up @@ -699,6 +704,11 @@ func NewCore(conf *CoreConfig) (*Core, error) {
conf.Logger = logging.NewVaultLogger(log.Trace)
}

// Make a default metric sink if not provided
if conf.MetricSink == nil {
conf.MetricSink = metricsutil.BlackholeSink()
}

// Instantiate a non-nil raw config if none is provided
if conf.RawConfig == nil {
conf.RawConfig = new(server.Config)
Expand Down Expand Up @@ -757,6 +767,7 @@ func NewCore(conf *CoreConfig) (*Core, error) {
neverBecomeActive: new(uint32),
clusterLeaderParams: new(atomic.Value),
metricsHelper: conf.MetricsHelper,
metricSink: conf.MetricSink,
secureRandomReader: conf.SecureRandomReader,
rawConfig: new(atomic.Value),
counters: counters{
Expand Down Expand Up @@ -2340,6 +2351,11 @@ func (c *Core) MetricsHelper() *metricsutil.MetricsHelper {
return c.metricsHelper
}

// MetricSink exposes the cluster-labelling metrics wrapper held in the
// Core's metricSink field.
func (c *Core) MetricSink() *metricsutil.ClusterMetricSink {
	sink := c.metricSink
	return sink
}

// BuiltinRegistry is an interface that allows the "vault" package to use
// the registry of builtin plugins without getting an import cycle. It
// also allows for mocking the registry easily.
Expand Down

0 comments on commit d5b1d5d

Please sign in to comment.