Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Versioned namespaces #332

Merged
merged 9 commits into from Dec 10, 2021
9 changes: 8 additions & 1 deletion internal/dashboard/dashboard.go
Expand Up @@ -131,7 +131,14 @@ func NewHandler(grpcAddr string, grpcTLSEnabled bool, datastoreEngine string, ds
userFound := false
resourceFound := false

nsDefs, err := ds.ListNamespaces(r.Context())
syncRevision, err := ds.SyncRevision(r.Context())
if err != nil {
log.Ctx(r.Context()).Error().Err(err).Msg("Got error when computing datastore revision")
fmt.Fprintf(w, "Internal Error")
return
}

nsDefs, err := ds.ListNamespaces(r.Context(), syncRevision)
if err != nil {
log.Ctx(r.Context()).Error().AnErr("datastore-error", err).Msg("Got error when trying to load namespaces")
fmt.Fprintf(w, "Internal Error")
Expand Down
26 changes: 19 additions & 7 deletions internal/datastore/crdb/namespace.go
Expand Up @@ -64,7 +64,11 @@ func (cds *crdbDatastore) WriteNamespace(ctx context.Context, newConfig *v0.Name
return hlcNow, nil
}

func (cds *crdbDatastore) ReadNamespace(ctx context.Context, nsName string) (*v0.NamespaceDefinition, datastore.Revision, error) {
func (cds *crdbDatastore) ReadNamespace(
ctx context.Context,
nsName string,
revision datastore.Revision,
) (*v0.NamespaceDefinition, datastore.Revision, error) {
ctx = datastore.SeparateContextWithTracing(ctx)

tx, err := cds.conn.BeginTx(ctx, pgx.TxOptions{AccessMode: pgx.ReadOnly})
Expand All @@ -73,6 +77,10 @@ func (cds *crdbDatastore) ReadNamespace(ctx context.Context, nsName string) (*v0
}
defer tx.Rollback(ctx)

if err := cds.prepareTransaction(ctx, tx, revision); err != nil {
return nil, datastore.NoRevision, fmt.Errorf(errUnableToReadConfig, err)
}

config, timestamp, err := loadNamespace(ctx, tx, nsName)
if err != nil {
if errors.As(err, &datastore.ErrNamespaceNotFound{}) {
Expand All @@ -87,10 +95,10 @@ func (cds *crdbDatastore) ReadNamespace(ctx context.Context, nsName string) (*v0
func (cds *crdbDatastore) DeleteNamespace(ctx context.Context, nsName string) (datastore.Revision, error) {
ctx = datastore.SeparateContextWithTracing(ctx)

var timestamp time.Time
var hlcNow decimal.Decimal
if err := cds.execute(ctx, cds.conn, pgx.TxOptions{}, func(tx pgx.Tx) error {
var err error
_, timestamp, err = loadNamespace(ctx, tx, nsName)
_, timestamp, err := loadNamespace(ctx, tx, nsName)
if err != nil {
return err
}
Expand All @@ -111,22 +119,22 @@ func (cds *crdbDatastore) DeleteNamespace(ctx context.Context, nsName string) (d
}

deleteTupleSQL, deleteTupleArgs, err := queryDeleteTuples.
Suffix(queryReturningTimestamp).
Where(sq.Eq{colNamespace: nsName}).
ToSql()
if err != nil {
return err
}

_, err = tx.Exec(ctx, deleteTupleSQL, deleteTupleArgs...)
return err
return tx.QueryRow(ctx, deleteTupleSQL, deleteTupleArgs...).Scan(&hlcNow)
}); err != nil {
if errors.As(err, &datastore.ErrNamespaceNotFound{}) {
return datastore.NoRevision, err
}
return datastore.NoRevision, fmt.Errorf(errUnableToDeleteConfig, err)
}

return revisionFromTimestamp(timestamp), nil
return hlcNow, nil
}

func loadNamespace(ctx context.Context, tx pgx.Tx, nsName string) (*v0.NamespaceDefinition, time.Time, error) {
Expand Down Expand Up @@ -155,7 +163,7 @@ func loadNamespace(ctx context.Context, tx pgx.Tx, nsName string) (*v0.Namespace
return loaded, timestamp, nil
}

func (cds *crdbDatastore) ListNamespaces(ctx context.Context) ([]*v0.NamespaceDefinition, error) {
func (cds *crdbDatastore) ListNamespaces(ctx context.Context, revision datastore.Revision) ([]*v0.NamespaceDefinition, error) {
ctx = datastore.SeparateContextWithTracing(ctx)

tx, err := cds.conn.Begin(ctx)
Expand All @@ -164,6 +172,10 @@ func (cds *crdbDatastore) ListNamespaces(ctx context.Context) ([]*v0.NamespaceDe
}
defer tx.Rollback(ctx)

if err := cds.prepareTransaction(ctx, tx, revision); err != nil {
return nil, fmt.Errorf(errUnableToListNamespaces, err)
}

query := queryReadNamespace

sql, args, err := query.ToSql()
Expand Down
8 changes: 4 additions & 4 deletions internal/datastore/datastore.go
Expand Up @@ -47,15 +47,15 @@ type Datastore interface {
// returning the version of the namespace that was created.
WriteNamespace(ctx context.Context, newConfig *v0.NamespaceDefinition) (Revision, error)

// ReadNamespace reads a namespace definition and version and returns it if
// found.
ReadNamespace(ctx context.Context, nsName string) (*v0.NamespaceDefinition, Revision, error)
// ReadNamespace reads a namespace definition and version and returns it, and the revision at
// which it was created or last written, if found.
ReadNamespace(ctx context.Context, nsName string, revision Revision) (ns *v0.NamespaceDefinition, lastWritten Revision, err error)

// DeleteNamespace deletes a namespace and any associated tuples.
DeleteNamespace(ctx context.Context, nsName string) (Revision, error)

// ListNamespaces lists all namespaces defined.
ListNamespaces(ctx context.Context) ([]*v0.NamespaceDefinition, error)
ListNamespaces(ctx context.Context, revision Revision) ([]*v0.NamespaceDefinition, error)

// IsReady returns whether the datastore is ready to accept data. Datastores that require
// database schema creation will return false until the migrations have been run to create
Expand Down
90 changes: 67 additions & 23 deletions internal/datastore/memdb/memdb.go
Expand Up @@ -26,6 +26,7 @@ const (
tableNamespace = "namespaceConfig"

indexID = "id"
indexUnique = "unique"
indexTimestamp = "timestamp"
indexLive = "live"
indexNamespace = "namespace"
Expand All @@ -47,13 +48,28 @@ const (
errUnableToInstantiateTuplestore = "unable to instantiate datastore: %w"
)

type hasLifetime interface {
getCreatedTxn() uint64
getDeletedTxn() uint64
}

type namespace struct {
name string
configBytes []byte
createdTxn uint64
deletedTxn uint64
}

func (ns namespace) getCreatedTxn() uint64 {
return ns.createdTxn
}

func (ns namespace) getDeletedTxn() uint64 {
return ns.deletedTxn
}

var _ hasLifetime = &namespace{}

type transaction struct {
id uint64
timestamp uint64
Expand All @@ -70,6 +86,16 @@ type relationship struct {
deletedTxn uint64
}

func (r relationship) getCreatedTxn() uint64 {
return r.createdTxn
}

func (r relationship) getDeletedTxn() uint64 {
return r.deletedTxn
}

var _ hasLifetime = &relationship{}

func tupleEntryFromRelationship(r *v1.Relationship, created, deleted uint64) *relationship {
return &relationship{
namespace: r.Resource.ObjectType,
Expand All @@ -83,49 +109,49 @@ func tupleEntryFromRelationship(r *v1.Relationship, created, deleted uint64) *re
}
}

func (t relationship) Relationship() *v1.Relationship {
func (r relationship) Relationship() *v1.Relationship {
return &v1.Relationship{
Resource: &v1.ObjectReference{
ObjectType: t.namespace,
ObjectId: t.resourceID,
ObjectType: r.namespace,
ObjectId: r.resourceID,
},
Relation: t.relation,
Relation: r.relation,
Subject: &v1.SubjectReference{
Object: &v1.ObjectReference{
ObjectType: t.subjectNamespace,
ObjectId: t.subjectObjectID,
ObjectType: r.subjectNamespace,
ObjectId: r.subjectObjectID,
},
OptionalRelation: stringz.Default(t.subjectRelation, "", datastore.Ellipsis),
OptionalRelation: stringz.Default(r.subjectRelation, "", datastore.Ellipsis),
},
}
}

func (t relationship) RelationTuple() *v0.RelationTuple {
func (r relationship) RelationTuple() *v0.RelationTuple {
return &v0.RelationTuple{
ObjectAndRelation: &v0.ObjectAndRelation{
Namespace: t.namespace,
ObjectId: t.resourceID,
Relation: t.relation,
Namespace: r.namespace,
ObjectId: r.resourceID,
Relation: r.relation,
},
User: &v0.User{UserOneof: &v0.User_Userset{Userset: &v0.ObjectAndRelation{
Namespace: t.subjectNamespace,
ObjectId: t.subjectObjectID,
Relation: t.subjectRelation,
Namespace: r.subjectNamespace,
ObjectId: r.subjectObjectID,
Relation: r.subjectRelation,
}}},
}
}

func (t relationship) String() string {
func (r relationship) String() string {
return fmt.Sprintf(
"%s:%s#%s@%s:%s#%s[%d-%d)",
t.namespace,
t.resourceID,
t.relation,
t.subjectNamespace,
t.subjectObjectID,
t.subjectRelation,
t.createdTxn,
t.deletedTxn,
r.namespace,
r.resourceID,
r.relation,
r.subjectNamespace,
r.subjectObjectID,
r.subjectRelation,
r.createdTxn,
r.deletedTxn,
)
}

Expand All @@ -137,6 +163,15 @@ var schema = &memdb.DBSchema{
indexID: {
Name: indexID,
Unique: true,
Indexer: &memdb.CompoundIndex{
Indexes: []memdb.Indexer{
&memdb.StringFieldIndex{Field: "name"},
},
},
},
indexUnique: {
Name: indexUnique,
Unique: true,
Indexer: &memdb.CompoundIndex{
Indexes: []memdb.Indexer{
&memdb.StringFieldIndex{Field: "name"},
Expand Down Expand Up @@ -406,3 +441,12 @@ func createNewTransaction(txn *memdb.Txn) (uint64, error) {

return newTransactionID, nil
}

// filterToLiveObjects creates a memdb.FilterFunc which returns true for the items to remove,
// which is opposite of most filter implementations.
func filterToLiveObjects(revision datastore.Revision) memdb.FilterFunc {
return func(hasLifetimeRaw interface{}) bool {
obj := hasLifetimeRaw.(hasLifetime)
return uint64(revision.IntPart()) < obj.getCreatedTxn() || uint64(revision.IntPart()) >= obj.getDeletedTxn()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Mind adding a comment here that the filter is inverted?

}
}
27 changes: 21 additions & 6 deletions internal/datastore/memdb/namespace.go
Expand Up @@ -7,6 +7,7 @@ import (

v0 "github.com/authzed/authzed-go/proto/authzed/api/v0"
v1 "github.com/authzed/authzed-go/proto/authzed/api/v1"
"github.com/hashicorp/go-memdb"
"google.golang.org/protobuf/proto"

"github.com/authzed/spicedb/internal/datastore"
Expand All @@ -18,7 +19,10 @@ const (
errUnableToDeleteConfig = "unable to delete namespace config: %w"
)

func (mds *memdbDatastore) WriteNamespace(ctx context.Context, newConfig *v0.NamespaceDefinition) (datastore.Revision, error) {
func (mds *memdbDatastore) WriteNamespace(
ctx context.Context,
newConfig *v0.NamespaceDefinition,
) (datastore.Revision, error) {
db := mds.db
if db == nil {
return datastore.NoRevision, fmt.Errorf("memdb closed")
Expand Down Expand Up @@ -70,7 +74,11 @@ func (mds *memdbDatastore) WriteNamespace(ctx context.Context, newConfig *v0.Nam
}

// ReadNamespace reads a namespace definition and version and returns it if found.
func (mds *memdbDatastore) ReadNamespace(ctx context.Context, nsName string) (*v0.NamespaceDefinition, datastore.Revision, error) {
func (mds *memdbDatastore) ReadNamespace(
ctx context.Context,
nsName string,
revision datastore.Revision,
) (*v0.NamespaceDefinition, datastore.Revision, error) {
db := mds.db
if db == nil {
return nil, datastore.NoRevision, fmt.Errorf("memdb closed")
Expand All @@ -80,11 +88,13 @@ func (mds *memdbDatastore) ReadNamespace(ctx context.Context, nsName string) (*v
defer txn.Abort()

time.Sleep(mds.simulatedLatency)
foundRaw, err := txn.First(tableNamespace, indexLive, nsName, deletedTransactionID)
foundIter, err := txn.Get(tableNamespace, indexID, nsName)
if err != nil {
return nil, datastore.NoRevision, fmt.Errorf(errUnableToReadConfig, err)
}

foundFiltered := memdb.NewFilterIterator(foundIter, filterToLiveObjects(revision))
foundRaw := foundFiltered.Next()
if foundRaw == nil {
return nil, datastore.NoRevision, datastore.NewNamespaceNotFoundErr(nsName)
}
Expand Down Expand Up @@ -150,7 +160,10 @@ func (mds *memdbDatastore) DeleteNamespace(ctx context.Context, nsName string) (
return revisionFromVersion(writeTxnID), nil
}

func (mds *memdbDatastore) ListNamespaces(ctx context.Context) ([]*v0.NamespaceDefinition, error) {
func (mds *memdbDatastore) ListNamespaces(
ctx context.Context,
revision datastore.Revision,
) ([]*v0.NamespaceDefinition, error) {
db := mds.db
if db == nil {
return nil, fmt.Errorf("memdb closed")
Expand All @@ -161,11 +174,13 @@ func (mds *memdbDatastore) ListNamespaces(ctx context.Context) ([]*v0.NamespaceD
txn := db.Txn(false)
defer txn.Abort()

it, err := txn.Get(tableNamespace, indexDeletedTxn, deletedTransactionID)
aliveAndDead, err := txn.LowerBound(tableNamespace, indexID)
if err != nil {
return nsDefs, err
return nil, fmt.Errorf(errUnableToReadConfig, err)
}

it := memdb.NewFilterIterator(aliveAndDead, filterToLiveObjects(revision))

for {
foundRaw := it.Next()
if foundRaw == nil {
Expand Down
7 changes: 3 additions & 4 deletions internal/datastore/memdb/query.go
Expand Up @@ -152,15 +152,14 @@ func (mtq memdbTupleQuery) Execute(ctx context.Context) (datastore.TupleIterator
return !found
}

if uint64(mtq.revision.IntPart()) < tuple.createdTxn || uint64(mtq.revision.IntPart()) >= tuple.deletedTxn {
return true
}
return false
})

filteredAlive := memdb.NewFilterIterator(filteredIterator, filterToLiveObjects(mtq.revision))

iter := &memdbTupleIterator{
txn: txn,
it: filteredIterator,
it: filteredAlive,
limit: mtq.limit,
}

Expand Down
8 changes: 1 addition & 7 deletions internal/datastore/memdb/reverse_query.go
Expand Up @@ -99,13 +99,7 @@ func (mtq memdbReverseTupleQuery) Execute(ctx context.Context) (datastore.TupleI
return nil, fmt.Errorf(errUnableToQueryTuples, err)
}

filteredIterator := memdb.NewFilterIterator(bestIterator, func(tupleRaw interface{}) bool {
tuple := tupleRaw.(*relationship)
if uint64(mtq.revision.IntPart()) < tuple.createdTxn || uint64(mtq.revision.IntPart()) >= tuple.deletedTxn {
return true
}
return false
})
filteredIterator := memdb.NewFilterIterator(bestIterator, filterToLiveObjects(mtq.revision))

iter := &memdbTupleIterator{
txn: txn,
Expand Down