Skip to content

Commit

Permalink
change memdb datastore to use a single transaction log for namespaces…
Browse files Browse the repository at this point in the history
… and relationships
  • Loading branch information
jakedt committed Dec 3, 2021
1 parent 4d979a3 commit 95b4228
Show file tree
Hide file tree
Showing 6 changed files with 180 additions and 186 deletions.
126 changes: 79 additions & 47 deletions internal/datastore/memdb/memdb.go
Expand Up @@ -21,10 +21,9 @@ import (
const DisableGC = time.Duration(math.MaxInt64)

const (
tableTuple = "tuple"
tableChangelog = "changelog"
tableNamespaceChangelog = "namespaceChangelog"
tableNamespaceConfig = "namespaceConfig"
tableRelationship = "relationship"
tableTransaction = "transaction"
tableNamespace = "namespaceConfig"

indexID = "id"
indexTimestamp = "timestamp"
Expand All @@ -38,32 +37,29 @@ const (
indexUsersetNamespace = "usersetNamespace"
indexUsersetRelation = "usersetRelation"
indexUserset = "userset"
indexCreatedTxn = "createdTxn"
indexDeletedTxn = "deletedTxn"

defaultWatchBufferLength = 128

deletedTransactionID = ^uint64(0)

errUnableToInstantiateTuplestore = "unable to instantiate datastore: %w"
)

type changelog struct {
id uint64
name string
replaces []byte
oldVersion uint64
}

type namespace struct {
name string
configBytes []byte
version uint64
createdTxn uint64
deletedTxn uint64
}

type tupleChangelog struct {
type transaction struct {
id uint64
timestamp uint64
changes []*v0.RelationTupleUpdate
}

type tupleEntry struct {
type relationship struct {
namespace string
objectID string
relation string
Expand All @@ -74,8 +70,8 @@ type tupleEntry struct {
deletedTxn uint64
}

func tupleEntryFromRelationship(r *v1.Relationship, created, deleted uint64) *tupleEntry {
return &tupleEntry{
func tupleEntryFromRelationship(r *v1.Relationship, created, deleted uint64) *relationship {
return &relationship{
namespace: r.Resource.ObjectType,
objectID: r.Resource.ObjectId,
relation: r.Relation,
Expand All @@ -87,7 +83,7 @@ func tupleEntryFromRelationship(r *v1.Relationship, created, deleted uint64) *tu
}
}

func (t tupleEntry) Relationship() *v1.Relationship {
func (t relationship) Relationship() *v1.Relationship {
return &v1.Relationship{
Resource: &v1.ObjectReference{
ObjectType: t.namespace,
Expand All @@ -104,7 +100,7 @@ func (t tupleEntry) Relationship() *v1.Relationship {
}
}

func (t tupleEntry) RelationTuple() *v0.RelationTuple {
func (t relationship) RelationTuple() *v0.RelationTuple {
return &v0.RelationTuple{
ObjectAndRelation: &v0.ObjectAndRelation{
Namespace: t.namespace,
Expand All @@ -119,7 +115,7 @@ func (t tupleEntry) RelationTuple() *v0.RelationTuple {
}
}

func (t tupleEntry) String() string {
func (t relationship) String() string {
return fmt.Sprintf(
"%s:%s#%s@%s:%s#%s[%d-%d)",
t.namespace,
Expand All @@ -135,28 +131,38 @@ func (t tupleEntry) String() string {

var schema = &memdb.DBSchema{
Tables: map[string]*memdb.TableSchema{
tableNamespaceChangelog: {
Name: tableNamespaceChangelog,
tableNamespace: {
Name: tableNamespace,
Indexes: map[string]*memdb.IndexSchema{
indexID: {
Name: indexID,
Unique: true,
Indexer: &memdb.UintFieldIndex{Field: "id"},
Name: indexID,
Unique: true,
Indexer: &memdb.CompoundIndex{
Indexes: []memdb.Indexer{
&memdb.StringFieldIndex{Field: "name"},
&memdb.UintFieldIndex{Field: "createdTxn"},
},
},
},
},
},
tableNamespaceConfig: {
Name: tableNamespaceConfig,
Indexes: map[string]*memdb.IndexSchema{
indexID: {
Name: indexID,
Unique: true,
Indexer: &memdb.StringFieldIndex{Field: "name"},
indexLive: {
Name: indexLive,
Unique: true,
Indexer: &memdb.CompoundIndex{
Indexes: []memdb.Indexer{
&memdb.StringFieldIndex{Field: "name"},
&memdb.UintFieldIndex{Field: "deletedTxn"},
},
},
},
indexDeletedTxn: {
Name: indexDeletedTxn,
Unique: false,
Indexer: &memdb.UintFieldIndex{Field: "deletedTxn"},
},
},
},
tableChangelog: {
Name: tableChangelog,
tableTransaction: {
Name: tableTransaction,
Indexes: map[string]*memdb.IndexSchema{
indexID: {
Name: indexID,
Expand All @@ -170,8 +176,8 @@ var schema = &memdb.DBSchema{
},
},
},
tableTuple: {
Name: tableTuple,
tableRelationship: {
Name: tableRelationship,
Indexes: map[string]*memdb.IndexSchema{
indexID: {
Name: indexID,
Expand Down Expand Up @@ -294,6 +300,16 @@ var schema = &memdb.DBSchema{
},
},
},
indexCreatedTxn: {
Name: indexCreatedTxn,
Unique: false,
Indexer: &memdb.UintFieldIndex{Field: "createdTxn"},
},
indexDeletedTxn: {
Name: indexDeletedTxn,
Unique: false,
Indexer: &memdb.UintFieldIndex{Field: "deletedTxn"},
},
},
},
},
Expand Down Expand Up @@ -333,19 +349,11 @@ func NewMemdbDatastore(

// Add a changelog entry to make the first revision non-zero, matching the other datastore
// implementations.
newChangelogID, err := nextTupleChangelogID(txn)
_, err = createNewTransaction(txn)
if err != nil {
return nil, fmt.Errorf(errUnableToInstantiateTuplestore, err)
}

newChangelogEntry := &tupleChangelog{
id: newChangelogID,
timestamp: uint64(time.Now().UnixNano()),
}
if err := txn.Insert(tableChangelog, newChangelogEntry); err != nil {
return nil, fmt.Errorf(errUnableToInstantiateTuplestore, err)
}

txn.Commit()

if watchBufferLength == 0 {
Expand Down Expand Up @@ -374,3 +382,27 @@ func (mds *memdbDatastore) Close() error {
mds.db = nil
return nil
}

func createNewTransaction(txn *memdb.Txn) (uint64, error) {
var newTransactionID uint64 = 1

lastChangeRaw, err := txn.Last(tableTransaction, indexID)
if err != nil {
return 0, err
}

if lastChangeRaw != nil {
newTransactionID = lastChangeRaw.(*transaction).id + 1
}

newChangelogEntry := &transaction{
id: newTransactionID,
timestamp: uint64(time.Now().UnixNano()),
}

if err := txn.Insert(tableTransaction, newChangelogEntry); err != nil {
return 0, err
}

return newTransactionID, nil
}
82 changes: 26 additions & 56 deletions internal/datastore/memdb/namespace.go
Expand Up @@ -6,7 +6,7 @@ import (
"time"

v0 "github.com/authzed/authzed-go/proto/authzed/api/v0"
"github.com/hashicorp/go-memdb"
v1 "github.com/authzed/authzed-go/proto/authzed/api/v1"
"google.golang.org/protobuf/proto"

"github.com/authzed/spicedb/internal/datastore"
Expand All @@ -28,22 +28,23 @@ func (mds *memdbDatastore) WriteNamespace(ctx context.Context, newConfig *v0.Nam
defer txn.Abort()

time.Sleep(mds.simulatedLatency)
newVersion, err := nextChangelogID(txn)
newVersion, err := createNewTransaction(txn)
if err != nil {
return datastore.NoRevision, fmt.Errorf(errUnableToWriteConfig, err)
}

foundRaw, err := txn.First(tableNamespaceConfig, indexID, newConfig.Name)
foundRaw, err := txn.First(tableNamespace, indexLive, newConfig.Name, deletedTransactionID)
if err != nil {
return datastore.NoRevision, fmt.Errorf(errUnableToWriteConfig, err)
}

var replacing []byte
var oldVersion uint64
if foundRaw != nil {
found := foundRaw.(*namespace)
replacing = found.configBytes
oldVersion = found.version
// Mark the old one as deleted
var toDelete namespace = *(foundRaw.(*namespace))
toDelete.deletedTxn = newVersion
if err := txn.Insert(tableNamespace, &toDelete); err != nil {
return datastore.NoRevision, fmt.Errorf(errUnableToWriteConfig, err)
}
}

serialized, err := proto.Marshal(newConfig)
Expand All @@ -54,22 +55,12 @@ func (mds *memdbDatastore) WriteNamespace(ctx context.Context, newConfig *v0.Nam
newConfigEntry := &namespace{
name: newConfig.Name,
configBytes: serialized,
version: newVersion,
}
changeLogEntry := &changelog{
id: newVersion,
name: newConfig.Name,
replaces: replacing,
oldVersion: oldVersion,
createdTxn: newVersion,
deletedTxn: deletedTransactionID,
}

time.Sleep(mds.simulatedLatency)
if err := txn.Insert(tableNamespaceConfig, newConfigEntry); err != nil {
return datastore.NoRevision, fmt.Errorf(errUnableToWriteConfig, err)
}

time.Sleep(mds.simulatedLatency)
if err := txn.Insert(tableNamespaceChangelog, changeLogEntry); err != nil {
if err := txn.Insert(tableNamespace, newConfigEntry); err != nil {
return datastore.NoRevision, fmt.Errorf(errUnableToWriteConfig, err)
}

Expand All @@ -89,7 +80,7 @@ func (mds *memdbDatastore) ReadNamespace(ctx context.Context, nsName string) (*v
defer txn.Abort()

time.Sleep(mds.simulatedLatency)
foundRaw, err := txn.First(tableNamespaceConfig, indexID, nsName)
foundRaw, err := txn.First(tableNamespace, indexLive, nsName, deletedTransactionID)
if err != nil {
return nil, datastore.NoRevision, fmt.Errorf(errUnableToReadConfig, err)
}
Expand All @@ -105,7 +96,7 @@ func (mds *memdbDatastore) ReadNamespace(ctx context.Context, nsName string) (*v
return nil, datastore.NoRevision, fmt.Errorf(errUnableToReadConfig, err)
}

return &loaded, revisionFromVersion(found.version), nil
return &loaded, revisionFromVersion(found.createdTxn), nil
}

func (mds *memdbDatastore) DeleteNamespace(ctx context.Context, nsName string) (datastore.Revision, error) {
Expand All @@ -118,7 +109,7 @@ func (mds *memdbDatastore) DeleteNamespace(ctx context.Context, nsName string) (
defer txn.Abort()

time.Sleep(mds.simulatedLatency)
foundRaw, err := txn.First(tableNamespaceConfig, indexID, nsName)
foundRaw, err := txn.First(tableNamespace, indexLive, nsName, deletedTransactionID)
if err != nil {
return datastore.NoRevision, fmt.Errorf(errUnableToDeleteConfig, err)
}
Expand All @@ -129,42 +120,34 @@ func (mds *memdbDatastore) DeleteNamespace(ctx context.Context, nsName string) (
found := foundRaw.(*namespace)

time.Sleep(mds.simulatedLatency)
newChangelogID, err := nextChangelogID(txn)
newChangelogID, err := createNewTransaction(txn)
if err != nil {
return datastore.NoRevision, fmt.Errorf(errUnableToDeleteConfig, err)
}

changeLogEntry := &changelog{
id: newChangelogID,
name: nsName,
replaces: found.configBytes,
oldVersion: found.version,
}

// Delete the namespace config
// Mark the namespace as deleted
time.Sleep(mds.simulatedLatency)
err = txn.Delete(tableNamespaceConfig, found)
if err != nil {
return datastore.NoRevision, fmt.Errorf(errUnableToDeleteConfig, err)
}

// Write the changelog that we delete the namespace
time.Sleep(mds.simulatedLatency)
err = txn.Insert(tableNamespaceChangelog, changeLogEntry)
var markedDeleted namespace = *found
markedDeleted.deletedTxn = newChangelogID
err = txn.Insert(tableNamespace, &markedDeleted)
if err != nil {
return datastore.NoRevision, fmt.Errorf(errUnableToDeleteConfig, err)
}

// Delete the tuples in this namespace
time.Sleep(mds.simulatedLatency)
_, err = txn.DeleteAll(tableTuple, indexNamespace, nsName)

writeTxnID, err := mds.delete(ctx, txn, &v1.RelationshipFilter{
ResourceType: markedDeleted.name,
})
if err != nil {
return datastore.NoRevision, fmt.Errorf(errUnableToDeleteConfig, err)
}

txn.Commit()

return revisionFromVersion(found.version), nil
return revisionFromVersion(writeTxnID), nil
}

func (mds *memdbDatastore) ListNamespaces(ctx context.Context) ([]*v0.NamespaceDefinition, error) {
Expand All @@ -178,7 +161,7 @@ func (mds *memdbDatastore) ListNamespaces(ctx context.Context) ([]*v0.NamespaceD
txn := db.Txn(false)
defer txn.Abort()

it, err := txn.Get(tableNamespaceConfig, indexID)
it, err := txn.Get(tableNamespace, indexDeletedTxn, deletedTransactionID)
if err != nil {
return nsDefs, err
}
Expand All @@ -200,16 +183,3 @@ func (mds *memdbDatastore) ListNamespaces(ctx context.Context) ([]*v0.NamespaceD

return nsDefs, nil
}

func nextChangelogID(txn *memdb.Txn) (uint64, error) {
lastChangeRaw, err := txn.Last(tableNamespaceChangelog, indexID)
if err != nil {
return 0, err
}

if lastChangeRaw == nil {
return 1, nil
}

return lastChangeRaw.(*changelog).id + 1, nil
}

0 comments on commit 95b4228

Please sign in to comment.