Skip to content

Commit

Permalink
chore: align implementation with the yugabytedb
Browse files Browse the repository at this point in the history
  • Loading branch information
vgarvardt committed Apr 20, 2024
1 parent 716e5f8 commit 4523c3f
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 53 deletions.
36 changes: 18 additions & 18 deletions database/cockroachdb/README.md
Expand Up @@ -2,21 +2,21 @@

`cockroachdb://user:password@host:port/dbname?query` (`cockroach://`, and `crdb-postgres://` work, too)

| URL Query | WithInstance Config | Description |
|-----------------------------|------------------------|---------------------------------------------------------------------------------------------------------------------------|
| `x-migrations-table` | `MigrationsTable` | Name of the migrations table |
| `x-lock-table` | `LockTable` | Name of the table which maintains the migration lock |
| `x-force-lock` | `ForceLock` | Force lock acquisition to fix faulty migrations which may not have released the schema lock (Boolean, default is `false`) |
| `x-lock-wait` | `LockWait` | If the lock is already acquired - wait for it to be released (Boolean, default is `false`) |
| `x-lock-wait-timeout` | `LockWaitTimeout` | Max time to wait for the lock to be released (time.Duration, deefault is `30s`) |
| `x-lock-wait-poll-interval` | `LockWaitPollInterval` | How often should acquire a lock retry to happen (time.Duration, default is `1s`) |
| `dbname` | `DatabaseName` | The name of the database to connect to |
| `user` | | The user to sign in as |
| `password` | | The user's password |
| `host` | | The host to connect to. Values that start with / are for unix domain sockets. (default is localhost) |
| `port` | | The port to bind to. (default is 5432) |
| `connect_timeout` | | Maximum wait for connection, in seconds. Zero or not specified means wait indefinitely. |
| `sslcert` | | Cert file location. The file must contain PEM encoded data. |
| `sslkey` | | Key file location. The file must contain PEM encoded data. |
| `sslrootcert` | | The location of the root certificate file. The file must contain PEM encoded data. |
| `sslmode` | | Whether or not to use SSL (disable\|require\|verify-ca\|verify-full) |
| URL Query | WithInstance Config | Description |
|----------------------------|-----------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------|
| `x-migrations-table` | `MigrationsTable` | Name of the migrations table |
| `x-lock-table` | `LockTable` | Name of the table which maintains the migration lock |
| `x-force-lock` | `ForceLock` | Force lock acquisition to fix faulty migrations which may not have released the schema lock (Boolean, default is `false`) |
| `x-max-retries` | `MaxRetries` | How many times retry queries on retryable errors (40001, 40P01, 08006, XX000). Default is 0 to keep the backward compatibility with the initial implementation |
| `x-max-retry-interval` | `MaxRetryInterval` | Interval between retries increases exponentially. This option specifies maximum duration between retries. Default is `15s` |
| `x-max-retry-elapsed-time` | `MaxRetryElapsedTime` | Total retries timeout. Default is `30s` |
| `dbname` | `DatabaseName` | The name of the database to connect to |
| `user` | | The user to sign in as |
| `password` | | The user's password |
| `host` | | The host to connect to. Values that start with / are for unix domain sockets. (default is localhost) |
| `port` | | The port to bind to. (default is 5432) |
| `connect_timeout` | | Maximum wait for connection, in seconds. Zero or not specified means wait indefinitely. |
| `sslcert` | | Cert file location. The file must contain PEM encoded data. |
| `sslkey` | | Key file location. The file must contain PEM encoded data. |
| `sslrootcert` | | The location of the root certificate file. The file must contain PEM encoded data. |
| `sslmode` | | Whether or not to use SSL (disable\|require\|verify-ca\|verify-full) |
79 changes: 46 additions & 33 deletions database/cockroachdb/cockroachdb.go
Expand Up @@ -11,6 +11,7 @@ import (
"strconv"
"time"

"github.com/cenkalti/backoff/v4"
"github.com/cockroachdb/cockroach-go/v2/crdb"
"github.com/hashicorp/go-multierror"
"github.com/lib/pq"
Expand All @@ -27,10 +28,13 @@ func init() {
database.Register("crdb-postgres", &db)
}

const (
DefaultMaxRetryInterval = time.Second * 15
DefaultMaxRetryElapsedTime = time.Second * 30
)

var DefaultMigrationsTable = "schema_migrations"
var DefaultLockTable = "schema_lock"
var DefaultLockWaitTimeout = 30 * time.Second
var DefaultLockWaitPollInterval = 1 * time.Second

var (
ErrNilConfig = fmt.Errorf("no config")
Expand All @@ -43,14 +47,9 @@ type Config struct {
ForceLock bool
DatabaseName string

// LockWait enables a blocking lock wait for the migration lock to mimic the behavior of advisory locks in pg.
LockWait bool
// LockWaitTimeout is the maximum time to wait for the lock to be acquired.
// Default value is controlled by the package-level DefaultLockWaitTimeout variable.
LockWaitTimeout time.Duration
// LockWaitPollInterval is the time to wait between attempts to acquire the lock.
// Default value is controlled by the package-level DefaultLockWaitPollInterval variable.
LockWaitPollInterval time.Duration
MaxRetryInterval time.Duration
MaxRetryElapsedTime time.Duration
MaxRetries int
}

type CockroachDb struct {
Expand Down Expand Up @@ -92,12 +91,12 @@ func WithInstance(instance *sql.DB, config *Config) (database.Driver, error) {
config.LockTable = DefaultLockTable
}

if config.LockWaitTimeout == 0 {
config.LockWaitTimeout = DefaultLockWaitTimeout
if config.MaxRetryInterval == 0 {
config.MaxRetryInterval = DefaultMaxRetryInterval
}

if config.LockWaitPollInterval == 0 {
config.LockWaitPollInterval = DefaultLockWaitPollInterval
if config.MaxRetryElapsedTime == 0 {
config.MaxRetryElapsedTime = DefaultMaxRetryElapsedTime
}

px := &CockroachDb{
Expand Down Expand Up @@ -149,22 +148,22 @@ func (c *CockroachDb) Open(url string) (database.Driver, error) {
forceLock = false
}

lockWaitQuery := purl.Query().Get("x-lock-wait")
lockWait, err := strconv.ParseBool(lockWaitQuery)
maxIntervalStr := purl.Query().Get("x-max-retry-interval")
maxInterval, err := time.ParseDuration(maxIntervalStr)
if err != nil {
lockWait = false
maxInterval = DefaultMaxRetryInterval
}

lockWaitTimeoutQuery := purl.Query().Get("x-lock-wait-timeout")
lockWaitTimeout, err := time.ParseDuration(lockWaitTimeoutQuery)
maxElapsedTimeStr := purl.Query().Get("x-max-retry-elapsed-time")
maxElapsedTime, err := time.ParseDuration(maxElapsedTimeStr)
if err != nil {
lockWaitTimeout = 0
maxElapsedTime = DefaultMaxRetryElapsedTime
}

lockWaitPollIntervalQuery := purl.Query().Get("x-lock-wait-poll-interval")
lockWaitPollInterval, err := time.ParseDuration(lockWaitPollIntervalQuery)
maxRetriesStr := purl.Query().Get("x-max-retries")
maxRetries, err := strconv.Atoi(maxRetriesStr)
if err != nil {
lockWaitPollInterval = 0
maxRetries = 0
}

px, err := WithInstance(db, &Config{
Expand All @@ -173,9 +172,9 @@ func (c *CockroachDb) Open(url string) (database.Driver, error) {
LockTable: lockTable,
ForceLock: forceLock,

LockWait: lockWait,
LockWaitTimeout: lockWaitTimeout,
LockWaitPollInterval: lockWaitPollInterval,
MaxRetryInterval: maxInterval,
MaxRetryElapsedTime: maxElapsedTime,
MaxRetries: maxRetries,
})
if err != nil {
return nil, err
Expand All @@ -191,18 +190,16 @@ func (c *CockroachDb) Close() error {
// Locking is done manually with a separate lock table. Implementing advisory locks in CRDB is being discussed
// See: https://github.com/cockroachdb/cockroach/issues/13546
func (c *CockroachDb) Lock() error {
startedAt := time.Now()

// CRDB is using SERIALIZABLE isolation level by default, that means we can not run a loop inside the transaction,
// because transaction started when the lock is acquired does not see it being released
for {
return backoff.Retry(func() error {
err := c.lock()
if err == nil || !errors.Is(err, database.ErrLocked) || !c.config.LockWait || time.Since(startedAt) >= c.config.LockWaitTimeout {
return err
if err != nil && !errors.Is(err, database.ErrLocked) {
return backoff.Permanent(err)
}

time.Sleep(c.config.LockWaitPollInterval)
}
return err
}, c.newBackoff())
}

func (c *CockroachDb) lock() error {
Expand Down Expand Up @@ -423,3 +420,19 @@ func (c *CockroachDb) ensureLockTable() error {

return nil
}

func (c *CockroachDb) newBackoff() backoff.BackOff {
retrier := backoff.WithMaxRetries(backoff.WithContext(&backoff.ExponentialBackOff{
InitialInterval: backoff.DefaultInitialInterval,
RandomizationFactor: backoff.DefaultRandomizationFactor,
Multiplier: backoff.DefaultMultiplier,
MaxInterval: c.config.MaxRetryInterval,
MaxElapsedTime: c.config.MaxRetryElapsedTime,
Stop: backoff.Stop,
Clock: backoff.SystemClock,
}, context.Background()), uint64(c.config.MaxRetries))

retrier.Reset()

return retrier
}
4 changes: 2 additions & 2 deletions database/cockroachdb/cockroachdb_test.go
Expand Up @@ -224,7 +224,7 @@ func TestLockWait(t *testing.T) {
t.Fatal(err)
}

addr := fmt.Sprintf("cockroach://root@%v:%v/migrate?sslmode=disable&x-lock-wait=true", ip, port)
addr := fmt.Sprintf("cockroach://root@%v:%v/migrate?sslmode=disable&x-max-retries=10", ip, port)
c := &CockroachDb{}
d1, err := c.Open(addr)
if err != nil {
Expand Down Expand Up @@ -261,7 +261,7 @@ func TestLockWait(t *testing.T) {
select {
case <-done: // we should get here once the d2 lock is acquired
break
case <-time.After(DefaultLockWaitPollInterval * 2): // wait for at least one poll
case <-time.After(DefaultMaxRetryInterval): // wait for at least one poll
t.Fatal("expected lock to be acquired by d2")
}
})
Expand Down

0 comments on commit 4523c3f

Please sign in to comment.