Skip to content

Commit

Permalink
Cleanup and simplify lock usage in database plugin
Browse files Browse the repository at this point in the history
Following up from discussions in #15923 and #15933, I wanted to split
out a separate PR that drastically reduced the complexity of the use of
the databaseBackend lock. We no longer need it at all for the
`credRotationQueue`, and we can move it to be solely used in a few,
small connections map management functions.
  • Loading branch information
Christopher Swenson committed Jun 10, 2022
1 parent 28119df commit bc24972
Show file tree
Hide file tree
Showing 4 changed files with 78 additions and 84 deletions.
125 changes: 64 additions & 61 deletions builtin/logical/database/backend.go
Expand Up @@ -58,10 +58,9 @@ func Factory(ctx context.Context, conf *logical.BackendConfig) (logical.Backend,
// Create a context with a cancel method for processing any WAL entries and
// populating the queue
initCtx := context.Background()
ictx, cancel := context.WithCancel(initCtx)
b.cancelQueue = cancel
b.ctx, b.cancelQueue = context.WithCancel(initCtx)
// Load queue and kickoff new periodic ticker
go b.initQueue(ictx, conf, conf.System.ReplicationState())
go b.initQueue(b.ctx, conf, conf.System.ReplicationState())
return b, nil
}

Expand Down Expand Up @@ -110,28 +109,61 @@ func Backend(conf *logical.BackendConfig) *databaseBackend {
}

type databaseBackend struct {
// used to synchronize access to the connections map
connLock sync.RWMutex
// connections holds configured database connections by config name
connections map[string]*dbPluginInstance
logger log.Logger

*framework.Backend
sync.RWMutex
// CredRotationQueue is an in-memory priority queue used to track Static Roles
// credRotationQueue is an in-memory priority queue used to track Static Roles
// that require periodic rotation. Backends will have a PriorityQueue
// initialized on setup, but only backends that are mounted by a primary
// server or mounted as a local mount will perform the rotations.
//
// cancelQueue is used to remove the priority queue and terminate the
// background ticker.
credRotationQueue *queue.PriorityQueue
cancelQueue context.CancelFunc
// context used for canceling operations
ctx context.Context
cancelQueue context.CancelFunc

// roleLocks is used to lock modifications to roles in the queue, to ensure
// concurrent requests are not modifying the same role and possibly causing
// issues with the priority queue.
roleLocks []*locksutil.LockEntry
}

func (b *databaseBackend) connGet(name string) *dbPluginInstance {
b.connLock.RLock()
defer b.connLock.RUnlock()
return b.connections[name]
}

func (b *databaseBackend) connPop(name string) *dbPluginInstance {
b.connLock.Lock()
defer b.connLock.Unlock()
dbi := b.connections[name]
delete(b.connections, name)
return dbi
}

func (b *databaseBackend) connPut(name string, newDbi *dbPluginInstance) *dbPluginInstance {
b.connLock.Lock()
defer b.connLock.Unlock()
dbi := b.connections[name]
b.connections[name] = newDbi
return dbi
}

func (b *databaseBackend) connClear() map[string]*dbPluginInstance {
b.connLock.Lock()
defer b.connLock.Unlock()
old := b.connections
b.connections = make(map[string]*dbPluginInstance)
return old
}

