diff --git a/command/server.go b/command/server.go index 8494ec5e8c10b..ab2feccbc38de 100644 --- a/command/server.go +++ b/command/server.go @@ -918,7 +918,7 @@ func (c *ServerCommand) Run(args []string) int { "in a Docker container, provide the IPC_LOCK cap to the container.")) } - metricsHelper, err := c.setupTelemetry(config) + metricsHelper, metricSink, err := c.setupTelemetry(config) if err != nil { c.UI.Error(fmt.Sprintf("Error initializing telemetry: %s", err)) return 1 @@ -1111,6 +1111,7 @@ func (c *ServerCommand) Run(args []string) int { BuiltinRegistry: builtinplugins.Registry, DisableKeyEncodingChecks: config.DisablePrintableCheck, MetricsHelper: metricsHelper, + MetricSink: metricSink, SecureRandomReader: secureRandomReader, } if c.flagDev { @@ -2352,7 +2353,7 @@ func (c *ServerCommand) detectRedirect(detect physical.RedirectDetect, } // setupTelemetry is used to setup the telemetry sub-systems and returns the in-memory sink to be used in http configuration -func (c *ServerCommand) setupTelemetry(config *server.Config) (*metricsutil.MetricsHelper, error) { +func (c *ServerCommand) setupTelemetry(config *server.Config) (*metricsutil.MetricsHelper, *metricsutil.ClusterMetricSink, error) { /* Setup telemetry Aggregate on 10 second intervals for 1 minute. Expose the metrics over stderr when there is a SIGUSR1 received. 
@@ -2389,7 +2390,7 @@ func (c *ServerCommand) setupTelemetry(config *server.Config) (*metricsutil.Metr sink, err := prometheus.NewPrometheusSinkFrom(prometheusOpts) if err != nil { - return nil, err + return nil, nil, err } fanout = append(fanout, sink) } @@ -2399,7 +2400,7 @@ func (c *ServerCommand) setupTelemetry(config *server.Config) (*metricsutil.Metr if telConfig.StatsiteAddr != "" { sink, err := metrics.NewStatsiteSink(telConfig.StatsiteAddr) if err != nil { - return nil, err + return nil, nil, err } fanout = append(fanout, sink) } @@ -2408,7 +2409,7 @@ func (c *ServerCommand) setupTelemetry(config *server.Config) (*metricsutil.Metr if telConfig.StatsdAddr != "" { sink, err := metrics.NewStatsdSink(telConfig.StatsdAddr) if err != nil { - return nil, err + return nil, nil, err } fanout = append(fanout, sink) } @@ -2444,7 +2445,7 @@ func (c *ServerCommand) setupTelemetry(config *server.Config) (*metricsutil.Metr sink, err := circonus.NewCirconusSink(cfg) if err != nil { - return nil, err + return nil, nil, err } sink.Start() fanout = append(fanout, sink) @@ -2459,7 +2460,7 @@ func (c *ServerCommand) setupTelemetry(config *server.Config) (*metricsutil.Metr sink, err := datadog.NewDogStatsdSink(telConfig.DogStatsDAddr, metricsConf.HostName) if err != nil { - return nil, errwrap.Wrapf("failed to start DogStatsD sink: {{err}}", err) + return nil, nil, errwrap.Wrapf("failed to start DogStatsD sink: {{err}}", err) } sink.SetTags(tags) fanout = append(fanout, sink) @@ -2469,7 +2470,7 @@ func (c *ServerCommand) setupTelemetry(config *server.Config) (*metricsutil.Metr if telConfig.StackdriverProjectID != "" { client, err := monitoring.NewMetricClient(context.Background(), option.WithUserAgent(useragent.String())) if err != nil { - return nil, fmt.Errorf("Failed to create stackdriver client: %v", err) + return nil, nil, fmt.Errorf("Failed to create stackdriver client: %v", err) } sink := stackdriver.NewSink(client, &stackdriver.Config{ LabelExtractor: 
stackdrivervault.Extractor, @@ -2495,10 +2496,19 @@ func (c *ServerCommand) setupTelemetry(config *server.Config) (*metricsutil.Metr _, err := metrics.NewGlobal(metricsConf, fanout) if err != nil { - return nil, err + return nil, nil, err + } + + // Initialize a wrapper around the global sink; this will be passed to Core + // and to any backend. + wrapper := &metricsutil.ClusterMetricSink{ + ClusterName: config.ClusterName, + MaxGaugeCardinality: 500, + GaugeInterval: 10 * time.Minute, + Sink: fanout, } - return metricHelper, nil + return metricHelper, wrapper, nil } func (c *ServerCommand) Reload(lock *sync.RWMutex, reloadFuncs *map[string][]reloadutil.ReloadFunc, configPath []string) error { diff --git a/helper/metricsutil/wrapped_metrics.go b/helper/metricsutil/wrapped_metrics.go new file mode 100644 index 0000000000000..5d4b26e549ff8 --- /dev/null +++ b/helper/metricsutil/wrapped_metrics.go @@ -0,0 +1,59 @@ +package metricsutil + +import ( + "time" + + metrics "github.com/armon/go-metrics" +) + +// ClusterMetricSink serves as a shim around go-metrics +// and inserts a "cluster" label. +// +// It also provides a mechanism to limit the cardinality of the labels on a gauge +// (at each reporting interval, which isn't sufficient if there is variability in which +// labels are the top N) and a backoff mechanism for gauge computation. +type ClusterMetricSink struct { + // ClusterName is either the cluster ID, or a name provided + // in the telemetry configuration stanza. 
+ ClusterName string + + MaxGaugeCardinality int + GaugeInterval time.Duration + + // Sink is the go-metrics sink to send to + Sink metrics.MetricSink +} + +// Convenience alias +type Label = metrics.Label + +func (m *ClusterMetricSink) SetGaugeWithLabels(key []string, val float32, labels []Label) { + m.Sink.SetGaugeWithLabels(key, val, + append(labels, Label{"cluster", m.ClusterName})) +} + +func (m *ClusterMetricSink) IncrCounterWithLabels(key []string, val float32, labels []Label) { + m.Sink.IncrCounterWithLabels(key, val, + append(labels, Label{"cluster", m.ClusterName})) +} + +func (m *ClusterMetricSink) AddSampleWithLabels(key []string, val float32, labels []Label) { + m.Sink.AddSampleWithLabels(key, val, + append(labels, Label{"cluster", m.ClusterName})) +} + +// BlackholeSink is a default suitable for use in unit tests. +func BlackholeSink() *ClusterMetricSink { + return &ClusterMetricSink{ + ClusterName: "", + Sink: &metrics.BlackholeSink{}, + } +} + +// SetDefaultClusterName changes the cluster name from its default value, +// if it has not previously been configured. +func (m *ClusterMetricSink) SetDefaultClusterName(clusterName string) { + if m.ClusterName == "" { + m.ClusterName = clusterName + } +} diff --git a/helper/metricsutil/wrapped_metrics_test.go b/helper/metricsutil/wrapped_metrics_test.go new file mode 100644 index 0000000000000..975159984597b --- /dev/null +++ b/helper/metricsutil/wrapped_metrics_test.go @@ -0,0 +1,102 @@ +package metricsutil + +import ( + "testing" + "time" + + "github.com/armon/go-metrics" +) + +func isLabelPresent(toFind Label, ls []Label) bool { + for _, l := range ls { + if l == toFind { + return true + } + } + return false +} + +func TestClusterLabelPresent(t *testing.T) { + testClusterName := "test-cluster" + + // Use a ridiculously long time to minimize the chance + // that we have to deal with more than one interval. 
+ // InMemSink rounds down to an interval boundary rather than + // starting one at the time of initialization. + inmemSink := metrics.NewInmemSink( + 1000000*time.Hour, + 2000000*time.Hour) + clusterSink := &ClusterMetricSink{ + ClusterName: testClusterName, + Sink: inmemSink, + } + + key1 := []string{"aaa", "bbb"} + key2 := []string{"ccc", "ddd"} + key3 := []string{"eee", "fff"} + labels1 := []Label{{"dim1", "val1"}} + labels2 := []Label{{"dim2", "val2"}} + labels3 := []Label{{"dim3", "val3"}} + clusterLabel := Label{"cluster", testClusterName} + expectedKey1 := "aaa.bbb;dim1=val1;cluster=" + testClusterName + expectedKey2 := "ccc.ddd;dim2=val2;cluster=" + testClusterName + expectedKey3 := "eee.fff;dim3=val3;cluster=" + testClusterName + + clusterSink.SetGaugeWithLabels(key1, 1.0, labels1) + clusterSink.IncrCounterWithLabels(key2, 2.0, labels2) + clusterSink.AddSampleWithLabels(key3, 3.0, labels3) + + intervals := inmemSink.Data() + // If we start very close to the end of an interval, then our metrics might be + // split across two different buckets. We won't write the code to try to handle that. 
+ // 1000000 hours = at most once every 41667 days + if len(intervals) > 1 { + t.Skip("Detected interval crossing.") + } + + // Check Gauge + g, ok := intervals[0].Gauges[expectedKey1] + if !ok { + t.Fatal("Key", expectedKey1, "not found in map", intervals[0].Gauges) + } + if g.Value != 1.0 { + t.Error("Gauge value", g.Value, "does not match", 1.0) + } + if !isLabelPresent(labels1[0], g.Labels) { + t.Error("Gauge label", g.Labels, "does not include", labels1) + } + if !isLabelPresent(clusterLabel, g.Labels) { + t.Error("Gauge label", g.Labels, "does not include", clusterLabel) + } + + // Check Counter + c, ok := intervals[0].Counters[expectedKey2] + if !ok { + t.Fatal("Key", expectedKey2, "not found in map", intervals[0].Counters) + } + if c.Sum != 2.0 { + t.Error("Counter value", c.Sum, "does not match", 2.0) + } + if !isLabelPresent(labels2[0], c.Labels) { + t.Error("Counter label", c.Labels, "does not include", labels2) + } + if !isLabelPresent(clusterLabel, c.Labels) { + t.Error("Counter label", c.Labels, "does not include", clusterLabel) + } + + // Check Sample + s, ok := intervals[0].Samples[expectedKey3] + if !ok { + t.Fatal("Key", expectedKey3, "not found in map", intervals[0].Samples) + } + if s.Sum != 3.0 { + t.Error("Sample value", s.Sum, "does not match", 3.0) + } + if !isLabelPresent(labels3[0], s.Labels) { + t.Error("Sample label", s.Labels, "does not include", labels3) + } + if !isLabelPresent(clusterLabel, s.Labels) { + t.Error("Sample label", s.Labels, "does not include", clusterLabel) + } + +} diff --git a/vault/cluster.go b/vault/cluster.go index c674dda80b076..b189bc8ca8c34 100644 --- a/vault/cluster.go +++ b/vault/cluster.go @@ -185,6 +185,10 @@ func (c *Core) setupCluster(ctx context.Context) error { modified = true } + // This is the first point at which the stored (or newly generated) + // cluster name is known. 
+ c.metricSink.SetDefaultClusterName(cluster.Name) + if cluster.ID == "" { c.logger.Debug("cluster ID not found, generating new") // Generate a clusterID diff --git a/vault/core.go b/vault/core.go index f70ec59fe4135..48e7ff6b23243 100644 --- a/vault/core.go +++ b/vault/core.go @@ -350,6 +350,10 @@ type Core struct { // metrics emission and sealing leading to a nil pointer metricsMutex sync.Mutex + // metricSink is the destination for all metrics that have + // a cluster label. + metricSink *metricsutil.ClusterMetricSink + defaultLeaseTTL time.Duration maxLeaseTTL time.Duration @@ -598,6 +602,7 @@ type CoreConfig struct { // Telemetry objects MetricsHelper *metricsutil.MetricsHelper + MetricSink *metricsutil.ClusterMetricSink CounterSyncInterval time.Duration @@ -699,6 +704,11 @@ func NewCore(conf *CoreConfig) (*Core, error) { conf.Logger = logging.NewVaultLogger(log.Trace) } + // Make a default metric sink if not provided + if conf.MetricSink == nil { + conf.MetricSink = metricsutil.BlackholeSink() + } + // Instantiate a non-nil raw config if none is provided if conf.RawConfig == nil { conf.RawConfig = new(server.Config) @@ -757,6 +767,7 @@ func NewCore(conf *CoreConfig) (*Core, error) { neverBecomeActive: new(uint32), clusterLeaderParams: new(atomic.Value), metricsHelper: conf.MetricsHelper, + metricSink: conf.MetricSink, secureRandomReader: conf.SecureRandomReader, rawConfig: new(atomic.Value), counters: counters{ @@ -2340,6 +2351,11 @@ func (c *Core) MetricsHelper() *metricsutil.MetricsHelper { return c.metricsHelper } +// MetricSink returns the metrics wrapper with which Core has been configured. +func (c *Core) MetricSink() *metricsutil.ClusterMetricSink { + return c.metricSink +} + // BuiltinRegistry is an interface that allows the "vault" package to use // the registry of builtin plugins without getting an import cycle. It // also allows for mocking the registry easily.