Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added HA backend for postgres based on dynamodb model #5731

Merged
merged 26 commits into from
May 10, 2019
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
982b139
Added HA backend for postgres
bjorndolk Oct 23, 2018
1768f1d
Docker support for postgres backend testing
bjorndolk Dec 10, 2018
ed6ed5a
Bug in handling of postgres connection url for non docker testing
bjorndolk Dec 10, 2018
058731e
Fix HA enabled check in test
bjorndolk Dec 20, 2018
40bac6f
Merge branch 'physical-postgres-docker-test' into postgres-ha-support
bjorndolk Dec 21, 2018
68c2f9b
Formatting + actually configure test to run HA tests
bjorndolk Dec 21, 2018
2f3f712
Test should fail if it cannot retrieve pg version
bjorndolk Dec 21, 2018
0402379
Merge branch 'physical-postgres-docker-test' into postgres-ha-support
bjorndolk Dec 21, 2018
59d81fa
internal helperfunctions pascalCasing
bjorndolk Dec 21, 2018
aaa6d6b
Merge remote-tracking branch 'upstream/master' into postgres-ha-support
bjorndolk Jan 9, 2019
9baeb6f
Changed lock.Value behaviour to always return held when a lock value …
bjorndolk Jan 10, 2019
e3ed04a
Remove PostgreSQLLock.held: it doesn't add anything except a failure …
ncabatoff Apr 16, 2019
a5f45e1
Merge pull request #1 from ncabatoff/ncabatoff-postgres-ha-support
bjorndolk Apr 17, 2019
b12a13b
Added doc stating need of 9.5 for HA backend
bjorndolk Apr 17, 2019
689a563
Merge remote-tracking branch 'upstream/master' into postgres-ha-support
bjorndolk Apr 18, 2019
4d97c84
Address review comments.
ncabatoff Apr 25, 2019
e4b4fdb
Value(): don't treat "no lock found" case as an error.
ncabatoff Apr 25, 2019
4fa185c
Update physical/postgresql/postgresql.go
kalafut Apr 26, 2019
d61305b
Update physical/postgresql/postgresql.go
kalafut Apr 26, 2019
4e258e4
Update physical/postgresql/postgresql.go
kalafut Apr 26, 2019
090252b
Update physical/postgresql/postgresql.go
kalafut Apr 26, 2019
aaa6848
Update physical/postgresql/postgresql_test.go
kalafut Apr 26, 2019
f5c4d9c
Merge pull request #2 from ncabatoff/postgres-ha-support-review-fixes
bjorndolk Apr 26, 2019
ebbfbf2
Merge remote-tracking branch 'upstream/master' into postgres-ha-support
bjorndolk Apr 26, 2019
993fed1
Verify user has PG >= 9.5 before we allow HA to be enabled.
ncabatoff Apr 26, 2019
48643db
Merge branch 'master' of ssh://github.com-ncabatoff/hashicorp/vault i…
ncabatoff May 10, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
236 changes: 231 additions & 5 deletions physical/postgresql/postgresql.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,20 +6,45 @@ import (
"fmt"
"strconv"
"strings"
"sync"
"time"

"github.com/hashicorp/errwrap"
"github.com/hashicorp/vault/sdk/physical"

log "github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-uuid"

metrics "github.com/armon/go-metrics"
"github.com/armon/go-metrics"
"github.com/lib/pq"
)

const (

// The lock TTL matches the default that Consul API uses, 15 seconds.
// Used as part of SQL commands to set/extend lock expiry time relative to
// database clock.
PostgreSQLLockTTLSeconds = 15

// The amount of time to wait between the lock renewals
PostgreSQLLockRenewInterval = 5 * time.Second

// PostgreSQLLockRetryInterval is the amount of time to wait
// if a lock fails before trying again.
PostgreSQLLockRetryInterval = time.Second
)

// Verify PostgreSQLBackend satisfies the correct interfaces
var _ physical.Backend = (*PostgreSQLBackend)(nil)

//
// HA backend was implemented based on the DynamoDB backend pattern
// With distinction using central postgres clock, hereby avoiding
// possible issues with multiple clocks
//
var _ physical.HABackend = (*PostgreSQLBackend)(nil)
var _ physical.Lock = (*PostgreSQLLock)(nil)

