Add database plugin metrics around connections (#16048)

This is a replacement for #15923 that takes into account recent lock
cleanup.

I also added back the hanging-plugin test, which I meant to include in
#15944 but forgot.

I tested this by spinning up a statsd sink in the tests and verifying I
got a stream of metrics:

```
$ nc -u -l 8125 | grep backend
test.swenson-Q9Q0L72D39.secrets.database.backend.connections.count.pgx.5.:1.000000|g
test.swenson-Q9Q0L72D39.secrets.database.backend.connections.count.pgx.5.:0.000000|g
test.swenson-Q9Q0L72D39.secrets.database.backend.connections.count.pgx.5.:1.000000|g
test.swenson-Q9Q0L72D39.secrets.database.backend.connections.count.pgx.5.:0.000000|g
```
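
For reference, here is a minimal sketch of how such a statsd sink can be wired up in a Go test with github.com/armon/go-metrics. The helper name `setupStatsdSink` and the service name `test` are hypothetical, not this repo's actual test plumbing; the sketch only shows the go-metrics calls involved:

```go
package database

import (
	"testing"

	metrics "github.com/armon/go-metrics"
)

// setupStatsdSink (hypothetical helper) points the global go-metrics sink at
// a local statsd address so gauges show up under `nc -u -l 8125`.
func setupStatsdSink(t *testing.T) {
	t.Helper()
	sink, err := metrics.NewStatsdSink("127.0.0.1:8125")
	if err != nil {
		t.Fatal(err)
	}
	// DefaultConfig enables hostname tagging by default, which produces the
	// "test.<hostname>." prefix visible in the captured output above.
	if _, err := metrics.NewGlobal(metrics.DefaultConfig("test"), sink); err != nil {
		t.Fatal(err)
	}
}
```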

We had to rework the shared gauge code so it runs without a full
`ClusterMetricSink`, since we don't have access to the core metrics from
within a plugin.
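
That rework shows up later in this diff as a small adapter, `SinkWrapper{MetricSink: m}`, whose definition is not part of the excerpt. As a rough sketch, assuming `metricsutil.Label` aliases the go-metrics label type and that the `Metrics` interface adds a duration helper a bare `metrics.MetricSink` lacks, it could look like:

```go
package metricsutil

import (
	"time"

	metrics "github.com/armon/go-metrics"
)

// Label is assumed here to alias the go-metrics label type.
type Label = metrics.Label

// SinkWrapper (assumed shape, for illustration only) embeds a bare
// MetricSink, which already provides SetGaugeWithLabels and
// IncrCounterWithLabels, and fills in a timing helper the gauge process
// also needs.
type SinkWrapper struct {
	metrics.MetricSink
}

// MeasureSinceWithLabels reports elapsed time in milliseconds as a sample.
func (s SinkWrapper) MeasureSinceWithLabels(key []string, start time.Time, labels []Label) {
	elapsed := time.Since(start)
	s.MetricSink.AddSampleWithLabels(key, float32(elapsed)/float32(time.Millisecond), labels)
}
```

Embedding promotes every `MetricSink` method, so the wrapper stays one field wide regardless of how many methods the sink and the interface share.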

This reports metrics only every 10 minutes by default, but it avoids the
problems we would otherwise have had with gauge values going stale and
needing to be re-sent.
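
To make the staleness point concrete: a gauge in a statsd-style sink only stays current while it keeps arriving, so the collection process re-reads and re-emits values on a ticker instead of setting them once. A minimal illustration of that loop (not the repo's code; the key matches the one registered in `Factory`):

```go
package main

import (
	"time"

	metrics "github.com/armon/go-metrics"
)

// reportLoop (illustrative only) re-emits a gauge on every tick so downstream
// aggregators never hold a stale value; read supplies the current count.
func reportLoop(sink metrics.MetricSink, interval time.Duration, read func() float32) {
	t := time.NewTicker(interval)
	defer t.Stop()
	for range t.C {
		sink.SetGauge([]string{"secrets", "database", "backend", "pluginInstances", "count"}, read())
	}
}
```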

Co-authored-by: Tom Proctor <tomhjp@users.noreply.github.com>
swenson and tomhjp committed Jun 27, 2022
1 parent 3f9dbab commit 53bfb72
Showing 6 changed files with 248 additions and 28 deletions.
60 changes: 58 additions & 2 deletions builtin/logical/database/backend.go
@@ -8,9 +8,12 @@ import (
"sync"
"time"

"github.com/armon/go-metrics"
log "github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-secure-stdlib/strutil"
"github.com/hashicorp/go-uuid"
"github.com/hashicorp/vault/helper/metricsutil"
"github.com/hashicorp/vault/internalshared/configutil"
v4 "github.com/hashicorp/vault/sdk/database/dbplugin"
v5 "github.com/hashicorp/vault/sdk/database/dbplugin/v5"
"github.com/hashicorp/vault/sdk/database/helper/dbutil"
@@ -57,6 +60,21 @@ func Factory(ctx context.Context, conf *logical.BackendConfig) (logical.Backend,
b.credRotationQueue = queue.New()
// Load queue and kickoff new periodic ticker
go b.initQueue(b.queueCtx, conf, conf.System.ReplicationState())

// collect metrics on number of plugin instances
var err error
b.gaugeCollectionProcess, err = metricsutil.NewGaugeCollectionProcess(
[]string{"secrets", "database", "backend", "pluginInstances", "count"},
[]metricsutil.Label{},
b.collectPluginInstanceGaugeValues,
metrics.Default(),
configutil.UsageGaugeDefaultPeriod, // TODO: add config settings for these, or add plumbing to the main config settings
configutil.MaximumGaugeCardinalityDefault,
b.logger)
if err != nil {
return nil, err
}
go b.gaugeCollectionProcess.Run()
return b, nil
}

@@ -103,6 +121,36 @@ func Backend(conf *logical.BackendConfig) *databaseBackend {
return &b
}

func (b *databaseBackend) collectPluginInstanceGaugeValues(context.Context) ([]metricsutil.GaugeLabelValues, error) {
// copy the map so we can release the lock
connMapCopy := func() map[string]*dbPluginInstance {
b.connLock.RLock()
defer b.connLock.RUnlock()
mapCopy := map[string]*dbPluginInstance{}
for k, v := range b.connections {
mapCopy[k] = v
}
return mapCopy
}()
counts := map[string]int{}
for _, v := range connMapCopy {
dbType, err := v.database.Type()
if err != nil {
// there's a chance this will already be closed since we don't hold the lock
continue
}
if _, ok := counts[dbType]; !ok {
counts[dbType] = 0
}
counts[dbType] += 1
}
var gauges []metricsutil.GaugeLabelValues
for k, v := range counts {
gauges = append(gauges, metricsutil.GaugeLabelValues{Labels: []metricsutil.Label{{Name: "dbType", Value: k}}, Value: float32(v)})
}
return gauges, nil
}

type databaseBackend struct {
// connLock is used to synchronize access to the connections map
connLock sync.RWMutex
@@ -125,6 +173,9 @@ type databaseBackend struct {
// concurrent requests are not modifying the same role and possibly causing
// issues with the priority queue.
roleLocks []*locksutil.LockEntry

// the running gauge collection process
gaugeCollectionProcess *metricsutil.GaugeCollectionProcess
}

func (b *databaseBackend) connGet(name string) *dbPluginInstance {
@@ -136,8 +187,10 @@ func (b *databaseBackend) connGet(name string) *dbPluginInstance {
func (b *databaseBackend) connPop(name string) *dbPluginInstance {
b.connLock.Lock()
defer b.connLock.Unlock()
-dbi := b.connections[name]
-delete(b.connections, name)
+dbi, ok := b.connections[name]
+if ok {
+delete(b.connections, name)
+}
return dbi
}

@@ -362,6 +415,9 @@ func (b *databaseBackend) clean(_ context.Context) {
for _, db := range connections {
go db.Close()
}
if b.gaugeCollectionProcess != nil {
b.gaugeCollectionProcess.Stop()
}
}

const backendHelp = `
83 changes: 83 additions & 0 deletions builtin/logical/database/backend_test.go
@@ -1461,6 +1461,89 @@ func TestBackend_ConnectionURL_redacted(t *testing.T) {
}
}

type hangingPlugin struct{}

func (h hangingPlugin) Initialize(_ context.Context, req v5.InitializeRequest) (v5.InitializeResponse, error) {
return v5.InitializeResponse{
Config: req.Config,
}, nil
}

func (h hangingPlugin) NewUser(_ context.Context, _ v5.NewUserRequest) (v5.NewUserResponse, error) {
return v5.NewUserResponse{}, nil
}

func (h hangingPlugin) UpdateUser(_ context.Context, _ v5.UpdateUserRequest) (v5.UpdateUserResponse, error) {
return v5.UpdateUserResponse{}, nil
}

func (h hangingPlugin) DeleteUser(_ context.Context, _ v5.DeleteUserRequest) (v5.DeleteUserResponse, error) {
return v5.DeleteUserResponse{}, nil
}

func (h hangingPlugin) Type() (string, error) {
return "hanging", nil
}

func (h hangingPlugin) Close() error {
time.Sleep(1000 * time.Second)
return nil
}

var _ v5.Database = (*hangingPlugin)(nil)

func TestBackend_PluginMain_Hanging(t *testing.T) {
if os.Getenv(pluginutil.PluginVaultVersionEnv) == "" {
return
}
v5.Serve(&hangingPlugin{})
}

func TestBackend_AsyncClose(t *testing.T) {
// Test that having a plugin that takes a LONG time to close will not cause the cleanup function to take
// longer than 750ms.
cluster, sys := getCluster(t)
vault.TestAddTestPlugin(t, cluster.Cores[0].Core, "hanging-plugin", consts.PluginTypeDatabase, "TestBackend_PluginMain_Hanging", []string{}, "")
t.Cleanup(cluster.Cleanup)

config := logical.TestBackendConfig()
config.StorageView = &logical.InmemStorage{}
config.System = sys

b, err := Factory(context.Background(), config)
if err != nil {
t.Fatal(err)
}

// Configure a connection
data := map[string]interface{}{
"connection_url": "doesn't matter",
"plugin_name": "hanging-plugin",
"allowed_roles": []string{"plugin-role-test"},
}
req := &logical.Request{
Operation: logical.UpdateOperation,
Path: "config/hang",
Storage: config.StorageView,
Data: data,
}
_, err = b.HandleRequest(namespace.RootContext(nil), req)
if err != nil {
t.Fatalf("err: %v", err)
}
timeout := time.NewTimer(750 * time.Millisecond)
done := make(chan bool)
go func() {
b.Cleanup(context.Background())
done <- true
}()
select {
case <-timeout.C:
t.Error("Hanging plugin caused Close() to take longer than 750ms")
case <-done:
}
}

func testCredsExist(t *testing.T, resp *logical.Response, connURL string) bool {
t.Helper()
var d struct {
1 change: 1 addition & 0 deletions builtin/logical/database/rotation_test.go
@@ -1379,6 +1379,7 @@ func setupMockDB(b *databaseBackend) *mockNewDatabase {
mockDB := &mockNewDatabase{}
mockDB.On("Initialize", mock.Anything, mock.Anything).Return(v5.InitializeResponse{}, nil)
mockDB.On("Close").Return(nil)
mockDB.On("Type").Return("mock", nil)
dbw := databaseVersionWrapper{
v5: mockDB,
}
71 changes: 54 additions & 17 deletions helper/metricsutil/gauge_process.go
@@ -6,6 +6,7 @@ import (
"sort"
"time"

"github.com/armon/go-metrics"
log "github.com/hashicorp/go-hclog"
)

@@ -60,18 +61,47 @@ type GaugeCollectionProcess struct {
collector GaugeCollector

// destination for metrics
-sink *ClusterMetricSink
+sink Metrics
logger log.Logger

// time between collections
originalInterval time.Duration
currentInterval time.Duration
ticker *time.Ticker

// used to help limit cardinality
maxGaugeCardinality int

// time source
clock clock
}

// NewGaugeCollectionProcess creates a new collection process for the callback
// function given as an argument, and starts it running.
// A label should be provided for metrics *about* this collection process.
//
// The Run() method must be called to start the process.
func NewGaugeCollectionProcess(
key []string,
id []Label,
collector GaugeCollector,
m metrics.MetricSink,
gaugeInterval time.Duration,
maxGaugeCardinality int,
logger log.Logger,
) (*GaugeCollectionProcess, error) {
return newGaugeCollectionProcessWithClock(
key,
id,
collector,
SinkWrapper{MetricSink: m},
gaugeInterval,
maxGaugeCardinality,
logger,
defaultClock{},
)
}

// NewGaugeCollectionProcess creates a new collection process for the callback
// function given as an argument, and starts it running.
// A label should be provided for metrics *about* this collection process.
@@ -83,41 +113,48 @@ func (m *ClusterMetricSink) NewGaugeCollectionProcess(
collector GaugeCollector,
logger log.Logger,
) (*GaugeCollectionProcess, error) {
-return m.newGaugeCollectionProcessWithClock(
+return newGaugeCollectionProcessWithClock(
key,
id,
collector,
m,
m.GaugeInterval,
m.MaxGaugeCardinality,
logger,
defaultClock{},
)
}

// test version allows an alternative clock implementation
-func (m *ClusterMetricSink) newGaugeCollectionProcessWithClock(
+func newGaugeCollectionProcessWithClock(
key []string,
id []Label,
collector GaugeCollector,
sink Metrics,
gaugeInterval time.Duration,
maxGaugeCardinality int,
logger log.Logger,
clock clock,
) (*GaugeCollectionProcess, error) {
process := &GaugeCollectionProcess{
-stop: make(chan struct{}, 1),
-stopped: make(chan struct{}, 1),
-key: key,
-labels: id,
-collector: collector,
-sink: m,
-originalInterval: m.GaugeInterval,
-currentInterval: m.GaugeInterval,
-logger: logger,
-clock: clock,
+stop: make(chan struct{}, 1),
+stopped: make(chan struct{}, 1),
+key: key,
+labels: id,
+collector: collector,
+sink: sink,
+originalInterval: gaugeInterval,
+currentInterval: gaugeInterval,
+maxGaugeCardinality: maxGaugeCardinality,
+logger: logger,
+clock: clock,
}
return process, nil
}

// delayStart randomly delays by up to one extra interval
-// so that collection processes do not all run at the time time.
-// If we knew all the procsses in advance, we could just schedule them
+// so that collection processes do not all run at the time.
+// If we knew all the processes in advance, we could just schedule them
// evenly, but a new one could be added per secret engine.
func (p *GaugeCollectionProcess) delayStart() bool {
randomDelay := time.Duration(rand.Int63n(int64(p.currentInterval)))
Expand Down Expand Up @@ -187,11 +224,11 @@ func (p *GaugeCollectionProcess) collectAndFilterGauges() {
// Filter to top N.
// This does not guarantee total cardinality is <= N, but it does slow things down
// a little if the cardinality *is* too high and the gauge needs to be disabled.
-if len(values) > p.sink.MaxGaugeCardinality {
+if len(values) > p.maxGaugeCardinality {
sort.Slice(values, func(a, b int) bool {
return values[a].Value > values[b].Value
})
-values = values[:p.sink.MaxGaugeCardinality]
+values = values[:p.maxGaugeCardinality]
}

p.streamGaugesToSink(values)
