Skip to content

Commit

Permalink
Cleanup and simplify lock usage in database plugin (#15944)
Browse files Browse the repository at this point in the history
Cleanup and simplify lock usage in database plugin

Following up from discussions in #15923 and #15933, I wanted to split
out a separate PR that drastically reduced the complexity of the use of
the databaseBackend lock. We no longer need it at all for the
`credRotationQueue`, and we can move it to be solely used in a few,
small connections map management functions.

Co-authored-by: Calvin Leung Huang <1883212+calvn@users.noreply.github.com>
  • Loading branch information
swenson and calvn committed Jun 17, 2022
1 parent e83e608 commit 78373fa
Show file tree
Hide file tree
Showing 4 changed files with 98 additions and 95 deletions.
149 changes: 80 additions & 69 deletions builtin/logical/database/backend.go
Expand Up @@ -55,13 +55,8 @@ func Factory(ctx context.Context, conf *logical.BackendConfig) (logical.Backend,
}

b.credRotationQueue = queue.New()
// Create a context with a cancel method for processing any WAL entries and
// populating the queue
initCtx := context.Background()
ictx, cancel := context.WithCancel(initCtx)
b.cancelQueue = cancel
// Load queue and kickoff new periodic ticker
go b.initQueue(ictx, conf, conf.System.ReplicationState())
go b.initQueue(b.queueCtx, conf, conf.System.ReplicationState())
return b, nil
}

Expand Down Expand Up @@ -103,35 +98,76 @@ func Backend(conf *logical.BackendConfig) *databaseBackend {

b.logger = conf.Logger
b.connections = make(map[string]*dbPluginInstance)

b.queueCtx, b.cancelQueueCtx = context.WithCancel(context.Background())
b.roleLocks = locksutil.CreateLocks()

return &b
}

type databaseBackend struct {
// connLock is used to synchronize access to the connections map
connLock sync.RWMutex
// connections holds configured database connections by config name
connections map[string]*dbPluginInstance
logger log.Logger

*framework.Backend
sync.RWMutex
// CredRotationQueue is an in-memory priority queue used to track Static Roles
// credRotationQueue is an in-memory priority queue used to track Static Roles
// that require periodic rotation. Backends will have a PriorityQueue
// initialized on setup, but only backends that are mounted by a primary
// server or mounted as a local mount will perform the rotations.
//
// cancelQueue is used to remove the priority queue and terminate the
// background ticker.
credRotationQueue *queue.PriorityQueue
cancelQueue context.CancelFunc
// queueCtx is the context for the priority queue
queueCtx context.Context
// cancelQueueCtx is used to terminate the background ticker
cancelQueueCtx context.CancelFunc

// roleLocks is used to lock modifications to roles in the queue, to ensure
// concurrent requests are not modifying the same role and possibly causing
// issues with the priority queue.
roleLocks []*locksutil.LockEntry
}

func (b *databaseBackend) connGet(name string) *dbPluginInstance {
b.connLock.RLock()
defer b.connLock.RUnlock()
return b.connections[name]
}

func (b *databaseBackend) connPop(name string) *dbPluginInstance {
b.connLock.Lock()
defer b.connLock.Unlock()
dbi := b.connections[name]
delete(b.connections, name)
return dbi
}

func (b *databaseBackend) connPopIfEqual(name, id string) *dbPluginInstance {
b.connLock.Lock()
defer b.connLock.Unlock()
dbi, ok := b.connections[name]
if ok && dbi.id == id {
delete(b.connections, name)
return dbi
}
return nil
}

func (b *databaseBackend) connPut(name string, newDbi *dbPluginInstance) *dbPluginInstance {
b.connLock.Lock()
defer b.connLock.Unlock()
dbi := b.connections[name]
b.connections[name] = newDbi
return dbi
}

func (b *databaseBackend) connClear() map[string]*dbPluginInstance {
b.connLock.Lock()
defer b.connLock.Unlock()
old := b.connections
b.connections = make(map[string]*dbPluginInstance)
return old
}

