Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add DuckDB support to migrate #1077

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
@@ -1,6 +1,6 @@
SOURCE ?= file go_bindata github github_ee bitbucket aws_s3 google_cloud_storage godoc_vfs gitlab
DATABASE ?= postgres mysql redshift cassandra spanner cockroachdb yugabytedb clickhouse mongodb sqlserver firebird neo4j pgx pgx5 rqlite
DATABASE_TEST ?= $(DATABASE) sqlite sqlite3 sqlcipher
DATABASE_TEST ?= $(DATABASE) sqlite sqlite3 sqlcipher duckdb
VERSION ?= $(shell git describe --tags 2>/dev/null | cut -c 2-)
TEST_FLAGS ?=
REPO_OWNER ?= $(shell cd .. && basename "$$(pwd)")
Expand Down
204 changes: 204 additions & 0 deletions database/duckdb/duckdb.go
@@ -0,0 +1,204 @@
package duckdb

import (
"database/sql"
"fmt"
"io"
nurl "net/url"
"strings"

"go.uber.org/atomic"

"github.com/golang-migrate/migrate/v4"
"github.com/golang-migrate/migrate/v4/database"
"github.com/hashicorp/go-multierror"
_ "github.com/marcboeker/go-duckdb"
)

func init() {
database.Register("duckdb", &DuckDB{})
}

const MigrationTable = "gmg_schema_migrations"

type DuckDB struct {
db *sql.DB
isLocked atomic.Bool
}

func (d *DuckDB) Open(url string) (database.Driver, error) {
purl, err := nurl.Parse(url)
if err != nil {
return nil, fmt.Errorf("parsing url: %w", err)
}
dbfile := strings.Replace(migrate.FilterCustomQuery(purl).String(), "duckdb://", "", 1)
db, err := sql.Open("duckdb", dbfile)
if err != nil {
return nil, fmt.Errorf("opening '%s': %w", dbfile, err)
}

if err := db.Ping(); err != nil {
return nil, fmt.Errorf("pinging: %w", err)
}
d.db = db

if err := d.ensureVersionTable(); err != nil {
return nil, fmt.Errorf("ensuring version table: %w", err)
}

return d, nil
}

func (d *DuckDB) Close() error {
return d.db.Close()
}

func (d *DuckDB) Lock() error {
if !d.isLocked.CAS(false, true) {
return database.ErrLocked
}
return nil
}

func (d *DuckDB) Unlock() error {
if !d.isLocked.CAS(true, false) {
return database.ErrNotLocked
}
return nil
}

func (d *DuckDB) Drop() error {
tablesQuery := `SELECT schema_name, table_name FROM duckdb_tables()`
tables, err := d.db.Query(tablesQuery)
if err != nil {
return &database.Error{OrigErr: err, Query: []byte(tablesQuery)}
}
defer func() {
if errClose := tables.Close(); errClose != nil {
err = multierror.Append(err, errClose)
}
}()

tableNames := []string{}
for tables.Next() {
var (
schemaName string
tableName string
)

if err := tables.Scan(&schemaName, &tableName); err != nil {
return &database.Error{OrigErr: err, Err: "scanning schema and table name"}
}

if len(schemaName) > 0 {
tableNames = append(tableNames, fmt.Sprintf("%s.%s", schemaName, tableName))
} else {
tableNames = append(tableNames, tableName)
}
}
if err := tables.Err(); err != nil {
return &database.Error{OrigErr: err, Query: []byte(tablesQuery), Err: "err in rows after scanning"}
}

for _, t := range tableNames {
dropQuery := fmt.Sprintf("DROP TABLE %s", t)
if _, err := d.db.Exec(dropQuery); err != nil {
return &database.Error{OrigErr: err, Query: []byte(dropQuery)}
}
}

return nil

}