func (b *databaseBackend) DatabaseConfig(ctx context.Context, s logical.Storage, name string) (*DatabaseConfig, error) {
entry, err := s.Get(ctx, fmt.Sprintf("config/%s", name))
if err != nil {
Expand Down Expand Up @@ -236,22 +268,8 @@ func (b *databaseBackend) GetConnection(ctx context.Context, s logical.Storage,
}

func (b *databaseBackend) GetConnectionWithConfig(ctx context.Context, name string, config *DatabaseConfig) (*dbPluginInstance, error) {
b.RLock()
unlockFunc := b.RUnlock
defer func() { unlockFunc() }()

dbi, ok := b.connections[name]
if ok {
return dbi, nil
}

// Upgrade lock
b.RUnlock()
b.Lock()
unlockFunc = b.Unlock

dbi, ok = b.connections[name]
if ok {
dbi := b.connGet(name)
if dbi != nil {
return dbi, nil
}

Expand Down Expand Up @@ -280,38 +298,23 @@ func (b *databaseBackend) GetConnectionWithConfig(ctx context.Context, name stri
id: id,
name: name,
}
b.connections[name] = dbi
return dbi, nil
}

// invalidateQueue cancels any background queue loading and destroys the queue.
func (b *databaseBackend) invalidateQueue() {
// cancel context before grabbing lock to start closing any open connections
// this is safe to do without the lock since it is only written to once in initialization
// and can be canceled multiple times safely
if b.cancelQueue != nil {
b.cancelQueue()
oldConn := b.connPut(name, dbi)
if oldConn != nil {
err := oldConn.Close()
if err != nil {
b.Logger().Warn("Error closing database connection", "error", err)
}
}
b.Lock()
defer b.Unlock()

b.credRotationQueue = nil
return dbi, nil
}

// ClearConnection closes the database connection and
// removes it from the b.connections map.
func (b *databaseBackend) ClearConnection(name string) error {
b.Lock()
defer b.Unlock()
return b.clearConnection(name)
}

func (b *databaseBackend) clearConnection(name string) error {
db, ok := b.connections[name]
if ok {
db := b.connPop(name)
if db != nil {
// Ignore error here since the database client is always killed
db.Close()
delete(b.connections, name)
}
return nil
}
Expand All @@ -324,33 +327,33 @@ func (b *databaseBackend) CloseIfShutdown(db *dbPluginInstance, err error) {
// and simply defer the unlock. Since we are attaching the instance and matching
// the id in the connection map, we can safely do this.
go func() {
b.Lock()
defer b.Unlock()
db.Close()

// Ensure we are deleting the correct connection
mapDB, ok := b.connections[db.name]
if ok && db.id == mapDB.id {
delete(b.connections, db.name)
mapDB := b.connPop(db.name)
if mapDB != nil && db.id != mapDB.id {
// oops, put it back
oldDbi := b.connPut(db.name, mapDB)
if oldDbi != nil {
// there is a small chance that something else was inserted in that slot during that time
// if so, clean it up
oldDbi.Close()
}
}
}()
}
}

// clean closes all connections from all database types
// and cancels any rotation queue loading operation.
func (b *databaseBackend) clean(ctx context.Context) {
// invalidateQueue acquires it's own lock on the backend, removes queue, and
// terminates the background ticker
b.invalidateQueue()
func (b *databaseBackend) clean(_ context.Context) {
// kill the queue and terminate the background ticker
b.cancelQueue()

b.Lock()
defer b.Unlock()

for _, db := range b.connections {
db.Close()
connections := b.connClear()
for _, db := range connections {
go db.Close()
}
b.connections = make(map[string]*dbPluginInstance)
}

const backendHelp = `
Expand Down
10 changes: 4 additions & 6 deletions builtin/logical/database/path_config_connection.go
Expand Up @@ -344,16 +344,14 @@ func (b *databaseBackend) connectionWriteHandler() framework.OperationFunc {

b.Logger().Debug("created database object", "name", name, "plugin_name", config.PluginName)

b.Lock()
defer b.Unlock()

// Close and remove the old connection
b.clearConnection(name)

b.connections[name] = &dbPluginInstance{
oldConn := b.connPut(name, &dbPluginInstance{
database: dbw,
name: name,
id: id,
})
if oldConn != nil {
oldConn.Close()
}

err = storeConfig(ctx, req.Storage, name, config)
Expand Down
6 changes: 1 addition & 5 deletions builtin/logical/database/path_rotate_credentials.go
Expand Up @@ -78,10 +78,6 @@ func (b *databaseBackend) pathRotateRootCredentialsUpdate() framework.OperationF
return nil, err
}

// Take out the backend lock since we are swapping out the connection
b.Lock()
defer b.Unlock()

// Take the write lock on the instance
dbi.Lock()
defer dbi.Unlock()
Expand All @@ -93,7 +89,7 @@ func (b *databaseBackend) pathRotateRootCredentialsUpdate() framework.OperationF
b.Logger().Error("error closing the database plugin connection", "err", err)
}
// Even on error, still remove the connection
delete(b.connections, name)
b.ClearConnection(name)
}()

generator, err := newPasswordGenerator(nil)
Expand Down
21 changes: 9 additions & 12 deletions builtin/logical/database/rotation.go
Expand Up @@ -642,14 +642,11 @@ func (b *databaseBackend) loadStaticWALs(ctx context.Context, s logical.Storage)
// actually available. This is needed because both runTicker and initQueue
// operate in go-routines, and could be accessing the queue concurrently
func (b *databaseBackend) pushItem(item *queue.Item) error {
b.RLock()
unlockFunc := b.RUnlock
defer func() { unlockFunc() }()

if b.credRotationQueue != nil {
select {
case <-b.ctx.Done():
default:
return b.credRotationQueue.Push(item)
}

b.Logger().Warn("no queue found during push item")
return nil
}
Expand All @@ -658,9 +655,9 @@ func (b *databaseBackend) pushItem(item *queue.Item) error {
// actually available. This is needed because both runTicker and initQueue
// operate in go-routines, and could be accessing the queue concurrently
func (b *databaseBackend) popFromRotationQueue() (*queue.Item, error) {
b.RLock()
defer b.RUnlock()
if b.credRotationQueue != nil {
select {
case <-b.ctx.Done():
default:
return b.credRotationQueue.Pop()
}
return nil, queue.ErrEmpty
Expand All @@ -670,9 +667,9 @@ func (b *databaseBackend) popFromRotationQueue() (*queue.Item, error) {
// actually available. This is needed because both runTicker and initQueue
// operate in go-routines, and could be accessing the queue concurrently
func (b *databaseBackend) popFromRotationQueueByKey(name string) (*queue.Item, error) {
b.RLock()
defer b.RUnlock()
if b.credRotationQueue != nil {
select {
case <-b.ctx.Done():
default:
item, err := b.credRotationQueue.PopByKey(name)
if err != nil {
return nil, err
Expand Down

0 comments on commit bc24972

Please sign in to comment.