func (b *databaseBackend) DatabaseConfig(ctx context.Context, s logical.Storage, name string) (*DatabaseConfig, error) {
entry, err := s.Get(ctx, fmt.Sprintf("config/%s", name))
if err != nil {
Expand Down Expand Up @@ -236,22 +272,8 @@ func (b *databaseBackend) GetConnection(ctx context.Context, s logical.Storage,
}

func (b *databaseBackend) GetConnectionWithConfig(ctx context.Context, name string, config *DatabaseConfig) (*dbPluginInstance, error) {
b.RLock()
unlockFunc := b.RUnlock
defer func() { unlockFunc() }()

dbi, ok := b.connections[name]
if ok {
return dbi, nil
}

// Upgrade lock
b.RUnlock()
b.Lock()
unlockFunc = b.Unlock

dbi, ok = b.connections[name]
if ok {
dbi := b.connGet(name)
if dbi != nil {
return dbi, nil
}

Expand Down Expand Up @@ -280,38 +302,34 @@ func (b *databaseBackend) GetConnectionWithConfig(ctx context.Context, name stri
id: id,
name: name,
}
b.connections[name] = dbi
return dbi, nil
}

// invalidateQueue cancels any background queue loading and destroys the queue.
func (b *databaseBackend) invalidateQueue() {
// cancel context before grabbing lock to start closing any open connections
// this is safe to do without the lock since it is only written to once in initialization
// and can be canceled multiple times safely
if b.cancelQueue != nil {
b.cancelQueue()
oldConn := b.connPut(name, dbi)
if oldConn != nil {
err := oldConn.Close()
if err != nil {
b.Logger().Warn("Error closing database connection", "error", err)
}
}
b.Lock()
defer b.Unlock()

b.credRotationQueue = nil
return dbi, nil
}

// ClearConnection closes the database connection and
// removes it from the b.connections map.
func (b *databaseBackend) ClearConnection(name string) error {
b.Lock()
defer b.Unlock()
return b.clearConnection(name)
db := b.connPop(name)
if db != nil {
// Ignore error here since the database client is always killed
db.Close()
}
return nil
}

func (b *databaseBackend) clearConnection(name string) error {
db, ok := b.connections[name]
if ok {
// ClearConnectionId closes the database connection with a specific id and
// removes it from the b.connections map.
func (b *databaseBackend) ClearConnectionId(name, id string) error {
db := b.connPopIfEqual(name, id)
if db != nil {
// Ignore error here since the database client is always killed
db.Close()
delete(b.connections, name)
}
return nil
}
Expand All @@ -324,33 +342,26 @@ func (b *databaseBackend) CloseIfShutdown(db *dbPluginInstance, err error) {
// and simply defer the unlock. Since we are attaching the instance and matching
// the id in the connection map, we can safely do this.
go func() {
b.Lock()
defer b.Unlock()
db.Close()

// Ensure we are deleting the correct connection
mapDB, ok := b.connections[db.name]
if ok && db.id == mapDB.id {
delete(b.connections, db.name)
}
// Delete the connection if it is still active.
b.connPopIfEqual(db.name, db.id)
}()
}
}

// clean closes all connections from all database types
// and cancels any rotation queue loading operation.
func (b *databaseBackend) clean(ctx context.Context) {
// invalidateQueue acquires it's own lock on the backend, removes queue, and
// terminates the background ticker
b.invalidateQueue()

b.Lock()
defer b.Unlock()
func (b *databaseBackend) clean(_ context.Context) {
// kill the queue and terminate the background ticker
if b.cancelQueueCtx != nil {
b.cancelQueueCtx()
}

for _, db := range b.connections {
db.Close()
connections := b.connClear()
for _, db := range connections {
go db.Close()
}
b.connections = make(map[string]*dbPluginInstance)
}

const backendHelp = `
Expand Down
10 changes: 4 additions & 6 deletions builtin/logical/database/path_config_connection.go
Expand Up @@ -344,16 +344,14 @@ func (b *databaseBackend) connectionWriteHandler() framework.OperationFunc {

b.Logger().Debug("created database object", "name", name, "plugin_name", config.PluginName)

b.Lock()
defer b.Unlock()

// Close and remove the old connection
b.clearConnection(name)

b.connections[name] = &dbPluginInstance{
oldConn := b.connPut(name, &dbPluginInstance{
database: dbw,
name: name,
id: id,
})
if oldConn != nil {
oldConn.Close()
}

err = storeConfig(ctx, req.Storage, name, config)
Expand Down
13 changes: 5 additions & 8 deletions builtin/logical/database/path_rotate_credentials.go
Expand Up @@ -78,22 +78,19 @@ func (b *databaseBackend) pathRotateRootCredentialsUpdate() framework.OperationF
return nil, err
}

// Take out the backend lock since we are swapping out the connection
b.Lock()
defer b.Unlock()

// Take the write lock on the instance
dbi.Lock()
defer dbi.Unlock()

defer func() {
dbi.Unlock()
// Even on error, still remove the connection
b.ClearConnectionId(name, dbi.id)
}()
defer func() {
// Close the plugin
dbi.closed = true
if err := dbi.database.Close(); err != nil {
b.Logger().Error("error closing the database plugin connection", "err", err)
}
// Even on error, still remove the connection
delete(b.connections, name)
}()

generator, err := newPasswordGenerator(nil)
Expand Down
21 changes: 9 additions & 12 deletions builtin/logical/database/rotation.go
Expand Up @@ -642,14 +642,11 @@ func (b *databaseBackend) loadStaticWALs(ctx context.Context, s logical.Storage)
// actually available. This is needed because both runTicker and initQueue
// operate in go-routines, and could be accessing the queue concurrently
func (b *databaseBackend) pushItem(item *queue.Item) error {
b.RLock()
unlockFunc := b.RUnlock
defer func() { unlockFunc() }()

if b.credRotationQueue != nil {
select {
case <-b.queueCtx.Done():
default:
return b.credRotationQueue.Push(item)
}

b.Logger().Warn("no queue found during push item")
return nil
}
Expand All @@ -658,9 +655,9 @@ func (b *databaseBackend) pushItem(item *queue.Item) error {
// actually available. This is needed because both runTicker and initQueue
// operate in go-routines, and could be accessing the queue concurrently
func (b *databaseBackend) popFromRotationQueue() (*queue.Item, error) {
b.RLock()
defer b.RUnlock()
if b.credRotationQueue != nil {
select {
case <-b.queueCtx.Done():
default:
return b.credRotationQueue.Pop()
}
return nil, queue.ErrEmpty
Expand All @@ -670,9 +667,9 @@ func (b *databaseBackend) popFromRotationQueue() (*queue.Item, error) {
// actually available. This is needed because both runTicker and initQueue
// operate in go-routines, and could be accessing the queue concurrently
func (b *databaseBackend) popFromRotationQueueByKey(name string) (*queue.Item, error) {
b.RLock()
defer b.RUnlock()
if b.credRotationQueue != nil {
select {
case <-b.queueCtx.Done():
default:
item, err := b.credRotationQueue.PopByKey(name)
if err != nil {
return nil, err
Expand Down

0 comments on commit 78373fa

Please sign in to comment.