func (d *DuckDB) SetVersion(version int, dirty bool) error {
tx, err := d.db.Begin()
if err != nil {
return &database.Error{OrigErr: err, Err: "transaction start failed"}
}

query := "DELETE FROM " + MigrationTable
if _, err := tx.Exec(query); err != nil {
return &database.Error{OrigErr: err, Query: []byte(query)}
}

// Also re-write the schema version for nil dirty versions to prevent
// empty schema version for failed down migration on the first migration
// See: https://github.com/golang-migrate/migrate/issues/330
//
// NOTE: Copied from sqlite implementation, unsure if this is necessary for
// duckdb
if version >= 0 || (version == database.NilVersion && dirty) {
query := fmt.Sprintf(`INSERT INTO %s (version, dirty) VALUES (?, ?)`, MigrationTable)
if _, err := tx.Exec(query, version, dirty); err != nil {
if errRollback := tx.Rollback(); errRollback != nil {
err = multierror.Append(err, errRollback)
}
return &database.Error{OrigErr: err, Query: []byte(query)}
}
}

if err := tx.Commit(); err != nil {
return &database.Error{OrigErr: err, Err: "transaction commit failed"}
}

return nil
}

func (m *DuckDB) Version() (version int, dirty bool, err error) {
query := "SELECT version, dirty FROM " + MigrationTable + " LIMIT 1"
err = m.db.QueryRow(query).Scan(&version, &dirty)
if err != nil {
return database.NilVersion, false, nil
}
return version, dirty, nil
}

func (d *DuckDB) Run(migration io.Reader) error {
migr, err := io.ReadAll(migration)
if err != nil {
return fmt.Errorf("reading migration: %w", err)
}
query := string(migr[:])

tx, err := d.db.Begin()
if err != nil {
return &database.Error{OrigErr: err, Err: "transaction start failed"}
}
if _, err := tx.Exec(query); err != nil {
if errRollback := tx.Rollback(); errRollback != nil {
err = multierror.Append(err, errRollback)
}
return &database.Error{OrigErr: err, Query: []byte(query)}
}
if err := tx.Commit(); err != nil {
return &database.Error{OrigErr: err, Err: "transaction commit failed"}
}
return nil
}

// ensureVersionTable checks if versions table exists and, if not, creates it.
// Note that this function locks the database, which deviates from the usual
// convention of "caller locks" in the Sqlite type.
func (d *DuckDB) ensureVersionTable() (err error) {
if err = d.Lock(); err != nil {
return err
}

defer func() {
if e := d.Unlock(); e != nil {
if err == nil {
err = e
} else {
err = multierror.Append(err, e)
}
}
}()

query := fmt.Sprintf(`CREATE TABLE IF NOT EXISTS %s (version BIGINT, dirty BOOLEAN);`, MigrationTable)

if _, err := d.db.Exec(query); err != nil {
return fmt.Errorf("creating version table via '%s': %w", query, err)
}
return nil
}
49 changes: 49 additions & 0 deletions database/duckdb/duckdb_test.go
@@ -0,0 +1,49 @@
package duckdb

import (
"fmt"
"path/filepath"
"testing"

"github.com/golang-migrate/migrate/v4"
dt "github.com/golang-migrate/migrate/v4/database/testing"
_ "github.com/golang-migrate/migrate/v4/source/file"
_ "github.com/marcboeker/go-duckdb"
)

func Test(t *testing.T) {
dir := t.TempDir()
dbFile := filepath.Join(dir, "test.duckdb")
addr := fmt.Sprintf("duckdb://%s", dbFile)

ddb := &DuckDB{}
d, err := ddb.Open(addr)
if err != nil {
t.Fatalf("calling Open() on addr %s: %s", addr, err)
}

dt.Test(t, d, []byte(`CREATE TABLE t (Qty int, Name string);`))
}