// PostgreSQL Backend is a physical backend that stores data
// within a PostgreSQL database.
type PostgreSQLBackend struct {
Expand All @@ -29,8 +54,34 @@ type PostgreSQLBackend struct {
get_query string
delete_query string
list_query string
logger log.Logger
permitPool *physical.PermitPool

ha_table string
haGetLockValueQuery string
haUpsertLockIdentityExec string
haDeleteLockExec string

ncabatoff marked this conversation as resolved.
Show resolved Hide resolved
haEnabled bool
logger log.Logger
permitPool *physical.PermitPool
}

// PostgreSQLLock implements a lock using an PostgreSQL client.
type PostgreSQLLock struct {
backend *PostgreSQLBackend
value, key string
identity string
lock sync.Mutex

renewTicker *time.Ticker

// ttlSeconds is how long a lock is valid for
ttlSeconds int

// renewInterval is how much time to wait between lock renewals. must be << ttl
renewInterval time.Duration

// retryInterval is how much time to wait between attempts to grab the lock
retryInterval time.Duration
}

// NewPostgreSQLBackend constructs a PostgreSQL backend using the given
Expand Down Expand Up @@ -63,6 +114,12 @@ func NewPostgreSQLBackend(conf map[string]string, logger log.Logger) (physical.B
maxParInt = physical.DefaultParallelOperations
}

var hae bool = false
bjorndolk marked this conversation as resolved.
Show resolved Hide resolved
haestr, ok := conf["ha_enabled"]
bjorndolk marked this conversation as resolved.
Show resolved Hide resolved
if ok && haestr == "true" {
hae = true
}

// Create PostgreSQL handle for the database.
db, err := sql.Open("postgres", connURL)
if err != nil {
Expand All @@ -88,6 +145,12 @@ func NewPostgreSQLBackend(conf map[string]string, logger log.Logger) (physical.B
" UPDATE SET (parent_path, path, key, value) = ($1, $2, $3, $4)"
}

unquoted_ha_table, ok := conf["ha_table"]
if !ok {
unquoted_ha_table = "vault_ha_locks"
}
quoted_ha_table := pq.QuoteIdentifier(unquoted_ha_table)

// Setup the backend.
m := &PostgreSQLBackend{
table: quoted_table,
Expand All @@ -96,10 +159,25 @@ func NewPostgreSQLBackend(conf map[string]string, logger log.Logger) (physical.B
get_query: "SELECT value FROM " + quoted_table + " WHERE path = $1 AND key = $2",
delete_query: "DELETE FROM " + quoted_table + " WHERE path = $1 AND key = $2",
list_query: "SELECT key FROM " + quoted_table + " WHERE path = $1" +
"UNION SELECT DISTINCT substring(substr(path, length($1)+1) from '^.*?/') FROM " +
quoted_table + " WHERE parent_path LIKE $1 || '%'",
" UNION SELECT DISTINCT substring(substr(path, length($1)+1) from '^.*?/') FROM " + quoted_table +
" WHERE parent_path LIKE $1 || '%'",
haGetLockValueQuery:
//only read non expired data
bjorndolk marked this conversation as resolved.
Show resolved Hide resolved
" SELECT ha_value FROM " + quoted_ha_table + " WHERE NOW() <= valid_until AND ha_key = $1 ",
haUpsertLockIdentityExec:
// $1=identity $2=ha_key $3=ha_value $4=TTL in seconds
//update either steal expired lock OR update expiry for lock owned by me
bjorndolk marked this conversation as resolved.
Show resolved Hide resolved
" INSERT INTO " + quoted_ha_table + " as t (ha_identity, ha_key, ha_value, valid_until) VALUES ($1, $2, $3, NOW() + $4 * INTERVAL '1 seconds' ) " +
" ON CONFLICT (ha_key) DO " +
" UPDATE SET (ha_identity, ha_key, ha_value, valid_until) = ($1, $2, $3, NOW() + $4 * INTERVAL '1 seconds') " +
" WHERE (t.valid_until < NOW() AND t.ha_key = $2) OR " +
bjorndolk marked this conversation as resolved.
Show resolved Hide resolved
" (t.ha_identity = $1 AND t.ha_key = $2) ",
haDeleteLockExec:
//$1=ha_identity $2=ha_key
bjorndolk marked this conversation as resolved.
Show resolved Hide resolved
" DELETE FROM " + quoted_ha_table + " WHERE ha_identity=$1 AND ha_key=$2 ",
logger: logger,
permitPool: physical.NewPermitPool(maxParInt),
haEnabled: hae,
}

return m, nil
Expand Down Expand Up @@ -213,3 +291,151 @@ func (m *PostgreSQLBackend) List(ctx context.Context, prefix string) ([]string,

return keys, nil
}

// LockWith is used for mutual exclusion based on the given key.
func (p *PostgreSQLBackend) LockWith(key, value string) (physical.Lock, error) {
identity, err := uuid.GenerateUUID()
if err != nil {
return nil, err
}
return &PostgreSQLLock{
backend: p,
key: key,
value: value,
identity: identity,
ttlSeconds: PostgreSQLLockTTLSeconds,
renewInterval: PostgreSQLLockRenewInterval,
retryInterval: PostgreSQLLockRetryInterval,
}, nil
}

func (p *PostgreSQLBackend) HAEnabled() bool {
return p.haEnabled
}

// Lock tries to acquire the lock by repeatedly trying to create a record in the
// PostgreSQL table. It will block until either the stop channel is closed or
// the lock could be acquired successfully. The returned channel will be closed
// once the lock in the PostgreSQL table cannot be renewed, either due to an
// error speaking to PostgresSQL or because someone else has taken it.
func (l *PostgreSQLLock) Lock(stopCh <-chan struct{}) (<-chan struct{}, error) {
l.lock.Lock()
defer l.lock.Unlock()

var (
success = make(chan struct{})
errors = make(chan error)
leader = make(chan struct{})
)
// try to acquire the lock asynchronously
go l.tryToLock(stopCh, success, errors)
bjorndolk marked this conversation as resolved.
Show resolved Hide resolved

select {
case <-success:
// after acquiring it successfully, we must renew the lock periodically
l.renewTicker = time.NewTicker(l.renewInterval)
go l.periodicallyRenewLock(leader)
bjorndolk marked this conversation as resolved.
Show resolved Hide resolved
case err := <-errors:
return nil, err
case <-stopCh:
return nil, nil
}

return leader, nil
}

// Unlock releases the lock by deleting the lock record from the
// PostgreSQL table.
func (l *PostgreSQLLock) Unlock() error {
pg := l.backend
pg.permitPool.Acquire()
ncabatoff marked this conversation as resolved.
Show resolved Hide resolved
defer pg.permitPool.Release()

if l.renewTicker != nil {
l.renewTicker.Stop()
}

//Delete lock owned by me
bjorndolk marked this conversation as resolved.
Show resolved Hide resolved
_, err := pg.client.Exec(pg.haDeleteLockExec, l.identity, l.key)
return err
}

// Value checks whether or not the lock is held by any instance of PostgreSQLLock,
// including this one, and returns the current value.
func (l *PostgreSQLLock) Value() (bool, string, error) {
pg := l.backend
pg.permitPool.Acquire()
defer pg.permitPool.Release()
var result string
err := pg.client.QueryRow(pg.haGetLockValueQuery, l.key).Scan(&result)

if err != nil {
bjorndolk marked this conversation as resolved.
Show resolved Hide resolved
return false, "", err
}

return true, result, nil
}

// tryToLock tries to create a new item in PostgreSQL every `retryInterval`.
// As long as the item cannot be created (because it already exists), it will
// be retried. If the operation fails due to an error, it is sent to the errors
// channel. When the lock could be acquired successfully, the success channel
// is closed.
func (l *PostgreSQLLock) tryToLock(stop <-chan struct{}, success chan struct{}, errors chan error) {
ticker := time.NewTicker(l.retryInterval)

for {
select {
case <-stop:
ticker.Stop()
case <-ticker.C:
gotlock, err := l.writeItem()
switch {
case err != nil:
errors <- err
return
bjorndolk marked this conversation as resolved.
Show resolved Hide resolved
case gotlock:
ticker.Stop()
close(success)
return
}
}
}
}

func (l *PostgreSQLLock) periodicallyRenewLock(done chan struct{}) {
for range l.renewTicker.C {
gotlock, err := l.writeItem()
if err != nil || !gotlock {
close(done)
l.renewTicker.Stop()
return
}
}
}

// Attempts to put/update the PostgreSQL item using condition expressions to
// evaluate the TTL. Returns true if the lock was obtained, false if not.
// If false error may be nil or non-nil: nil indicates simply that someone
// else has the lock, whereas non-nil means that something unexpected happened.
func (l *PostgreSQLLock) writeItem() (bool, error) {
pg := l.backend
pg.permitPool.Acquire()
defer pg.permitPool.Release()

//Try steal lock or update expiry on my lock
bjorndolk marked this conversation as resolved.
Show resolved Hide resolved

sqlResult, err := pg.client.Exec(pg.haUpsertLockIdentityExec, l.identity, l.key, l.value, l.ttlSeconds)
if err != nil {
return false, err
}
if sqlResult == nil {
return false, fmt.Errorf("no error but no sql result")
bjorndolk marked this conversation as resolved.
Show resolved Hide resolved
}

ar, err := sqlResult.RowsAffected()
if err != nil {
return false, err
}
return ar == 1, nil
}