Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Postgres: use pg_try_advisory_lock instead of pg_advisory_lock #962

Open
wants to merge 13 commits into
base: master
Choose a base branch
from
1 change: 1 addition & 0 deletions database/postgres/README.md
Expand Up @@ -9,6 +9,7 @@
| `x-statement-timeout` | `StatementTimeout` | Abort any statement that takes more than the specified number of milliseconds |
| `x-multi-statement` | `MultiStatementEnabled` | Enable multi-statement execution (default: false) |
| `x-multi-statement-max-size` | `MultiStatementMaxSize` | Maximum size of single statement in bytes (default: 10MB) |
| `x-lock-retry-max-interval` | `Locking` | When acquiring a lock fails, retries are used with an exponential backoff. This parameter specifies what is the maximum interval between retries (default: 1000ms) |
| `dbname` | `DatabaseName` | The name of the database to connect to |
| `search_path` | | This variable specifies the order in which schemas are searched when an object is referenced by a simple name with no schema specified. |
| `user` | | The user to sign in as |
Expand Down
86 changes: 74 additions & 12 deletions database/postgres/postgres.go
Expand Up @@ -16,6 +16,7 @@ import (

"go.uber.org/atomic"

"github.com/cenkalti/backoff/v4"
"github.com/golang-migrate/migrate/v4"
"github.com/golang-migrate/migrate/v4/database"
"github.com/golang-migrate/migrate/v4/database/multistmt"
Expand All @@ -34,6 +35,9 @@ var (

DefaultMigrationsTable = "schema_migrations"
DefaultMultiStatementMaxSize = 10 * 1 << 20 // 10 MB

DefaultLockInitialRetryInterval = 100 * time.Millisecond
DefaultLockMaxRetryInterval = 1000 * time.Millisecond
)

var (
Expand All @@ -53,6 +57,17 @@ type Config struct {
migrationsTableName string
StatementTimeout time.Duration
MultiStatementMaxSize int
Locking LockConfig
}

type LockConfig struct {
// InitialRetryInterval the initial (minimum) retry interval used for exponential backoff
// to try acquire a lock
InitialRetryInterval time.Duration

// MaxRetryInterval the maximum retry interval. Once the exponential backoff reaches this limit,
// the retry interval remains the same
MaxRetryInterval time.Duration
}

type Postgres struct {
Expand Down Expand Up @@ -167,7 +182,7 @@ func (p *Postgres) Open(url string) (database.Driver, error) {
if s := purl.Query().Get("x-migrations-table-quoted"); len(s) > 0 {
migrationsTableQuoted, err = strconv.ParseBool(s)
if err != nil {
return nil, fmt.Errorf("Unable to parse option x-migrations-table-quoted: %w", err)
return nil, fmt.Errorf("unable to parse option x-migrations-table-quoted: %w", err)
}
}
if (len(migrationsTable) > 0) && (migrationsTableQuoted) && ((migrationsTable[0] != '"') || (migrationsTable[len(migrationsTable)-1] != '"')) {
Expand Down Expand Up @@ -198,7 +213,22 @@ func (p *Postgres) Open(url string) (database.Driver, error) {
if s := purl.Query().Get("x-multi-statement"); len(s) > 0 {
multiStatementEnabled, err = strconv.ParseBool(s)
if err != nil {
return nil, fmt.Errorf("Unable to parse option x-multi-statement: %w", err)
return nil, fmt.Errorf("unable to parse option x-multi-statement: %w", err)
}
}

lockConfig := LockConfig{
InitialRetryInterval: DefaultLockInitialRetryInterval,
MaxRetryInterval: DefaultLockMaxRetryInterval,
}
if s := purl.Query().Get("x-lock-retry-max-interval"); len(s) > 0 {
maxRetryIntervalMillis, err := strconv.Atoi(s)
if err != nil {
return nil, fmt.Errorf("unable to parse option x-lock-retry-max-interval: %w", err)
}
maxRetryInterval := time.Duration(maxRetryIntervalMillis)
if maxRetryInterval > DefaultLockInitialRetryInterval {
lockConfig.MaxRetryInterval = maxRetryInterval
}
}

Expand All @@ -209,6 +239,7 @@ func (p *Postgres) Open(url string) (database.Driver, error) {
StatementTimeout: time.Duration(statementTimeout) * time.Millisecond,
MultiStatementEnabled: multiStatementEnabled,
MultiStatementMaxSize: multiStatementMaxSize,
Locking: lockConfig,
})

if err != nil {
Expand All @@ -231,24 +262,45 @@ func (p *Postgres) Close() error {
return nil
}

// Lock tries to acquire an advisory lock and retries indefinitely with an exponential backoff strategy
// https://www.postgresql.org/docs/9.6/static/explicit-locking.html#ADVISORY-LOCKS
func (p *Postgres) Lock() error {
return database.CasRestoreOnErr(&p.isLocked, false, true, database.ErrLocked, func() error {
aid, err := database.GenerateAdvisoryLockId(p.config.DatabaseName, p.config.migrationsSchemaName, p.config.migrationsTableName)
if err != nil {
return err
}
backOff := p.config.Locking.nonStopBackoff()
err := backoff.Retry(func() error {
ok, err := p.tryLock()
if err != nil {
return fmt.Errorf("p.tryLock: %w", err)
}

// This will wait indefinitely until the lock can be acquired.
query := `SELECT pg_advisory_lock($1)`
if _, err := p.conn.ExecContext(context.Background(), query, aid); err != nil {
return &database.Error{OrigErr: err, Err: "try lock failed", Query: []byte(query)}
}
if ok {
return nil
}

return nil
return fmt.Errorf("could not acquire lock")
}, backOff)

return err
})
}

func (p *Postgres) tryLock() (bool, error) {
aid, err := database.GenerateAdvisoryLockId(p.config.DatabaseName, p.config.migrationsSchemaName, p.config.migrationsTableName)
if err != nil {
return false, err
}

// https://www.postgresql.org/docs/current/functions-admin.html#FUNCTIONS-ADVISORY-LOCKS
// should always return true or false
query := `SELECT pg_try_advisory_lock($1)`
var ok bool
if err := p.conn.QueryRowContext(context.Background(), query, aid).Scan(&ok); err != nil {
return false, &database.Error{OrigErr: err, Err: "pg_try_advisory_lock failed", Query: []byte(query)}
}

return ok, nil
}

func (p *Postgres) Unlock() error {
return database.CasRestoreOnErr(&p.isLocked, true, false, database.ErrNotLocked, func() error {
aid, err := database.GenerateAdvisoryLockId(p.config.DatabaseName, p.config.migrationsSchemaName, p.config.migrationsTableName)
Expand Down Expand Up @@ -491,3 +543,13 @@ func (p *Postgres) ensureVersionTable() (err error) {

return nil
}

func (l *LockConfig) nonStopBackoff() backoff.BackOff {
b := backoff.NewExponentialBackOff()
b.InitialInterval = l.InitialRetryInterval
b.MaxInterval = l.MaxRetryInterval
b.MaxElapsedTime = 0 // this backoff won't stop
b.Reset()

return b
}
45 changes: 45 additions & 0 deletions database/postgres/postgres_test.go
Expand Up @@ -96,6 +96,7 @@ func Test(t *testing.T) {
t.Run("testFailToCreateTableWithoutPermissions", testFailToCreateTableWithoutPermissions)
t.Run("testCheckBeforeCreateTable", testCheckBeforeCreateTable)
t.Run("testParallelSchema", testParallelSchema)
t.Run("testPostgresConcurrentMigrations", testPostgresConcurrentMigrations)
t.Run("testPostgresLock", testPostgresLock)
t.Run("testWithInstanceConcurrent", testWithInstanceConcurrent)
t.Run("testWithConnection", testWithConnection)
Expand Down Expand Up @@ -628,6 +629,50 @@ func testParallelSchema(t *testing.T) {
})
}

func testPostgresConcurrentMigrations(t *testing.T) {
dktesting.ParallelTest(t, specs, func(t *testing.T, c dktest.ContainerInfo) {
// GIVEN - a set of concurrent processes running migrations
const concurrency = 3
var wg sync.WaitGroup

ip, port, err := c.FirstPort()
if err != nil {
t.Fatal(err)
}
addr := pgConnectionString(ip, port, "x-lock-retry-max-interval=2000")

// WHEN
for i := 0; i < concurrency; i++ {
wg.Add(1)

go func() {
defer wg.Done()

p := &Postgres{}
d, err := p.Open(addr)
if err != nil {
t.Error(err)
}
defer func() {
if err := d.Close(); err != nil {
t.Error(err)
}
}()

m, err := migrate.NewWithDatabaseInstance("file://./examples/migrations", "postgres", d)
if err != nil {
t.Error(err)
}
dt.TestMigrate(t, m)
}()
}

wg.Wait()

// THEN
})
}

func testPostgresLock(t *testing.T) {
dktesting.ParallelTest(t, specs, func(t *testing.T, c dktest.ContainerInfo) {
ip, port, err := c.FirstPort()
Expand Down