func TestMigrate(t *testing.T) {
dir := t.TempDir()
dbFile := filepath.Join(dir, "test.duckdb")
addr := fmt.Sprintf("duckdb://%s", dbFile)

ddb := &DuckDB{}
d, err := ddb.Open(addr)
if err != nil {
t.Fatalf("calling Open() on addr %s: %s", addr, err)
}

m, err := migrate.NewWithDatabaseInstance(
"file://./examples/migrations",
"main",
d,
)
if err != nil {
t.Fatalf("new migrate: %s", err)
}

dt.TestMigrate(t, m)
}
@@ -0,0 +1 @@
DROP TABLE IF EXISTS pets;
3 changes: 3 additions & 0 deletions database/duckdb/examples/migrations/33_create_table.up.sql
@@ -0,0 +1,3 @@
CREATE TABLE pets (
name string
);
@@ -0,0 +1 @@
DROP TABLE IF EXISTS pets;
1 change: 1 addition & 0 deletions database/duckdb/examples/migrations/44_alter_table.up.sql
@@ -0,0 +1 @@
ALTER TABLE pets ADD predator bool;
3 changes: 2 additions & 1 deletion go.mod
Expand Up @@ -25,6 +25,7 @@ require (
github.com/jackc/pgx/v5 v5.5.4
github.com/ktrysmt/go-bitbucket v0.6.4
github.com/lib/pq v1.10.9
github.com/marcboeker/go-duckdb v1.4.1
github.com/markbates/pkger v0.15.1
github.com/mattn/go-sqlite3 v1.14.16
github.com/microsoft/go-mssqldb v1.0.0
Expand Down Expand Up @@ -132,7 +133,7 @@ require (
github.com/mattn/go-isatty v0.0.16 // indirect
github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 // indirect
github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3 // indirect
github.com/mitchellh/mapstructure v1.1.2 // indirect
github.com/mitchellh/mapstructure v1.5.0 // indirect
github.com/moby/term v0.5.0 // indirect
github.com/morikuni/aec v1.0.0 // indirect
github.com/mtibben/percent v0.2.1 // indirect
Expand Down
6 changes: 4 additions & 2 deletions go.sum
Expand Up @@ -436,6 +436,8 @@ github.com/lib/pq v1.10.0/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o=
github.com/lib/pq v1.10.2/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o=
github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw=
github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o=
github.com/marcboeker/go-duckdb v1.4.1 h1:NJ0kfgtOD8QUADp6Pwe/9f3e4qANet6m/YHXhx+3das=
github.com/marcboeker/go-duckdb v1.4.1/go.mod h1:wm91jO2GNKa6iO9NTcjXIRsW+/ykPoJbQcHSXhdAl28=
github.com/markbates/pkger v0.15.1 h1:3MPelV53RnGSW07izx5xGxl4e/sdRD6zqseIk0rMASY=
github.com/markbates/pkger v0.15.1/go.mod h1:0JoVlrol20BSywW79rN3kdFFsE5xYM+rSCQDXbLhiuI=
github.com/mattn/go-colorable v0.0.9/go.mod h1:9vuHe8Xs5qXnSaW/c/ABM9alt+Vo+STaOChaDxuIBZU=
Expand All @@ -462,8 +464,8 @@ github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8/go.mod h1:mC1jAcs
github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3 h1:+n/aFZefKZp7spd8DFdX7uMikMLXX4oubIzJF4kv/wI=
github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3/go.mod h1:RagcQ7I8IeTMnF8JTXieKnO4Z6JCsikNEzj0DwauVzE=
github.com/mitchellh/mapstructure v0.0.0-20180220230111-00c29f56e238/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y=
github.com/mitchellh/mapstructure v1.1.2 h1:fmNYVwqnSfB9mZU6OS2O6GsXM+wcskZDuKQzvN1EDeE=
github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y=
github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyuac5Z2hdY=
github.com/mitchellh/mapstructure v1.5.0/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo=
github.com/moby/term v0.0.0-20210619224110-3f7ff695adc6/go.mod h1:E2VnQOmVuvZB6UYnnDB0qG5Nq/1tD9acaOpo6xmt0Kw=
github.com/moby/term v0.5.0 h1:xt8Q1nalod/v7BqbG21f8mQPqH+xAaC9C3N3wfWbVP0=
github.com/moby/term v0.5.0/go.mod h1:8FzsFHVUBGZdbDsJw/ot+X+d5HLUbvklYLJ9uGfcI3Y=
Expand Down
8 changes: 8 additions & 0 deletions internal/cli/build_duckdb.go
@@ -0,0 +1,8 @@
//go:build duckdb
// +build duckdb

package cli

import (
_ "github.com/golang-migrate/migrate/v4/database/duckdb"
)