Export the unique, stable ID from the datastore
josephschorr committed Apr 22, 2024
1 parent 2f2b7fa commit 4deb0e5
Showing 21 changed files with 147 additions and 47 deletions.
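
The change repeated across the drivers below follows one pattern: each datastore gains a public UniqueID(ctx) method that reads the stored identifier on first use, caches it in an atomic.Pointer[string], and returns the cached value afterwards; Statistics and the various proxies then delegate to it. The sketch below is a minimal, self-contained illustration of that pattern; the exampleDatastore type and the fetchIDFromStorage helper are stand-ins for illustration, not code from the commit.

package main

import (
	"context"
	"fmt"
	"sync/atomic"
)

type exampleDatastore struct {
	// Cached identifier; nil until the first successful read.
	uniqueID atomic.Pointer[string]
}

// fetchIDFromStorage stands in for the driver-specific query
// (CockroachDB, MySQL, Postgres, or Spanner in the real commit).
func (d *exampleDatastore) fetchIDFromStorage(ctx context.Context) (string, error) {
	return "0a1b2c3d-example-id", nil
}

func (d *exampleDatastore) UniqueID(ctx context.Context) (string, error) {
	if d.uniqueID.Load() == nil {
		id, err := d.fetchIDFromStorage(ctx)
		if err != nil {
			return "", fmt.Errorf("unable to query unique ID: %w", err)
		}
		// Concurrent first callers may each run the query, but they store
		// the same value, so the race is benign.
		d.uniqueID.Store(&id)
		return id, nil
	}
	return *d.uniqueID.Load(), nil
}

func main() {
	ds := &exampleDatastore{}
	id, _ := ds.UniqueID(context.Background())
	fmt.Println("datastore unique ID:", id)
}
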
4 changes: 4 additions & 0 deletions internal/datastore/context.go
@@ -39,6 +39,10 @@ func NewSeparatingContextDatastoreProxy(d datastore.Datastore) datastore.Datasto

type ctxProxy struct{ delegate datastore.Datastore }

func (p *ctxProxy) UniqueID(ctx context.Context) (string, error) {
return p.delegate.UniqueID(SeparateContextWithTracing(ctx))
}

func (p *ctxProxy) ReadWriteTx(
ctx context.Context,
f datastore.TxUserFunc,
3 changes: 3 additions & 0 deletions internal/datastore/crdb/crdb.go
@@ -6,6 +6,7 @@ import (
"fmt"
"regexp"
"strconv"
"sync/atomic"
"time"

"github.com/IBM/pgxpoolprometheus"
@@ -273,6 +274,8 @@ type crdbDatastore struct {
pruneGroup *errgroup.Group
ctx context.Context
cancel context.CancelFunc

uniqueID atomic.Pointer[string]
}

func (cds *crdbDatastore) SnapshotReader(rev datastore.Revision) datastore.Reader {
23 changes: 18 additions & 5 deletions internal/datastore/crdb/stats.go
@@ -20,20 +20,33 @@ const (

var (
queryReadUniqueID = psql.Select(colUniqueID).From(tableMetadata)
uniqueID string
)

func (cds *crdbDatastore) Statistics(ctx context.Context) (datastore.Stats, error) {
if len(uniqueID) == 0 {
func (cds *crdbDatastore) UniqueID(ctx context.Context) (string, error) {
if cds.uniqueID.Load() == nil {
sql, args, err := queryReadUniqueID.ToSql()
if err != nil {
return datastore.Stats{}, fmt.Errorf("unable to prepare unique ID sql: %w", err)
return "", fmt.Errorf("unable to prepare unique ID sql: %w", err)
}

var uniqueID string
if err := cds.readPool.QueryRowFunc(ctx, func(ctx context.Context, row pgx.Row) error {
return row.Scan(&uniqueID)
}, sql, args...); err != nil {
return datastore.Stats{}, fmt.Errorf("unable to query unique ID: %w", err)
return "", fmt.Errorf("unable to query unique ID: %w", err)
}

cds.uniqueID.Store(&uniqueID)
return uniqueID, nil
}

return *cds.uniqueID.Load(), nil
}

func (cds *crdbDatastore) Statistics(ctx context.Context) (datastore.Stats, error) {
uniqueID, err := cds.UniqueID(ctx)
if err != nil {
return datastore.Stats{}, err
}

var nsDefs []datastore.RevisionedNamespace
4 changes: 4 additions & 0 deletions internal/datastore/memdb/memdb.go
@@ -99,6 +99,10 @@ type snapshot struct {
db *memdb.MemDB
}

func (mdb *memdbDatastore) UniqueID(_ context.Context) (string, error) {
return mdb.uniqueID, nil
}

func (mdb *memdbDatastore) SnapshotReader(dr datastore.Revision) datastore.Reader {
mdb.RLock()
defer mdb.RUnlock()
4 changes: 3 additions & 1 deletion internal/datastore/mysql/datastore.go
@@ -448,6 +448,8 @@ type Datastore struct {
createTxn string
createBaseTxn string

uniqueID atomic.Pointer[string]

*QueryBuilder
*revisions.CachedOptimizedRevisions
revisions.CommonDecoder
@@ -524,7 +526,7 @@ func (mds *Datastore) isSeeded(ctx context.Context) (bool, error) {
return false, nil
}

_, err = mds.getUniqueID(ctx)
_, err = mds.UniqueID(ctx)
if err != nil {
return false, nil
}
24 changes: 14 additions & 10 deletions internal/datastore/mysql/stats.go
@@ -28,7 +28,7 @@ func (mds *Datastore) Statistics(ctx context.Context) (datastore.Stats, error) {
}
}

uniqueID, err := mds.getUniqueID(ctx)
uniqueID, err := mds.UniqueID(ctx)
if err != nil {
return datastore.Stats{}, err
}
@@ -81,16 +81,20 @@ func (mds *Datastore) Statistics(ctx context.Context) (datastore.Stats, error) {
}, nil
}

func (mds *Datastore) getUniqueID(ctx context.Context) (string, error) {
sql, args, err := sb.Select(metadataUniqueIDColumn).From(mds.driver.Metadata()).ToSql()
if err != nil {
return "", fmt.Errorf("unable to generate query sql: %w", err)
}
func (mds *Datastore) UniqueID(ctx context.Context) (string, error) {
if mds.uniqueID.Load() == nil {
sql, args, err := sb.Select(metadataUniqueIDColumn).From(mds.driver.Metadata()).ToSql()
if err != nil {
return "", fmt.Errorf("unable to generate query sql: %w", err)
}

var uniqueID string
if err := mds.db.QueryRowContext(ctx, sql, args...).Scan(&uniqueID); err != nil {
return "", fmt.Errorf("unable to query unique ID: %w", err)
var uniqueID string
if err := mds.db.QueryRowContext(ctx, sql, args...).Scan(&uniqueID); err != nil {
return "", fmt.Errorf("unable to query unique ID: %w", err)
}
mds.uniqueID.Store(&uniqueID)
return uniqueID, nil
}

return uniqueID, nil
return *mds.uniqueID.Load(), nil
}
3 changes: 2 additions & 1 deletion internal/datastore/postgres/postgres.go
@@ -321,6 +321,7 @@ type pgDatastore struct {
watchEnabled bool

credentialsProvider datastore.CredentialsProvider
uniqueID atomic.Pointer[string]

gcGroup *errgroup.Group
gcCtx context.Context
@@ -569,7 +570,7 @@ func (pgd *pgDatastore) ReadyState(ctx context.Context) (datastore.ReadyState, e

if version == headMigration {
// Ensure a datastore ID is present. This ensures the tables have not been truncated.
uniqueID, err := pgd.datastoreUniqueID(ctx)
uniqueID, err := pgd.UniqueID(ctx)
if err != nil {
return datastore.ReadyState{}, fmt.Errorf("database validation failed: %w; if you have previously run `TRUNCATE`, this database is no longer valid and must be remigrated. See: https://spicedb.dev/d/truncate-unsupported", err)
}
25 changes: 17 additions & 8 deletions internal/datastore/postgres/stats.go
@@ -28,16 +28,25 @@
Where(sq.Eq{colRelname: tableTuple})
)

func (pgd *pgDatastore) datastoreUniqueID(ctx context.Context) (string, error) {
idSQL, idArgs, err := queryUniqueID.ToSql()
if err != nil {
return "", fmt.Errorf("unable to generate query sql: %w", err)
func (pgd *pgDatastore) UniqueID(ctx context.Context) (string, error) {
if pgd.uniqueID.Load() == nil {
idSQL, idArgs, err := queryUniqueID.ToSql()
if err != nil {
return "", fmt.Errorf("unable to generate query sql: %w", err)
}

var uniqueID string
if err := pgx.BeginTxFunc(ctx, pgd.readPool, pgd.readTxOptions, func(tx pgx.Tx) error {
return tx.QueryRow(ctx, idSQL, idArgs...).Scan(&uniqueID)
}); err != nil {
return "", fmt.Errorf("unable to query unique ID: %w", err)
}

pgd.uniqueID.Store(&uniqueID)
return uniqueID, nil
}

var uniqueID string
return uniqueID, pgx.BeginTxFunc(ctx, pgd.readPool, pgd.readTxOptions, func(tx pgx.Tx) error {
return tx.QueryRow(ctx, idSQL, idArgs...).Scan(&uniqueID)
})
return *pgd.uniqueID.Load(), nil
}

func (pgd *pgDatastore) Statistics(ctx context.Context) (datastore.Stats, error) {
4 changes: 4 additions & 0 deletions internal/datastore/proxy/observable.go
@@ -67,6 +67,10 @@ func NewObservableDatastoreProxy(d datastore.Datastore) datastore.Datastore {

type observableProxy struct{ delegate datastore.Datastore }

func (p *observableProxy) UniqueID(ctx context.Context) (string, error) {
return p.delegate.UniqueID(ctx)
}

func (p *observableProxy) SnapshotReader(rev datastore.Revision) datastore.Reader {
delegateReader := p.delegate.SnapshotReader(rev)
return &observableReader{delegateReader}
4 changes: 4 additions & 0 deletions internal/datastore/proxy/proxy_test/mock.go
@@ -15,6 +15,10 @@ type MockDatastore struct {
mock.Mock
}

func (dm *MockDatastore) UniqueID(_ context.Context) (string, error) {
return "mockds", nil
}

func (dm *MockDatastore) SnapshotReader(rev datastore.Revision) datastore.Reader {
args := dm.Called(rev)
return args.Get(0).(datastore.Reader)
4 changes: 4 additions & 0 deletions internal/datastore/proxy/schemacaching/watchingcache_test.go
@@ -337,6 +337,10 @@ type fakeDatastore struct {
lock sync.RWMutex
}

func (fds *fakeDatastore) UniqueID(_ context.Context) (string, error) {
return "fakedsforwatch", nil
}

func (fds *fakeDatastore) updateNamespace(name string, def *corev1.NamespaceDefinition, revision datastore.Revision) {
fds.lock.Lock()
defer fds.lock.Unlock()
4 changes: 4 additions & 0 deletions internal/datastore/proxy/singleflight.go
@@ -25,6 +25,10 @@ type singleflightProxy struct {

var _ datastore.Datastore = (*singleflightProxy)(nil)

func (p *singleflightProxy) UniqueID(ctx context.Context) (string, error) {
return p.delegate.UniqueID(ctx)
}

func (p *singleflightProxy) SnapshotReader(rev datastore.Revision) datastore.Reader {
return p.delegate.SnapshotReader(rev)
}
6 changes: 3 additions & 3 deletions internal/datastore/spanner/revisions.go
@@ -13,7 +13,7 @@ import (

var ParseRevisionString = revisions.RevisionParser(revisions.Timestamp)

func (sd spannerDatastore) headRevisionInternal(ctx context.Context) (datastore.Revision, error) {
func (sd *spannerDatastore) headRevisionInternal(ctx context.Context) (datastore.Revision, error) {
now, err := sd.now(ctx)
if err != nil {
return datastore.NoRevision, fmt.Errorf(errRevision, err)
@@ -22,11 +22,11 @@ func (sd spannerDatastore) headRevisionInternal(ctx context.Context) (datastore.
return revisions.NewForTime(now), nil
}

func (sd spannerDatastore) HeadRevision(ctx context.Context) (datastore.Revision, error) {
func (sd *spannerDatastore) HeadRevision(ctx context.Context) (datastore.Revision, error) {
return sd.headRevisionInternal(ctx)
}

func (sd spannerDatastore) now(ctx context.Context) (time.Time, error) {
func (sd *spannerDatastore) now(ctx context.Context) (time.Time, error) {
var timestamp time.Time
if err := sd.client.Single().Query(ctx, spanner.NewStatement("SELECT CURRENT_TIMESTAMP()")).Do(func(r *spanner.Row) error {
return r.Columns(&timestamp)
15 changes: 9 additions & 6 deletions internal/datastore/spanner/spanner.go
@@ -6,6 +6,7 @@ import (
"os"
"regexp"
"strconv"
"sync/atomic"
"time"

"cloud.google.com/go/spanner"
@@ -78,6 +79,8 @@ type spannerDatastore struct {
client *spanner.Client
config spannerOptions
database string

uniqueID atomic.Pointer[string]
}

// NewSpannerDatastore returns a datastore backed by cloud spanner
@@ -143,7 +146,7 @@ func NewSpannerDatastore(ctx context.Context, database string, opts ...Option) (
maxRevisionStaleness := time.Duration(float64(config.revisionQuantization.Nanoseconds())*
config.maxRevisionStalenessPercent) * time.Nanosecond

ds := spannerDatastore{
ds := &spannerDatastore{
RemoteClockRevisions: revisions.NewRemoteClockRevisions(
defaultChangeStreamRetention,
maxRevisionStaleness,
@@ -195,7 +198,7 @@ func (t *traceableRTX) Query(ctx context.Context, statement spanner.Statement) *
return t.delegate.Query(ctx, statement)
}

func (sd spannerDatastore) SnapshotReader(revisionRaw datastore.Revision) datastore.Reader {
func (sd *spannerDatastore) SnapshotReader(revisionRaw datastore.Revision) datastore.Reader {
r := revisionRaw.(revisions.TimestampRevision)

txSource := func() readTX {
@@ -205,7 +208,7 @@ func (sd spannerDatastore) SnapshotReader(revisionRaw datastore.Revision) datast
return spannerReader{executor, txSource}
}

func (sd spannerDatastore) ReadWriteTx(ctx context.Context, fn datastore.TxUserFunc, opts ...options.RWTOptionsOption) (datastore.Revision, error) {
func (sd *spannerDatastore) ReadWriteTx(ctx context.Context, fn datastore.TxUserFunc, opts ...options.RWTOptionsOption) (datastore.Revision, error) {
config := options.NewRWTOptionsWithOptions(opts...)

ctx, span := tracer.Start(ctx, "ReadWriteTx")
Expand Down Expand Up @@ -248,7 +251,7 @@ func (sd spannerDatastore) ReadWriteTx(ctx context.Context, fn datastore.TxUserF
return revisions.NewForTime(ts), nil
}

func (sd spannerDatastore) ReadyState(ctx context.Context) (datastore.ReadyState, error) {
func (sd *spannerDatastore) ReadyState(ctx context.Context) (datastore.ReadyState, error) {
headMigration, err := migrations.SpannerMigrations.HeadRevision()
if err != nil {
return datastore.ReadyState{}, fmt.Errorf("invalid head migration found for spanner: %w", err)
@@ -275,11 +278,11 @@ func (sd spannerDatastore) ReadyState(ctx context.Context) (datastore.ReadyState
}, nil
}

func (sd spannerDatastore) Features(_ context.Context) (*datastore.Features, error) {
func (sd *spannerDatastore) Features(_ context.Context) (*datastore.Features, error) {
return &datastore.Features{Watch: datastore.Feature{Enabled: true}}, nil
}

func (sd spannerDatastore) Close() error {
func (sd *spannerDatastore) Close() error {
sd.client.Close()
return nil
}
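
A side note, stated here as an inference rather than something the commit message explains: the spanner methods above (and in revisions.go, watch.go, and the test file below) switch from value receivers to pointer receivers, presumably because the new uniqueID atomic.Pointer field can only be cached through a pointer to the shared struct, and go vet flags copying a struct that contains a sync/atomic type. A tiny sketch with hypothetical names:

package main

import (
	"fmt"
	"sync/atomic"
)

type cachedDS struct {
	uniqueID atomic.Pointer[string]
}

// A pointer receiver mutates the shared struct, so the cached value
// survives across calls; a value receiver would store into a copy,
// and copying a struct holding an atomic.Pointer trips go vet.
func (d *cachedDS) remember(id string) {
	d.uniqueID.Store(&id)
}

func main() {
	d := &cachedDS{}
	d.remember("stable-id")
	fmt.Println(*d.uniqueID.Load()) // prints "stable-id"
}
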
2 changes: 1 addition & 1 deletion internal/datastore/spanner/spanner_test.go
@@ -18,7 +18,7 @@ import (
)

// Implement TestableDatastore interface
func (sd spannerDatastore) ExampleRetryableError() error {
func (sd *spannerDatastore) ExampleRetryableError() error {
return status.New(codes.Aborted, "retryable").Err()
}

33 changes: 23 additions & 10 deletions internal/datastore/spanner/stats.go
@@ -20,16 +20,29 @@
rng = rand.NewSource(time.Now().UnixNano())
)

func (sd spannerDatastore) Statistics(ctx context.Context) (datastore.Stats, error) {
var uniqueID string
if err := sd.client.Single().Read(
context.Background(),
tableMetadata,
spanner.AllKeys(),
[]string{colUniqueID},
).Do(func(r *spanner.Row) error {
return r.Columns(&uniqueID)
}); err != nil {
func (sd *spannerDatastore) UniqueID(ctx context.Context) (string, error) {
if sd.uniqueID.Load() == nil {
var uniqueID string
if err := sd.client.Single().Read(
ctx,
tableMetadata,
spanner.AllKeys(),
[]string{colUniqueID},
).Do(func(r *spanner.Row) error {
return r.Columns(&uniqueID)
}); err != nil {
return "", fmt.Errorf("unable to read unique ID: %w", err)
}
sd.uniqueID.Store(&uniqueID)
return uniqueID, nil
}

return *sd.uniqueID.Load(), nil
}

func (sd *spannerDatastore) Statistics(ctx context.Context) (datastore.Stats, error) {
uniqueID, err := sd.UniqueID(ctx)
if err != nil {
return datastore.Stats{}, fmt.Errorf("unable to read unique ID: %w", err)
}

4 changes: 2 additions & 2 deletions internal/datastore/spanner/watch.go
@@ -51,7 +51,7 @@ func parseDatabaseName(db string) (project, instance, database string, err error
return matches[1], matches[2], matches[3], nil
}

func (sd spannerDatastore) Watch(ctx context.Context, afterRevision datastore.Revision, opts datastore.WatchOptions) (<-chan *datastore.RevisionChanges, <-chan error) {
func (sd *spannerDatastore) Watch(ctx context.Context, afterRevision datastore.Revision, opts datastore.WatchOptions) (<-chan *datastore.RevisionChanges, <-chan error) {
watchBufferLength := opts.WatchBufferLength
if watchBufferLength <= 0 {
watchBufferLength = sd.watchBufferLength
Expand All @@ -65,7 +65,7 @@ func (sd spannerDatastore) Watch(ctx context.Context, afterRevision datastore.Re
return updates, errs
}

func (sd spannerDatastore) watch(
func (sd *spannerDatastore) watch(
ctx context.Context,
afterRevisionRaw datastore.Revision,
opts datastore.WatchOptions,
5 changes: 5 additions & 0 deletions pkg/datastore/datastore.go
@@ -459,6 +459,11 @@ func (wo WatchOptions) WithCheckpointInterval(interval time.Duration) WatchOptio

// Datastore represents tuple access for a single namespace.
type Datastore interface {
// UniqueID returns a unique identifier for the datastore. This identifier
// must be stable across restarts of the datastore if the datastore is
// persistent.
UniqueID(context.Context) (string, error)

// SnapshotReader creates a read-only handle that reads the datastore at the specified revision.
// Any errors establishing the reader will be returned by subsequent calls.
SnapshotReader(Revision) Reader

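With UniqueID on the interface, any consumer of a datastore.Datastore can surface the identifier directly instead of pulling it out of Statistics. A hypothetical caller-side sketch follows; the helper name, the log message, and the assumption that the package lives at github.com/authzed/spicedb/pkg/datastore are illustrative, not part of the commit:

package example

import (
	"context"
	"fmt"
	"log"

	"github.com/authzed/spicedb/pkg/datastore"
)

// logDatastoreIdentity logs the stable identifier of whatever datastore
// implementation it is handed.
func logDatastoreIdentity(ctx context.Context, ds datastore.Datastore) error {
	id, err := ds.UniqueID(ctx)
	if err != nil {
		return fmt.Errorf("could not determine datastore ID: %w", err)
	}
	log.Printf("connected to datastore %s", id)
	return nil
}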