Skip to content

Commit

Permalink
Merge pull request #332 from authzed/versioned-namespaces
Browse files Browse the repository at this point in the history
Versioned namespaces
  • Loading branch information
jakedt committed Dec 10, 2021
2 parents 3c0af08 + 289c471 commit 1369fed
Show file tree
Hide file tree
Showing 39 changed files with 612 additions and 389 deletions.
9 changes: 8 additions & 1 deletion internal/dashboard/dashboard.go
Expand Up @@ -131,7 +131,14 @@ func NewHandler(grpcAddr string, grpcTLSEnabled bool, datastoreEngine string, ds
userFound := false
resourceFound := false

nsDefs, err := ds.ListNamespaces(r.Context())
syncRevision, err := ds.SyncRevision(r.Context())
if err != nil {
log.Ctx(r.Context()).Error().Err(err).Msg("Got error when computing datastore revision")
fmt.Fprintf(w, "Internal Error")
return
}

nsDefs, err := ds.ListNamespaces(r.Context(), syncRevision)
if err != nil {
log.Ctx(r.Context()).Error().AnErr("datastore-error", err).Msg("Got error when trying to load namespaces")
fmt.Fprintf(w, "Internal Error")
Expand Down
26 changes: 19 additions & 7 deletions internal/datastore/crdb/namespace.go
Expand Up @@ -64,7 +64,11 @@ func (cds *crdbDatastore) WriteNamespace(ctx context.Context, newConfig *v0.Name
return hlcNow, nil
}

func (cds *crdbDatastore) ReadNamespace(ctx context.Context, nsName string) (*v0.NamespaceDefinition, datastore.Revision, error) {
func (cds *crdbDatastore) ReadNamespace(
ctx context.Context,
nsName string,
revision datastore.Revision,
) (*v0.NamespaceDefinition, datastore.Revision, error) {
ctx = datastore.SeparateContextWithTracing(ctx)

tx, err := cds.conn.BeginTx(ctx, pgx.TxOptions{AccessMode: pgx.ReadOnly})
Expand All @@ -73,6 +77,10 @@ func (cds *crdbDatastore) ReadNamespace(ctx context.Context, nsName string) (*v0
}
defer tx.Rollback(ctx)

if err := cds.prepareTransaction(ctx, tx, revision); err != nil {
return nil, datastore.NoRevision, fmt.Errorf(errUnableToReadConfig, err)
}

config, timestamp, err := loadNamespace(ctx, tx, nsName)
if err != nil {
if errors.As(err, &datastore.ErrNamespaceNotFound{}) {
Expand All @@ -87,10 +95,10 @@ func (cds *crdbDatastore) ReadNamespace(ctx context.Context, nsName string) (*v0
func (cds *crdbDatastore) DeleteNamespace(ctx context.Context, nsName string) (datastore.Revision, error) {
ctx = datastore.SeparateContextWithTracing(ctx)

var timestamp time.Time
var hlcNow decimal.Decimal
if err := cds.execute(ctx, cds.conn, pgx.TxOptions{}, func(tx pgx.Tx) error {
var err error
_, timestamp, err = loadNamespace(ctx, tx, nsName)
_, timestamp, err := loadNamespace(ctx, tx, nsName)
if err != nil {
return err
}
Expand All @@ -111,22 +119,22 @@ func (cds *crdbDatastore) DeleteNamespace(ctx context.Context, nsName string) (d
}

deleteTupleSQL, deleteTupleArgs, err := queryDeleteTuples.
Suffix(queryReturningTimestamp).
Where(sq.Eq{colNamespace: nsName}).
ToSql()
if err != nil {
return err
}

_, err = tx.Exec(ctx, deleteTupleSQL, deleteTupleArgs...)
return err
return tx.QueryRow(ctx, deleteTupleSQL, deleteTupleArgs...).Scan(&hlcNow)
}); err != nil {
if errors.As(err, &datastore.ErrNamespaceNotFound{}) {
return datastore.NoRevision, err
}
return datastore.NoRevision, fmt.Errorf(errUnableToDeleteConfig, err)
}

return revisionFromTimestamp(timestamp), nil
return hlcNow, nil
}

func loadNamespace(ctx context.Context, tx pgx.Tx, nsName string) (*v0.NamespaceDefinition, time.Time, error) {
Expand Down Expand Up @@ -155,7 +163,7 @@ func loadNamespace(ctx context.Context, tx pgx.Tx, nsName string) (*v0.Namespace
return loaded, timestamp, nil
}

func (cds *crdbDatastore) ListNamespaces(ctx context.Context) ([]*v0.NamespaceDefinition, error) {
func (cds *crdbDatastore) ListNamespaces(ctx context.Context, revision datastore.Revision) ([]*v0.NamespaceDefinition, error) {
ctx = datastore.SeparateContextWithTracing(ctx)

tx, err := cds.conn.Begin(ctx)
Expand All @@ -164,6 +172,10 @@ func (cds *crdbDatastore) ListNamespaces(ctx context.Context) ([]*v0.NamespaceDe
}
defer tx.Rollback(ctx)

if err := cds.prepareTransaction(ctx, tx, revision); err != nil {
return nil, fmt.Errorf(errUnableToListNamespaces, err)
}

query := queryReadNamespace

sql, args, err := query.ToSql()
Expand Down
8 changes: 4 additions & 4 deletions internal/datastore/datastore.go
Expand Up @@ -47,15 +47,15 @@ type Datastore interface {
// returning the version of the namespace that was created.
WriteNamespace(ctx context.Context, newConfig *v0.NamespaceDefinition) (Revision, error)

// ReadNamespace reads a namespace definition and version and returns it if
// found.
ReadNamespace(ctx context.Context, nsName string) (*v0.NamespaceDefinition, Revision, error)
// ReadNamespace reads a namespace definition and version and returns it, and the revision at
// which it was created or last written, if found.
ReadNamespace(ctx context.Context, nsName string, revision Revision) (ns *v0.NamespaceDefinition, lastWritten Revision, err error)

// DeleteNamespace deletes a namespace and any associated tuples.
DeleteNamespace(ctx context.Context, nsName string) (Revision, error)

// ListNamespaces lists all namespaces defined.
ListNamespaces(ctx context.Context) ([]*v0.NamespaceDefinition, error)
ListNamespaces(ctx context.Context, revision Revision) ([]*v0.NamespaceDefinition, error)

// IsReady returns whether the datastore is ready to accept data. Datastores that require
// database schema creation will return false until the migrations have been run to create
Expand Down
90 changes: 67 additions & 23 deletions internal/datastore/memdb/memdb.go
Expand Up @@ -26,6 +26,7 @@ const (
tableNamespace = "namespaceConfig"

indexID = "id"
indexUnique = "unique"
indexTimestamp = "timestamp"
indexLive = "live"
indexNamespace = "namespace"
Expand All @@ -47,13 +48,28 @@ const (
errUnableToInstantiateTuplestore = "unable to instantiate datastore: %w"
)

type hasLifetime interface {
getCreatedTxn() uint64
getDeletedTxn() uint64
}

type namespace struct {
name string
configBytes []byte
createdTxn uint64
deletedTxn uint64
}

func (ns namespace) getCreatedTxn() uint64 {
return ns.createdTxn
}

func (ns namespace) getDeletedTxn() uint64 {
return ns.deletedTxn
}

var _ hasLifetime = &namespace{}

type transaction struct {
id uint64
timestamp uint64
Expand All @@ -70,6 +86,16 @@ type relationship struct {
deletedTxn uint64
}

func (r relationship) getCreatedTxn() uint64 {
return r.createdTxn
}

func (r relationship) getDeletedTxn() uint64 {
return r.deletedTxn
}

var _ hasLifetime = &relationship{}

func tupleEntryFromRelationship(r *v1.Relationship, created, deleted uint64) *relationship {
return &relationship{
namespace: r.Resource.ObjectType,
Expand All @@ -83,49 +109,49 @@ func tupleEntryFromRelationship(r *v1.Relationship, created, deleted uint64) *re
}
}

func (t relationship) Relationship() *v1.Relationship {
func (r relationship) Relationship() *v1.Relationship {
return &v1.Relationship{
Resource: &v1.ObjectReference{
ObjectType: t.namespace,
ObjectId: t.resourceID,
ObjectType: r.namespace,
ObjectId: r.resourceID,
},
Relation: t.relation,
Relation: r.relation,
Subject: &v1.SubjectReference{
Object: &v1.ObjectReference{
ObjectType: t.subjectNamespace,
ObjectId: t.subjectObjectID,
ObjectType: r.subjectNamespace,
ObjectId: r.subjectObjectID,
},
OptionalRelation: stringz.Default(t.subjectRelation, "", datastore.Ellipsis),
OptionalRelation: stringz.Default(r.subjectRelation, "", datastore.Ellipsis),
},
}
}

func (t relationship) RelationTuple() *v0.RelationTuple {
func (r relationship) RelationTuple() *v0.RelationTuple {
return &v0.RelationTuple{
ObjectAndRelation: &v0.ObjectAndRelation{
Namespace: t.namespace,
ObjectId: t.resourceID,
Relation: t.relation,
Namespace: r.namespace,
ObjectId: r.resourceID,
Relation: r.relation,
},
User: &v0.User{UserOneof: &v0.User_Userset{Userset: &v0.ObjectAndRelation{
Namespace: t.subjectNamespace,
ObjectId: t.subjectObjectID,
Relation: t.subjectRelation,
Namespace: r.subjectNamespace,
ObjectId: r.subjectObjectID,
Relation: r.subjectRelation,
}}},
}
}

func (t relationship) String() string {
func (r relationship) String() string {
return fmt.Sprintf(
"%s:%s#%s@%s:%s#%s[%d-%d)",
t.namespace,
t.resourceID,
t.relation,
t.subjectNamespace,
t.subjectObjectID,
t.subjectRelation,
t.createdTxn,
t.deletedTxn,
r.namespace,
r.resourceID,
r.relation,
r.subjectNamespace,
r.subjectObjectID,
r.subjectRelation,
r.createdTxn,
r.deletedTxn,
)
}

Expand All @@ -137,6 +163,15 @@ var schema = &memdb.DBSchema{
indexID: {
Name: indexID,
Unique: true,
Indexer: &memdb.CompoundIndex{
Indexes: []memdb.Indexer{
&memdb.StringFieldIndex{Field: "name"},
},
},
},
indexUnique: {
Name: indexUnique,
Unique: true,
Indexer: &memdb.CompoundIndex{
Indexes: []memdb.Indexer{
&memdb.StringFieldIndex{Field: "name"},
Expand Down Expand Up @@ -406,3 +441,12 @@ func createNewTransaction(txn *memdb.Txn) (uint64, error) {

return newTransactionID, nil
}

// filterToLiveObjects creates a memdb.FilterFunc which returns true for the items to remove,
// which is opposite of most filter implementations.
func filterToLiveObjects(revision datastore.Revision) memdb.FilterFunc {
return func(hasLifetimeRaw interface{}) bool {
obj := hasLifetimeRaw.(hasLifetime)
return uint64(revision.IntPart()) < obj.getCreatedTxn() || uint64(revision.IntPart()) >= obj.getDeletedTxn()
}
}
27 changes: 21 additions & 6 deletions internal/datastore/memdb/namespace.go
Expand Up @@ -7,6 +7,7 @@ import (

v0 "github.com/authzed/authzed-go/proto/authzed/api/v0"
v1 "github.com/authzed/authzed-go/proto/authzed/api/v1"
"github.com/hashicorp/go-memdb"
"google.golang.org/protobuf/proto"

"github.com/authzed/spicedb/internal/datastore"
Expand All @@ -18,7 +19,10 @@ const (
errUnableToDeleteConfig = "unable to delete namespace config: %w"
)

func (mds *memdbDatastore) WriteNamespace(ctx context.Context, newConfig *v0.NamespaceDefinition) (datastore.Revision, error) {
func (mds *memdbDatastore) WriteNamespace(
ctx context.Context,
newConfig *v0.NamespaceDefinition,
) (datastore.Revision, error) {
db := mds.db
if db == nil {
return datastore.NoRevision, fmt.Errorf("memdb closed")
Expand Down Expand Up @@ -70,7 +74,11 @@ func (mds *memdbDatastore) WriteNamespace(ctx context.Context, newConfig *v0.Nam
}

// ReadNamespace reads a namespace definition and version and returns it if found.
func (mds *memdbDatastore) ReadNamespace(ctx context.Context, nsName string) (*v0.NamespaceDefinition, datastore.Revision, error) {
func (mds *memdbDatastore) ReadNamespace(
ctx context.Context,
nsName string,
revision datastore.Revision,
) (*v0.NamespaceDefinition, datastore.Revision, error) {
db := mds.db
if db == nil {
return nil, datastore.NoRevision, fmt.Errorf("memdb closed")
Expand All @@ -80,11 +88,13 @@ func (mds *memdbDatastore) ReadNamespace(ctx context.Context, nsName string) (*v
defer txn.Abort()

time.Sleep(mds.simulatedLatency)
foundRaw, err := txn.First(tableNamespace, indexLive, nsName, deletedTransactionID)
foundIter, err := txn.Get(tableNamespace, indexID, nsName)
if err != nil {
return nil, datastore.NoRevision, fmt.Errorf(errUnableToReadConfig, err)
}

foundFiltered := memdb.NewFilterIterator(foundIter, filterToLiveObjects(revision))
foundRaw := foundFiltered.Next()
if foundRaw == nil {
return nil, datastore.NoRevision, datastore.NewNamespaceNotFoundErr(nsName)
}
Expand Down Expand Up @@ -150,7 +160,10 @@ func (mds *memdbDatastore) DeleteNamespace(ctx context.Context, nsName string) (
return revisionFromVersion(writeTxnID), nil
}

func (mds *memdbDatastore) ListNamespaces(ctx context.Context) ([]*v0.NamespaceDefinition, error) {
func (mds *memdbDatastore) ListNamespaces(
ctx context.Context,
revision datastore.Revision,
) ([]*v0.NamespaceDefinition, error) {
db := mds.db
if db == nil {
return nil, fmt.Errorf("memdb closed")
Expand All @@ -161,11 +174,13 @@ func (mds *memdbDatastore) ListNamespaces(ctx context.Context) ([]*v0.NamespaceD
txn := db.Txn(false)
defer txn.Abort()

it, err := txn.Get(tableNamespace, indexDeletedTxn, deletedTransactionID)
aliveAndDead, err := txn.LowerBound(tableNamespace, indexID)
if err != nil {
return nsDefs, err
return nil, fmt.Errorf(errUnableToReadConfig, err)
}

it := memdb.NewFilterIterator(aliveAndDead, filterToLiveObjects(revision))

for {
foundRaw := it.Next()
if foundRaw == nil {
Expand Down
7 changes: 3 additions & 4 deletions internal/datastore/memdb/query.go
Expand Up @@ -152,15 +152,14 @@ func (mtq memdbTupleQuery) Execute(ctx context.Context) (datastore.TupleIterator
return !found
}

if uint64(mtq.revision.IntPart()) < tuple.createdTxn || uint64(mtq.revision.IntPart()) >= tuple.deletedTxn {
return true
}
return false
})

filteredAlive := memdb.NewFilterIterator(filteredIterator, filterToLiveObjects(mtq.revision))

iter := &memdbTupleIterator{
txn: txn,
it: filteredIterator,
it: filteredAlive,
limit: mtq.limit,
}

Expand Down
8 changes: 1 addition & 7 deletions internal/datastore/memdb/reverse_query.go
Expand Up @@ -99,13 +99,7 @@ func (mtq memdbReverseTupleQuery) Execute(ctx context.Context) (datastore.TupleI
return nil, fmt.Errorf(errUnableToQueryTuples, err)
}

filteredIterator := memdb.NewFilterIterator(bestIterator, func(tupleRaw interface{}) bool {
tuple := tupleRaw.(*relationship)
if uint64(mtq.revision.IntPart()) < tuple.createdTxn || uint64(mtq.revision.IntPart()) >= tuple.deletedTxn {
return true
}
return false
})
filteredIterator := memdb.NewFilterIterator(bestIterator, filterToLiveObjects(mtq.revision))

iter := &memdbTupleIterator{
txn: txn,
Expand Down

0 comments on commit 1369fed

Please sign in to comment.