From f13ae900d799628eaba231aad6f9c06dbbebcbaa Mon Sep 17 00:00:00 2001 From: Jake Moshenko Date: Fri, 3 Dec 2021 17:06:08 -0500 Subject: [PATCH 1/5] extract out change tracking across revisions from postgres --- internal/datastore/common/changes.go | 101 +++++++ internal/datastore/common/changes_test.go | 313 ++++++++++++++++++++++ internal/datastore/postgres/watch.go | 79 +----- 3 files changed, 419 insertions(+), 74 deletions(-) create mode 100644 internal/datastore/common/changes.go create mode 100644 internal/datastore/common/changes_test.go diff --git a/internal/datastore/common/changes.go b/internal/datastore/common/changes.go new file mode 100644 index 0000000000..a53873bcc0 --- /dev/null +++ b/internal/datastore/common/changes.go @@ -0,0 +1,101 @@ +package common + +import ( + "context" + "sort" + + v0 "github.com/authzed/authzed-go/proto/authzed/api/v0" + "github.com/rs/zerolog/log" + "github.com/shopspring/decimal" + + "github.com/authzed/spicedb/internal/datastore" + "github.com/authzed/spicedb/pkg/tuple" +) + +// Changes represents a set of tuple mutations that are kept self-consistent +// across one or more transaction revisions. +type Changes map[uint64]*changeRecord + +type changeRecord struct { + tupleTouches map[string]*v0.RelationTuple + tupleDeletes map[string]*v0.RelationTuple +} + +// NewChanges creates a new Changes object for change tracking and de-duplication. +func NewChanges() Changes { + return make(Changes) +} + +// AddChange adds a specific change to the complete list of tracked changes +func (ch Changes) AddChange( + ctx context.Context, + revTxID uint64, + tpl *v0.RelationTuple, + op v0.RelationTupleUpdate_Operation, +) { + revisionChanges, ok := ch[revTxID] + if !ok { + revisionChanges = &changeRecord{ + tupleTouches: make(map[string]*v0.RelationTuple), + tupleDeletes: make(map[string]*v0.RelationTuple), + } + ch[revTxID] = revisionChanges + } + + tplKey := tuple.String(tpl) + + switch op { + case v0.RelationTupleUpdate_TOUCH: + // If there was a delete for the same tuple at the same revision, drop it + delete(revisionChanges.tupleDeletes, tplKey) + + revisionChanges.tupleTouches[tplKey] = tpl + + case v0.RelationTupleUpdate_DELETE: + _, alreadyTouched := revisionChanges.tupleTouches[tplKey] + if !alreadyTouched { + revisionChanges.tupleDeletes[tplKey] = tpl + } + default: + log.Ctx(ctx).Fatal().Stringer("operation", op).Msg("unknown change operation") + } +} + +// AsRevisionChanges returns the list of changes processes so far as a datastore watch +// compatible, ordered changelist. +func (ch Changes) AsRevisionChanges() (changes []*datastore.RevisionChanges) { + revisionsWithChanges := make([]uint64, 0, len(ch)) + for revTxID := range ch { + revisionsWithChanges = append(revisionsWithChanges, revTxID) + } + sort.Slice(revisionsWithChanges, func(i int, j int) bool { + return revisionsWithChanges[i] < revisionsWithChanges[j] + }) + + for _, revTxID := range revisionsWithChanges { + revisionChange := &datastore.RevisionChanges{ + Revision: revisionFromTransactionID(revTxID), + } + + revisionChangeRecord := ch[revTxID] + for _, tpl := range revisionChangeRecord.tupleTouches { + revisionChange.Changes = append(revisionChange.Changes, &v0.RelationTupleUpdate{ + Operation: v0.RelationTupleUpdate_TOUCH, + Tuple: tpl, + }) + } + for _, tpl := range revisionChangeRecord.tupleDeletes { + revisionChange.Changes = append(revisionChange.Changes, &v0.RelationTupleUpdate{ + Operation: v0.RelationTupleUpdate_DELETE, + Tuple: tpl, + }) + } + changes = append(changes, revisionChange) + } + + return +} + +func revisionFromTransactionID(txID uint64) datastore.Revision { + return decimal.NewFromInt(int64(txID)) +} diff --git a/internal/datastore/common/changes_test.go b/internal/datastore/common/changes_test.go new file mode 100644 index 0000000000..2778c657f0 --- /dev/null +++ b/internal/datastore/common/changes_test.go @@ -0,0 +1,313 @@ +package common + +import ( + "context" + "sort" + "strings" + "testing" + + v0 "github.com/authzed/authzed-go/proto/authzed/api/v0" + "github.com/shopspring/decimal" + "github.com/stretchr/testify/require" + + "github.com/authzed/spicedb/internal/datastore" + "github.com/authzed/spicedb/pkg/tuple" +) + +const ( + tuple1 = "docs:1#reader@user:1" + tuple2 = "docs:2#editor@user:2" +) + +var ( + rev1 = decimal.NewFromInt(1) + rev2 = decimal.NewFromInt(2) + revOneMillion = decimal.NewFromInt(1_000_000) +) + +func TestChanges(t *testing.T) { + type changeEntry struct { + revision uint64 + relationship string + op v0.RelationTupleUpdate_Operation + } + + testCases := []struct { + name string + script []changeEntry + expected []*datastore.RevisionChanges + }{ + { + "empty", + []changeEntry{}, + []*datastore.RevisionChanges{}, + }, + { + "create", + []changeEntry{ + {1, tuple1, v0.RelationTupleUpdate_TOUCH}, + }, + []*datastore.RevisionChanges{ + {Revision: rev1, Changes: []*v0.RelationTupleUpdate{touch(tuple1)}}, + }, + }, + { + "delete", + []changeEntry{ + {1, tuple1, v0.RelationTupleUpdate_DELETE}, + }, + []*datastore.RevisionChanges{ + {Revision: rev1, Changes: []*v0.RelationTupleUpdate{del(tuple1)}}, + }, + }, + { + "in-order touch", + []changeEntry{ + {1, tuple1, v0.RelationTupleUpdate_DELETE}, + {1, tuple1, v0.RelationTupleUpdate_TOUCH}, + }, + []*datastore.RevisionChanges{ + {Revision: rev1, Changes: []*v0.RelationTupleUpdate{touch(tuple1)}}, + }, + }, + { + "reverse-order touch", + []changeEntry{ + {1, tuple1, v0.RelationTupleUpdate_TOUCH}, + {1, tuple1, v0.RelationTupleUpdate_DELETE}, + }, + []*datastore.RevisionChanges{ + {Revision: rev1, Changes: []*v0.RelationTupleUpdate{touch(tuple1)}}, + }, + }, + { + "create and delete", + []changeEntry{ + {1, tuple1, v0.RelationTupleUpdate_TOUCH}, + {1, tuple2, v0.RelationTupleUpdate_DELETE}, + }, + []*datastore.RevisionChanges{ + {Revision: rev1, Changes: []*v0.RelationTupleUpdate{touch(tuple1), del(tuple2)}}, + }, + }, + { + "multiple creates", + []changeEntry{ + {1, tuple1, v0.RelationTupleUpdate_TOUCH}, + {1, tuple2, v0.RelationTupleUpdate_TOUCH}, + }, + []*datastore.RevisionChanges{ + {Revision: rev1, Changes: []*v0.RelationTupleUpdate{touch(tuple1), touch(tuple2)}}, + }, + }, + { + "duplicates", + []changeEntry{ + {1, tuple1, v0.RelationTupleUpdate_TOUCH}, + {1, tuple1, v0.RelationTupleUpdate_TOUCH}, + {1, tuple1, v0.RelationTupleUpdate_TOUCH}, + {1, tuple1, v0.RelationTupleUpdate_TOUCH}, + {1, tuple1, v0.RelationTupleUpdate_TOUCH}, + }, + []*datastore.RevisionChanges{ + {Revision: rev1, Changes: []*v0.RelationTupleUpdate{touch(tuple1)}}, + }, + }, + { + "create then touch", + []changeEntry{ + {1, tuple1, v0.RelationTupleUpdate_TOUCH}, + {2, tuple1, v0.RelationTupleUpdate_DELETE}, + {2, tuple1, v0.RelationTupleUpdate_TOUCH}, + }, + []*datastore.RevisionChanges{ + {Revision: rev1, Changes: []*v0.RelationTupleUpdate{touch(tuple1)}}, + {Revision: rev2, Changes: []*v0.RelationTupleUpdate{touch(tuple1)}}, + }, + }, + { + "big revision gap", + []changeEntry{ + {1, tuple1, v0.RelationTupleUpdate_TOUCH}, + {1_000_000, tuple1, v0.RelationTupleUpdate_DELETE}, + {1_000_000, tuple1, v0.RelationTupleUpdate_TOUCH}, + }, + []*datastore.RevisionChanges{ + {Revision: rev1, Changes: []*v0.RelationTupleUpdate{touch(tuple1)}}, + {Revision: revOneMillion, Changes: []*v0.RelationTupleUpdate{touch(tuple1)}}, + }, + }, + { + "out of order", + []changeEntry{ + {1_000_000, tuple1, v0.RelationTupleUpdate_TOUCH}, + {1, tuple1, v0.RelationTupleUpdate_TOUCH}, + {1_000_000, tuple1, v0.RelationTupleUpdate_DELETE}, + }, + []*datastore.RevisionChanges{ + {Revision: rev1, Changes: []*v0.RelationTupleUpdate{touch(tuple1)}}, + {Revision: revOneMillion, Changes: []*v0.RelationTupleUpdate{touch(tuple1)}}, + }, + }, + { + "kitchen sink", + []changeEntry{ + {1, tuple1, v0.RelationTupleUpdate_TOUCH}, + {2, tuple1, v0.RelationTupleUpdate_DELETE}, + {1_000_000, tuple1, v0.RelationTupleUpdate_TOUCH}, + + {1, tuple2, v0.RelationTupleUpdate_DELETE}, + {2, tuple2, v0.RelationTupleUpdate_TOUCH}, + {1_000_000, tuple2, v0.RelationTupleUpdate_DELETE}, + {1_000_000, tuple2, v0.RelationTupleUpdate_TOUCH}, + }, + []*datastore.RevisionChanges{ + {Revision: rev1, Changes: []*v0.RelationTupleUpdate{touch(tuple1), del(tuple2)}}, + {Revision: rev2, Changes: []*v0.RelationTupleUpdate{touch(tuple2), del(tuple1)}}, + {Revision: revOneMillion, Changes: []*v0.RelationTupleUpdate{touch(tuple1), touch(tuple2)}}, + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + require := require.New(t) + + ctx := context.Background() + ch := NewChanges() + for _, step := range tc.script { + rel := tuple.MustParse(step.relationship) + ch.AddChange(ctx, step.revision, rel, step.op) + } + + require.Equal(canonicalize(tc.expected), canonicalize(ch.AsRevisionChanges())) + }) + } +} + +func TestCanonicalize(t *testing.T) { + testCases := []struct { + name string + input, expected []*datastore.RevisionChanges + }{ + { + "empty", + []*datastore.RevisionChanges{}, + []*datastore.RevisionChanges{}, + }, + { + "single entries", + []*datastore.RevisionChanges{ + {Revision: rev1, Changes: []*v0.RelationTupleUpdate{touch(tuple1)}}, + }, + []*datastore.RevisionChanges{ + {Revision: rev1, Changes: []*v0.RelationTupleUpdate{touch(tuple1)}}, + }, + }, + { + "tuples out of order", + []*datastore.RevisionChanges{ + {Revision: rev1, Changes: []*v0.RelationTupleUpdate{del(tuple2), touch(tuple1)}}, + }, + []*datastore.RevisionChanges{ + {Revision: rev1, Changes: []*v0.RelationTupleUpdate{touch(tuple1), del(tuple2)}}, + }, + }, + { + "operations out of order", + []*datastore.RevisionChanges{ + {Revision: rev1, Changes: []*v0.RelationTupleUpdate{del(tuple1), touch(tuple1)}}, + }, + []*datastore.RevisionChanges{ + {Revision: rev1, Changes: []*v0.RelationTupleUpdate{touch(tuple1), del(tuple1)}}, + }, + }, + { + "equal entries", + []*datastore.RevisionChanges{ + {Revision: rev1, Changes: []*v0.RelationTupleUpdate{touch(tuple1), touch(tuple1)}}, + }, + []*datastore.RevisionChanges{ + {Revision: rev1, Changes: []*v0.RelationTupleUpdate{touch(tuple1), touch(tuple1)}}, + }, + }, + { + "already canonical", + []*datastore.RevisionChanges{ + {Revision: rev1, Changes: []*v0.RelationTupleUpdate{touch(tuple1), del(tuple2)}}, + {Revision: rev2, Changes: []*v0.RelationTupleUpdate{del(tuple1), touch(tuple2)}}, + {Revision: revOneMillion, Changes: []*v0.RelationTupleUpdate{touch(tuple1), touch(tuple2)}}, + }, + []*datastore.RevisionChanges{ + {Revision: rev1, Changes: []*v0.RelationTupleUpdate{touch(tuple1), del(tuple2)}}, + {Revision: rev2, Changes: []*v0.RelationTupleUpdate{del(tuple1), touch(tuple2)}}, + {Revision: revOneMillion, Changes: []*v0.RelationTupleUpdate{touch(tuple1), touch(tuple2)}}, + }, + }, + { + "revisions allowed out of order", + []*datastore.RevisionChanges{ + {Revision: revOneMillion, Changes: []*v0.RelationTupleUpdate{touch(tuple1), touch(tuple2)}}, + {Revision: rev2, Changes: []*v0.RelationTupleUpdate{del(tuple1), touch(tuple2)}}, + {Revision: rev1, Changes: []*v0.RelationTupleUpdate{touch(tuple1), del(tuple2)}}, + }, + []*datastore.RevisionChanges{ + {Revision: revOneMillion, Changes: []*v0.RelationTupleUpdate{touch(tuple1), touch(tuple2)}}, + {Revision: rev2, Changes: []*v0.RelationTupleUpdate{del(tuple1), touch(tuple2)}}, + {Revision: rev1, Changes: []*v0.RelationTupleUpdate{touch(tuple1), del(tuple2)}}, + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + require := require.New(t) + require.Equal(tc.expected, canonicalize(tc.input)) + }) + } +} + +func touch(relationship string) *v0.RelationTupleUpdate { + return &v0.RelationTupleUpdate{ + Operation: v0.RelationTupleUpdate_TOUCH, + Tuple: tuple.MustParse(relationship), + } +} + +func del(relationship string) *v0.RelationTupleUpdate { + return &v0.RelationTupleUpdate{ + Operation: v0.RelationTupleUpdate_DELETE, + Tuple: tuple.MustParse(relationship), + } +} + +func canonicalize(in []*datastore.RevisionChanges) []*datastore.RevisionChanges { + out := make([]*datastore.RevisionChanges, 0, len(in)) + + for _, rev := range in { + outChanges := make([]*v0.RelationTupleUpdate, 0, len(rev.Changes)) + + outChanges = append(outChanges, rev.Changes...) + sort.Slice(outChanges, func(i, j int) bool { + // Return if i < j + left, right := outChanges[i], outChanges[j] + tupleCompareResult := strings.Compare(tuple.String(left.Tuple), tuple.String(right.Tuple)) + if tupleCompareResult < 0 { + return true + } + if tupleCompareResult > 0 { + return false + } + + // Tuples are equal, sort by op + return left.Operation < right.Operation + }) + + out = append(out, &datastore.RevisionChanges{ + Revision: rev.Revision, + Changes: outChanges, + }) + } + + return out +} diff --git a/internal/datastore/postgres/watch.go b/internal/datastore/postgres/watch.go index 4a39a28695..fe6073f18f 100644 --- a/internal/datastore/postgres/watch.go +++ b/internal/datastore/postgres/watch.go @@ -3,15 +3,13 @@ package postgres import ( "context" "errors" - "sort" "time" sq "github.com/Masterminds/squirrel" v0 "github.com/authzed/authzed-go/proto/authzed/api/v0" - "github.com/rs/zerolog/log" "github.com/authzed/spicedb/internal/datastore" - "github.com/authzed/spicedb/pkg/tuple" + "github.com/authzed/spicedb/internal/datastore/common" ) const ( @@ -116,7 +114,7 @@ func (pgd *pgDatastore) loadChanges( return } - stagedChanges := make(map[uint64]*changeRecord) + stagedChanges := common.NewChanges() for rows.Next() { userset := &v0.ObjectAndRelation{} @@ -146,85 +144,18 @@ func (pgd *pgDatastore) loadChanges( } if createdTxn > afterRevision && createdTxn <= newRevision { - addChange(ctx, stagedChanges, createdTxn, tpl, v0.RelationTupleUpdate_TOUCH) + stagedChanges.AddChange(ctx, createdTxn, tpl, v0.RelationTupleUpdate_TOUCH) } if deletedTxn > afterRevision && deletedTxn <= newRevision { - addChange(ctx, stagedChanges, deletedTxn, tpl, v0.RelationTupleUpdate_DELETE) + stagedChanges.AddChange(ctx, deletedTxn, tpl, v0.RelationTupleUpdate_DELETE) } } if err = rows.Err(); err != nil { return } - revisionsWithChanges := make([]uint64, 0, len(stagedChanges)) - for k := range stagedChanges { - revisionsWithChanges = append(revisionsWithChanges, k) - } - sort.Slice(revisionsWithChanges, func(i int, j int) bool { - return revisionsWithChanges[i] < revisionsWithChanges[j] - }) - - for _, rev := range revisionsWithChanges { - revisionChange := &datastore.RevisionChanges{ - Revision: revisionFromTransaction(rev), - } - - revisionChangeRecord := stagedChanges[rev] - for _, tpl := range revisionChangeRecord.tupleTouches { - revisionChange.Changes = append(revisionChange.Changes, &v0.RelationTupleUpdate{ - Operation: v0.RelationTupleUpdate_TOUCH, - Tuple: tpl, - }) - } - for _, tpl := range revisionChangeRecord.tupleDeletes { - revisionChange.Changes = append(revisionChange.Changes, &v0.RelationTupleUpdate{ - Operation: v0.RelationTupleUpdate_DELETE, - Tuple: tpl, - }) - } - changes = append(changes, revisionChange) - } + changes = stagedChanges.AsRevisionChanges() return } - -type changeRecord struct { - tupleTouches map[string]*v0.RelationTuple - tupleDeletes map[string]*v0.RelationTuple -} - -func addChange( - ctx context.Context, - changes map[uint64]*changeRecord, - revision uint64, - tpl *v0.RelationTuple, - op v0.RelationTupleUpdate_Operation, -) { - revisionChanges, ok := changes[revision] - if !ok { - revisionChanges = &changeRecord{ - tupleTouches: make(map[string]*v0.RelationTuple), - tupleDeletes: make(map[string]*v0.RelationTuple), - } - changes[revision] = revisionChanges - } - - tplKey := tuple.String(tpl) - - switch op { - case v0.RelationTupleUpdate_TOUCH: - // If there was a delete for the same tuple at the same revision, drop it - delete(revisionChanges.tupleDeletes, tplKey) - - revisionChanges.tupleTouches[tplKey] = tpl - - case v0.RelationTupleUpdate_DELETE: - _, alreadyTouched := revisionChanges.tupleTouches[tplKey] - if !alreadyTouched { - revisionChanges.tupleDeletes[tplKey] = tpl - } - default: - log.Ctx(ctx).Fatal().Stringer("operation", op).Msg("unknown change operation") - } -} From 3e33ff9c78993c9a9476e5b1b6fc137909676651 Mon Sep 17 00:00:00 2001 From: Jake Moshenko Date: Fri, 3 Dec 2021 17:07:52 -0500 Subject: [PATCH 2/5] test that namespaces are removed from list when deleted --- internal/datastore/common/changes.go | 4 ++-- internal/datastore/test/namespace.go | 6 ++++++ internal/datastore/test/watch.go | 2 +- 3 files changed, 9 insertions(+), 3 deletions(-) diff --git a/internal/datastore/common/changes.go b/internal/datastore/common/changes.go index a53873bcc0..4a04bb3625 100644 --- a/internal/datastore/common/changes.go +++ b/internal/datastore/common/changes.go @@ -61,8 +61,8 @@ func (ch Changes) AddChange( } } -// AsRevisionChanges returns the list of changes processes so far as a datastore watch -// compatible, ordered changelist. +// AsRevisionChanges returns the list of changes processed so far as a datastore watch +// compatible, ordered, changelist. func (ch Changes) AsRevisionChanges() (changes []*datastore.RevisionChanges) { revisionsWithChanges := make([]uint64, 0, len(ch)) for revTxID := range ch { diff --git a/internal/datastore/test/namespace.go b/internal/datastore/test/namespace.go index 2c5d90b6bf..e459bd0604 100644 --- a/internal/datastore/test/namespace.go +++ b/internal/datastore/test/namespace.go @@ -101,6 +101,12 @@ func NamespaceDeleteTest(t *testing.T, tester DatastoreTester) { require.True(ver.GreaterThan(datastore.NoRevision)) require.NoError(err) + allNamespaces, err := ds.ListNamespaces(ctx) + require.NoError(err) + for _, ns := range allNamespaces { + require.NotEqual(testfixtures.DocumentNS.Name, ns.Name, "deleted namespace '%s' should not be in namespace list", ns.Name) + } + deletedRevision, err := ds.SyncRevision(ctx) require.NoError(err) diff --git a/internal/datastore/test/watch.go b/internal/datastore/test/watch.go index f636e218b4..14e687f7d5 100644 --- a/internal/datastore/test/watch.go +++ b/internal/datastore/test/watch.go @@ -177,7 +177,7 @@ func WatchCancelTest(t *testing.T, tester DatastoreTester) { case created, ok := <-changes: if ok { require.Equal( - []*v0.RelationTupleUpdate{tuple.Create(makeTestTuple("test", "test"))}, + []*v0.RelationTupleUpdate{tuple.Touch(makeTestTuple("test", "test"))}, created.Changes, ) require.True(created.Revision.GreaterThan(datastore.NoRevision)) From a1cef577e382b957b14a8546adb04a300aaf5c2a Mon Sep 17 00:00:00 2001 From: Jake Moshenko Date: Fri, 3 Dec 2021 17:08:22 -0500 Subject: [PATCH 3/5] fix v1 watch test to not delete non-existent relationship --- internal/services/v1/watch_test.go | 55 +++++++++++++++++++++--------- 1 file changed, 38 insertions(+), 17 deletions(-) diff --git a/internal/services/v1/watch_test.go b/internal/services/v1/watch_test.go index 1bae97d3d2..529484eb47 100644 --- a/internal/services/v1/watch_test.go +++ b/internal/services/v1/watch_test.go @@ -5,6 +5,8 @@ import ( "fmt" "io" "net" + "sort" + "strings" "testing" "time" @@ -21,6 +23,7 @@ import ( "github.com/authzed/spicedb/internal/datastore" "github.com/authzed/spicedb/internal/datastore/memdb" "github.com/authzed/spicedb/internal/testfixtures" + "github.com/authzed/spicedb/pkg/tuple" "github.com/authzed/spicedb/pkg/zedtoken" ) @@ -64,12 +67,12 @@ func TestWatch(t *testing.T) { expectedCode: codes.OK, mutations: []*v1.RelationshipUpdate{ update(v1.RelationshipUpdate_OPERATION_CREATE, "document", "document1", "viewer", "user", "user1"), - update(v1.RelationshipUpdate_OPERATION_DELETE, "folder", "folder1", "viewer", "user", "user1"), + update(v1.RelationshipUpdate_OPERATION_DELETE, "folder", "auditors", "viewer", "user", "auditor"), update(v1.RelationshipUpdate_OPERATION_TOUCH, "folder", "folder2", "viewer", "user", "user1"), }, expectedUpdates: []*v1.RelationshipUpdate{ - update(v1.RelationshipUpdate_OPERATION_CREATE, "document", "document1", "viewer", "user", "user1"), - update(v1.RelationshipUpdate_OPERATION_DELETE, "folder", "folder1", "viewer", "user", "user1"), + update(v1.RelationshipUpdate_OPERATION_TOUCH, "document", "document1", "viewer", "user", "user1"), + update(v1.RelationshipUpdate_OPERATION_DELETE, "folder", "auditors", "viewer", "user", "auditor"), update(v1.RelationshipUpdate_OPERATION_TOUCH, "folder", "folder2", "viewer", "user", "user1"), }, }, @@ -80,10 +83,10 @@ func TestWatch(t *testing.T) { mutations: []*v1.RelationshipUpdate{ update(v1.RelationshipUpdate_OPERATION_CREATE, "document", "document1", "viewer", "user", "user1"), update(v1.RelationshipUpdate_OPERATION_TOUCH, "document", "document2", "viewer", "user", "user1"), - update(v1.RelationshipUpdate_OPERATION_DELETE, "folder", "folder1", "viewer", "user", "user1"), + update(v1.RelationshipUpdate_OPERATION_DELETE, "folder", "auditors", "viewer", "user", "auditor"), }, expectedUpdates: []*v1.RelationshipUpdate{ - update(v1.RelationshipUpdate_OPERATION_CREATE, "document", "document1", "viewer", "user", "user1"), + update(v1.RelationshipUpdate_OPERATION_TOUCH, "document", "document1", "viewer", "user", "user1"), update(v1.RelationshipUpdate_OPERATION_TOUCH, "document", "document2", "viewer", "user", "user1"), }, }, @@ -106,10 +109,7 @@ func TestWatch(t *testing.T) { rawDS, err := memdb.NewMemdbDatastore(0, 0, memdb.DisableGC, 0) require.NoError(err) - ds, _ := testfixtures.StandardDatastoreWithSchema(rawDS, require) - - revision, err := ds.SyncRevision(context.Background()) - require.NoError(err) + ds, revision := testfixtures.StandardDatastoreWithData(rawDS, require) require.True(revision.GreaterThan(decimal.Zero)) client, stop := newWatchServicer(require, ds) @@ -162,16 +162,18 @@ func TestWatch(t *testing.T) { require.NoError(err) var receivedUpdates []*v1.RelationshipUpdate - select { - case updates := <-updatesChan: - receivedUpdates = updates - case <-time.After(3 * time.Second): - require.FailNow("timed out waiting for updates") - return - } - require.Equal(len(tc.expectedUpdates), len(receivedUpdates)) + for len(receivedUpdates) < len(tc.expectedUpdates) { + select { + case updates := <-updatesChan: + receivedUpdates = append(receivedUpdates, updates...) + case <-time.After(1 * time.Second): + require.FailNow("timed out waiting for updates") + return + } + } + require.Equal(sortUpdates(tc.expectedUpdates), sortUpdates(receivedUpdates)) } else { _, err := stream.Recv() grpcutil.RequireStatus(t, tc.expectedCode, err) @@ -206,3 +208,22 @@ func newWatchServicer( require.NoError(lis.Close()) } } + +func sortUpdates(in []*v1.RelationshipUpdate) []*v1.RelationshipUpdate { + out := make([]*v1.RelationshipUpdate, 0, len(in)) + out = append(out, in...) + sort.Slice(out, func(i, j int) bool { + left, right := out[i], out[j] + compareResult := strings.Compare(tuple.MustRelString(left.Relationship), tuple.MustRelString(right.Relationship)) + if compareResult < 0 { + return true + } + if compareResult > 0 { + return false + } + + return left.Operation < right.Operation + }) + + return out +} From d6dadff82a628f1528ce78321d8a73144a38d230 Mon Sep 17 00:00:00 2001 From: Jake Moshenko Date: Fri, 3 Dec 2021 17:08:49 -0500 Subject: [PATCH 4/5] change memdb datastore to use a single transaction log for namespaces and relationships --- internal/datastore/memdb/memdb.go | 126 ++++++++++++++-------- internal/datastore/memdb/namespace.go | 82 +++++--------- internal/datastore/memdb/query.go | 12 +-- internal/datastore/memdb/reverse_query.go | 12 +-- internal/datastore/memdb/tuple.go | 96 +++++++---------- internal/datastore/memdb/watch.go | 38 ++++--- 6 files changed, 180 insertions(+), 186 deletions(-) diff --git a/internal/datastore/memdb/memdb.go b/internal/datastore/memdb/memdb.go index 0c258895c1..6a032c4e81 100644 --- a/internal/datastore/memdb/memdb.go +++ b/internal/datastore/memdb/memdb.go @@ -21,10 +21,9 @@ import ( const DisableGC = time.Duration(math.MaxInt64) const ( - tableTuple = "tuple" - tableChangelog = "changelog" - tableNamespaceChangelog = "namespaceChangelog" - tableNamespaceConfig = "namespaceConfig" + tableRelationship = "relationship" + tableTransaction = "transaction" + tableNamespace = "namespaceConfig" indexID = "id" indexTimestamp = "timestamp" @@ -38,32 +37,29 @@ const ( indexUsersetNamespace = "usersetNamespace" indexUsersetRelation = "usersetRelation" indexUserset = "userset" + indexCreatedTxn = "createdTxn" + indexDeletedTxn = "deletedTxn" defaultWatchBufferLength = 128 + deletedTransactionID = ^uint64(0) + errUnableToInstantiateTuplestore = "unable to instantiate datastore: %w" ) -type changelog struct { - id uint64 - name string - replaces []byte - oldVersion uint64 -} - type namespace struct { name string configBytes []byte - version uint64 + createdTxn uint64 + deletedTxn uint64 } -type tupleChangelog struct { +type transaction struct { id uint64 timestamp uint64 - changes []*v0.RelationTupleUpdate } -type tupleEntry struct { +type relationship struct { namespace string objectID string relation string @@ -74,8 +70,8 @@ type tupleEntry struct { deletedTxn uint64 } -func tupleEntryFromRelationship(r *v1.Relationship, created, deleted uint64) *tupleEntry { - return &tupleEntry{ +func tupleEntryFromRelationship(r *v1.Relationship, created, deleted uint64) *relationship { + return &relationship{ namespace: r.Resource.ObjectType, objectID: r.Resource.ObjectId, relation: r.Relation, @@ -87,7 +83,7 @@ func tupleEntryFromRelationship(r *v1.Relationship, created, deleted uint64) *tu } } -func (t tupleEntry) Relationship() *v1.Relationship { +func (t relationship) Relationship() *v1.Relationship { return &v1.Relationship{ Resource: &v1.ObjectReference{ ObjectType: t.namespace, @@ -104,7 +100,7 @@ func (t tupleEntry) Relationship() *v1.Relationship { } } -func (t tupleEntry) RelationTuple() *v0.RelationTuple { +func (t relationship) RelationTuple() *v0.RelationTuple { return &v0.RelationTuple{ ObjectAndRelation: &v0.ObjectAndRelation{ Namespace: t.namespace, @@ -119,7 +115,7 @@ func (t tupleEntry) RelationTuple() *v0.RelationTuple { } } -func (t tupleEntry) String() string { +func (t relationship) String() string { return fmt.Sprintf( "%s:%s#%s@%s:%s#%s[%d-%d)", t.namespace, @@ -135,28 +131,38 @@ func (t tupleEntry) String() string { var schema = &memdb.DBSchema{ Tables: map[string]*memdb.TableSchema{ - tableNamespaceChangelog: { - Name: tableNamespaceChangelog, + tableNamespace: { + Name: tableNamespace, Indexes: map[string]*memdb.IndexSchema{ indexID: { - Name: indexID, - Unique: true, - Indexer: &memdb.UintFieldIndex{Field: "id"}, + Name: indexID, + Unique: true, + Indexer: &memdb.CompoundIndex{ + Indexes: []memdb.Indexer{ + &memdb.StringFieldIndex{Field: "name"}, + &memdb.UintFieldIndex{Field: "createdTxn"}, + }, + }, }, - }, - }, - tableNamespaceConfig: { - Name: tableNamespaceConfig, - Indexes: map[string]*memdb.IndexSchema{ - indexID: { - Name: indexID, - Unique: true, - Indexer: &memdb.StringFieldIndex{Field: "name"}, + indexLive: { + Name: indexLive, + Unique: true, + Indexer: &memdb.CompoundIndex{ + Indexes: []memdb.Indexer{ + &memdb.StringFieldIndex{Field: "name"}, + &memdb.UintFieldIndex{Field: "deletedTxn"}, + }, + }, + }, + indexDeletedTxn: { + Name: indexDeletedTxn, + Unique: false, + Indexer: &memdb.UintFieldIndex{Field: "deletedTxn"}, }, }, }, - tableChangelog: { - Name: tableChangelog, + tableTransaction: { + Name: tableTransaction, Indexes: map[string]*memdb.IndexSchema{ indexID: { Name: indexID, @@ -170,8 +176,8 @@ var schema = &memdb.DBSchema{ }, }, }, - tableTuple: { - Name: tableTuple, + tableRelationship: { + Name: tableRelationship, Indexes: map[string]*memdb.IndexSchema{ indexID: { Name: indexID, @@ -294,6 +300,16 @@ var schema = &memdb.DBSchema{ }, }, }, + indexCreatedTxn: { + Name: indexCreatedTxn, + Unique: false, + Indexer: &memdb.UintFieldIndex{Field: "createdTxn"}, + }, + indexDeletedTxn: { + Name: indexDeletedTxn, + Unique: false, + Indexer: &memdb.UintFieldIndex{Field: "deletedTxn"}, + }, }, }, }, @@ -333,19 +349,11 @@ func NewMemdbDatastore( // Add a changelog entry to make the first revision non-zero, matching the other datastore // implementations. - newChangelogID, err := nextTupleChangelogID(txn) + _, err = createNewTransaction(txn) if err != nil { return nil, fmt.Errorf(errUnableToInstantiateTuplestore, err) } - newChangelogEntry := &tupleChangelog{ - id: newChangelogID, - timestamp: uint64(time.Now().UnixNano()), - } - if err := txn.Insert(tableChangelog, newChangelogEntry); err != nil { - return nil, fmt.Errorf(errUnableToInstantiateTuplestore, err) - } - txn.Commit() if watchBufferLength == 0 { @@ -374,3 +382,27 @@ func (mds *memdbDatastore) Close() error { mds.db = nil return nil } + +func createNewTransaction(txn *memdb.Txn) (uint64, error) { + var newTransactionID uint64 = 1 + + lastChangeRaw, err := txn.Last(tableTransaction, indexID) + if err != nil { + return 0, err + } + + if lastChangeRaw != nil { + newTransactionID = lastChangeRaw.(*transaction).id + 1 + } + + newChangelogEntry := &transaction{ + id: newTransactionID, + timestamp: uint64(time.Now().UnixNano()), + } + + if err := txn.Insert(tableTransaction, newChangelogEntry); err != nil { + return 0, err + } + + return newTransactionID, nil +} diff --git a/internal/datastore/memdb/namespace.go b/internal/datastore/memdb/namespace.go index 96729fb176..369eb0aad6 100644 --- a/internal/datastore/memdb/namespace.go +++ b/internal/datastore/memdb/namespace.go @@ -6,7 +6,7 @@ import ( "time" v0 "github.com/authzed/authzed-go/proto/authzed/api/v0" - "github.com/hashicorp/go-memdb" + v1 "github.com/authzed/authzed-go/proto/authzed/api/v1" "google.golang.org/protobuf/proto" "github.com/authzed/spicedb/internal/datastore" @@ -28,22 +28,23 @@ func (mds *memdbDatastore) WriteNamespace(ctx context.Context, newConfig *v0.Nam defer txn.Abort() time.Sleep(mds.simulatedLatency) - newVersion, err := nextChangelogID(txn) + newVersion, err := createNewTransaction(txn) if err != nil { return datastore.NoRevision, fmt.Errorf(errUnableToWriteConfig, err) } - foundRaw, err := txn.First(tableNamespaceConfig, indexID, newConfig.Name) + foundRaw, err := txn.First(tableNamespace, indexLive, newConfig.Name, deletedTransactionID) if err != nil { return datastore.NoRevision, fmt.Errorf(errUnableToWriteConfig, err) } - var replacing []byte - var oldVersion uint64 if foundRaw != nil { - found := foundRaw.(*namespace) - replacing = found.configBytes - oldVersion = found.version + // Mark the old one as deleted + var toDelete namespace = *(foundRaw.(*namespace)) + toDelete.deletedTxn = newVersion + if err := txn.Insert(tableNamespace, &toDelete); err != nil { + return datastore.NoRevision, fmt.Errorf(errUnableToWriteConfig, err) + } } serialized, err := proto.Marshal(newConfig) @@ -54,22 +55,12 @@ func (mds *memdbDatastore) WriteNamespace(ctx context.Context, newConfig *v0.Nam newConfigEntry := &namespace{ name: newConfig.Name, configBytes: serialized, - version: newVersion, - } - changeLogEntry := &changelog{ - id: newVersion, - name: newConfig.Name, - replaces: replacing, - oldVersion: oldVersion, + createdTxn: newVersion, + deletedTxn: deletedTransactionID, } time.Sleep(mds.simulatedLatency) - if err := txn.Insert(tableNamespaceConfig, newConfigEntry); err != nil { - return datastore.NoRevision, fmt.Errorf(errUnableToWriteConfig, err) - } - - time.Sleep(mds.simulatedLatency) - if err := txn.Insert(tableNamespaceChangelog, changeLogEntry); err != nil { + if err := txn.Insert(tableNamespace, newConfigEntry); err != nil { return datastore.NoRevision, fmt.Errorf(errUnableToWriteConfig, err) } @@ -89,7 +80,7 @@ func (mds *memdbDatastore) ReadNamespace(ctx context.Context, nsName string) (*v defer txn.Abort() time.Sleep(mds.simulatedLatency) - foundRaw, err := txn.First(tableNamespaceConfig, indexID, nsName) + foundRaw, err := txn.First(tableNamespace, indexLive, nsName, deletedTransactionID) if err != nil { return nil, datastore.NoRevision, fmt.Errorf(errUnableToReadConfig, err) } @@ -105,7 +96,7 @@ func (mds *memdbDatastore) ReadNamespace(ctx context.Context, nsName string) (*v return nil, datastore.NoRevision, fmt.Errorf(errUnableToReadConfig, err) } - return &loaded, revisionFromVersion(found.version), nil + return &loaded, revisionFromVersion(found.createdTxn), nil } func (mds *memdbDatastore) DeleteNamespace(ctx context.Context, nsName string) (datastore.Revision, error) { @@ -118,7 +109,7 @@ func (mds *memdbDatastore) DeleteNamespace(ctx context.Context, nsName string) ( defer txn.Abort() time.Sleep(mds.simulatedLatency) - foundRaw, err := txn.First(tableNamespaceConfig, indexID, nsName) + foundRaw, err := txn.First(tableNamespace, indexLive, nsName, deletedTransactionID) if err != nil { return datastore.NoRevision, fmt.Errorf(errUnableToDeleteConfig, err) } @@ -129,42 +120,34 @@ func (mds *memdbDatastore) DeleteNamespace(ctx context.Context, nsName string) ( found := foundRaw.(*namespace) time.Sleep(mds.simulatedLatency) - newChangelogID, err := nextChangelogID(txn) + newChangelogID, err := createNewTransaction(txn) if err != nil { return datastore.NoRevision, fmt.Errorf(errUnableToDeleteConfig, err) } - changeLogEntry := &changelog{ - id: newChangelogID, - name: nsName, - replaces: found.configBytes, - oldVersion: found.version, - } - - // Delete the namespace config + // Mark the namespace as deleted time.Sleep(mds.simulatedLatency) - err = txn.Delete(tableNamespaceConfig, found) - if err != nil { - return datastore.NoRevision, fmt.Errorf(errUnableToDeleteConfig, err) - } - // Write the changelog that we delete the namespace - time.Sleep(mds.simulatedLatency) - err = txn.Insert(tableNamespaceChangelog, changeLogEntry) + var markedDeleted namespace = *found + markedDeleted.deletedTxn = newChangelogID + err = txn.Insert(tableNamespace, &markedDeleted) if err != nil { return datastore.NoRevision, fmt.Errorf(errUnableToDeleteConfig, err) } // Delete the tuples in this namespace time.Sleep(mds.simulatedLatency) - _, err = txn.DeleteAll(tableTuple, indexNamespace, nsName) + + writeTxnID, err := mds.delete(ctx, txn, &v1.RelationshipFilter{ + ResourceType: markedDeleted.name, + }) if err != nil { return datastore.NoRevision, fmt.Errorf(errUnableToDeleteConfig, err) } txn.Commit() - return revisionFromVersion(found.version), nil + return revisionFromVersion(writeTxnID), nil } func (mds *memdbDatastore) ListNamespaces(ctx context.Context) ([]*v0.NamespaceDefinition, error) { @@ -178,7 +161,7 @@ func (mds *memdbDatastore) ListNamespaces(ctx context.Context) ([]*v0.NamespaceD txn := db.Txn(false) defer txn.Abort() - it, err := txn.Get(tableNamespaceConfig, indexID) + it, err := txn.Get(tableNamespace, indexDeletedTxn, deletedTransactionID) if err != nil { return nsDefs, err } @@ -200,16 +183,3 @@ func (mds *memdbDatastore) ListNamespaces(ctx context.Context) ([]*v0.NamespaceD return nsDefs, nil } - -func nextChangelogID(txn *memdb.Txn) (uint64, error) { - lastChangeRaw, err := txn.Last(tableNamespaceChangelog, indexID) - if err != nil { - return 0, err - } - - if lastChangeRaw == nil { - return 1, nil - } - - return lastChangeRaw.(*changelog).id + 1, nil -} diff --git a/internal/datastore/memdb/query.go b/internal/datastore/memdb/query.go index 4b47b178b6..76b46c59d7 100644 --- a/internal/datastore/memdb/query.go +++ b/internal/datastore/memdb/query.go @@ -69,14 +69,14 @@ func iteratorForFilter(txn *memdb.Txn, filter *v1.RelationshipFilter) (memdb.Res switch { case filter.OptionalResourceId != "": return txn.Get( - tableTuple, + tableRelationship, indexNamespaceAndObjectID, filter.ResourceType, filter.OptionalResourceId, ) case filter.OptionalSubjectFilter != nil && filter.OptionalSubjectFilter.OptionalSubjectId != "": return txn.Get( - tableTuple, + tableRelationship, indexNamespaceAndUsersetID, filter.ResourceType, filter.OptionalSubjectFilter.SubjectType, @@ -84,14 +84,14 @@ func iteratorForFilter(txn *memdb.Txn, filter *v1.RelationshipFilter) (memdb.Res ) case filter.OptionalRelation != "": return txn.Get( - tableTuple, + tableRelationship, indexNamespaceAndRelation, filter.ResourceType, filter.OptionalRelation, ) } - return txn.Get(tableTuple, indexNamespace, filter.ResourceType) + return txn.Get(tableRelationship, indexNamespace, filter.ResourceType) } func (mtq memdbTupleQuery) Execute(ctx context.Context) (datastore.TupleIterator, error) { @@ -118,7 +118,7 @@ func (mtq memdbTupleQuery) Execute(ctx context.Context) (datastore.TupleIterator } filteredIterator := memdb.NewFilterIterator(bestIterator, func(tupleRaw interface{}) bool { - tuple := tupleRaw.(*tupleEntry) + tuple := tupleRaw.(*relationship) filter := relationshipFilter switch { @@ -191,7 +191,7 @@ func (mti *memdbTupleIterator) Next() *v0.RelationTuple { } mti.count++ - return foundRaw.(*tupleEntry).RelationTuple() + return foundRaw.(*relationship).RelationTuple() } func (mti *memdbTupleIterator) Err() error { diff --git a/internal/datastore/memdb/reverse_query.go b/internal/datastore/memdb/reverse_query.go index 03afb5c0f7..5de45f6000 100644 --- a/internal/datastore/memdb/reverse_query.go +++ b/internal/datastore/memdb/reverse_query.go @@ -53,7 +53,7 @@ func (mtq memdbReverseTupleQuery) Execute(ctx context.Context) (datastore.TupleI if mtq.objNamespaceName != "" { if mtq.subObjectID != "" { bestIterator, err = txn.Get( - tableTuple, + tableRelationship, indexRelationAndUserset, mtq.subNamespaceName, mtq.subObjectID, @@ -63,7 +63,7 @@ func (mtq memdbReverseTupleQuery) Execute(ctx context.Context) (datastore.TupleI ) } else { bestIterator, err = txn.Get( - tableTuple, + tableRelationship, indexRelationAndRelation, mtq.subNamespaceName, mtq.subRelationName, @@ -73,7 +73,7 @@ func (mtq memdbReverseTupleQuery) Execute(ctx context.Context) (datastore.TupleI } } else if mtq.subObjectID != "" { bestIterator, err = txn.Get( - tableTuple, + tableRelationship, indexUserset, mtq.subNamespaceName, mtq.subObjectID, @@ -81,14 +81,14 @@ func (mtq memdbReverseTupleQuery) Execute(ctx context.Context) (datastore.TupleI ) } else if mtq.subRelationName != "" { bestIterator, err = txn.Get( - tableTuple, + tableRelationship, indexUsersetRelation, mtq.subNamespaceName, mtq.subRelationName, ) } else { bestIterator, err = txn.Get( - tableTuple, + tableRelationship, indexUsersetNamespace, mtq.subNamespaceName, ) @@ -100,7 +100,7 @@ func (mtq memdbReverseTupleQuery) Execute(ctx context.Context) (datastore.TupleI } filteredIterator := memdb.NewFilterIterator(bestIterator, func(tupleRaw interface{}) bool { - tuple := tupleRaw.(*tupleEntry) + tuple := tupleRaw.(*relationship) if uint64(mtq.revision.IntPart()) < tuple.createdTxn || uint64(mtq.revision.IntPart()) >= tuple.deletedTxn { return true } diff --git a/internal/datastore/memdb/tuple.go b/internal/datastore/memdb/tuple.go index edb06e6a0c..5d9877e7c6 100644 --- a/internal/datastore/memdb/tuple.go +++ b/internal/datastore/memdb/tuple.go @@ -12,7 +12,6 @@ import ( "github.com/jzelinskie/stringz" "github.com/authzed/spicedb/internal/datastore" - "github.com/authzed/spicedb/pkg/tuple" ) const ( @@ -23,8 +22,6 @@ const ( errCheckRevision = "unable to check revision: %w" ) -const deletedTransactionID = ^uint64(0) - func (mds *memdbDatastore) checkPrecondition(txn *memdb.Txn, preconditions []*v1.Precondition) error { for _, precond := range preconditions { switch precond.Operation { @@ -75,26 +72,11 @@ func (mds *memdbDatastore) WriteTuples(ctx context.Context, preconditions []*v1. func (mds *memdbDatastore) write(ctx context.Context, txn *memdb.Txn, mutations []*v1.RelationshipUpdate) (uint64, error) { // Create the changelog entry time.Sleep(mds.simulatedLatency) - newChangelogID, err := nextTupleChangelogID(txn) + newTxnID, err := createNewTransaction(txn) if err != nil { return 0, err } - changes := make([]*v0.RelationTupleUpdate, 0, len(mutations)) - for _, mut := range mutations { - changes = append(changes, tuple.UpdateFromRelationshipUpdate(mut)) - } - - newChangelogEntry := &tupleChangelog{ - id: newChangelogID, - timestamp: uint64(time.Now().UnixNano()), - changes: changes, - } - - if err := txn.Insert(tableChangelog, newChangelogEntry); err != nil { - return 0, err - } - // Apply the mutations for _, mutation := range mutations { existing, err := findRelationship(txn, mutation.Relationship) @@ -102,35 +84,35 @@ func (mds *memdbDatastore) write(ctx context.Context, txn *memdb.Txn, mutations return 0, err } - var deletedExisting tupleEntry + var deletedExisting relationship if existing != nil { deletedExisting = *existing - deletedExisting.deletedTxn = newChangelogID + deletedExisting.deletedTxn = newTxnID } - newVersion := tupleEntryFromRelationship(mutation.Relationship, newChangelogID, deletedTransactionID) + newVersion := tupleEntryFromRelationship(mutation.Relationship, newTxnID, deletedTransactionID) switch mutation.Operation { case v1.RelationshipUpdate_OPERATION_CREATE: if existing != nil { return 0, fmt.Errorf("duplicate relationship found for create operation") } - if err := txn.Insert(tableTuple, newVersion); err != nil { + if err := txn.Insert(tableRelationship, newVersion); err != nil { return 0, err } case v1.RelationshipUpdate_OPERATION_DELETE: if existing != nil { - if err := txn.Insert(tableTuple, &deletedExisting); err != nil { + if err := txn.Insert(tableRelationship, &deletedExisting); err != nil { return 0, err } } case v1.RelationshipUpdate_OPERATION_TOUCH: if existing != nil { - if err := txn.Insert(tableTuple, &deletedExisting); err != nil { + if err := txn.Insert(tableRelationship, &deletedExisting); err != nil { return 0, err } } - if err := txn.Insert(tableTuple, newVersion); err != nil { + if err := txn.Insert(tableRelationship, newVersion); err != nil { return 0, err } default: @@ -138,7 +120,7 @@ func (mds *memdbDatastore) write(ctx context.Context, txn *memdb.Txn, mutations } } - return newChangelogID, nil + return newTxnID, nil } func (mds *memdbDatastore) DeleteRelationships(ctx context.Context, preconditions []*v1.Precondition, filter *v1.RelationshipFilter) (datastore.Revision, error) { @@ -154,10 +136,21 @@ func (mds *memdbDatastore) DeleteRelationships(ctx context.Context, precondition return datastore.NoRevision, fmt.Errorf(errUnableToDeleteTuples, err) } + newChangelogID, err := mds.delete(ctx, txn, filter) + if err != nil { + return datastore.NoRevision, fmt.Errorf(errUnableToDeleteTuples, err) + } + + txn.Commit() + + return revisionFromVersion(newChangelogID), nil +} + +func (mds *memdbDatastore) delete(ctx context.Context, txn *memdb.Txn, filter *v1.RelationshipFilter) (uint64, error) { // Create an iterator to find the relevant tuples bestIter, err := iteratorForFilter(txn, filter) if err != nil { - return datastore.NoRevision, fmt.Errorf(errUnableToDeleteTuples, err) + return 0, err } filteredIter := memdb.NewFilterIterator(bestIter, relationshipFilterFilterFunc(filter)) @@ -166,18 +159,16 @@ func (mds *memdbDatastore) DeleteRelationships(ctx context.Context, precondition for row := filteredIter.Next(); row != nil; row = filteredIter.Next() { mutations = append(mutations, &v1.RelationshipUpdate{ Operation: v1.RelationshipUpdate_OPERATION_DELETE, - Relationship: row.(*tupleEntry).Relationship(), + Relationship: row.(*relationship).Relationship(), }) } - newChangelogID, err := mds.write(ctx, txn, mutations) + newTxnID, err := mds.write(ctx, txn, mutations) if err != nil { - return datastore.NoRevision, fmt.Errorf(errUnableToDeleteTuples, err) + return 0, err } - txn.Commit() - - return revisionFromVersion(newChangelogID), nil + return newTxnID, nil } func (mds *memdbDatastore) QueryTuples(filter datastore.TupleQueryResourceFilter, revision datastore.Revision) datastore.TupleQuery { @@ -236,12 +227,12 @@ func (mds *memdbDatastore) SyncRevision(ctx context.Context) (datastore.Revision txn := db.Txn(false) defer txn.Abort() - lastRaw, err := txn.Last(tableChangelog, indexID) + lastRaw, err := txn.Last(tableTransaction, indexID) if err != nil { return datastore.NoRevision, fmt.Errorf(errRevision, err) } if lastRaw != nil { - return revisionFromVersion(lastRaw.(*tupleChangelog).id), nil + return revisionFromVersion(lastRaw.(*transaction).id), nil } return datastore.NoRevision, nil } @@ -258,14 +249,14 @@ func (mds *memdbDatastore) Revision(ctx context.Context) (datastore.Revision, er lowerBound := uint64(time.Now().Add(-1 * mds.revisionFuzzingTimedelta).UnixNano()) time.Sleep(mds.simulatedLatency) - iter, err := txn.LowerBound(tableChangelog, indexTimestamp, lowerBound) + iter, err := txn.LowerBound(tableTransaction, indexTimestamp, lowerBound) if err != nil { return datastore.NoRevision, fmt.Errorf(errRevision, err) } var candidates []datastore.Revision for oneChange := iter.Next(); oneChange != nil; oneChange = iter.Next() { - candidates = append(candidates, revisionFromVersion(oneChange.(*tupleChangelog).id)) + candidates = append(candidates, revisionFromVersion(oneChange.(*transaction).id)) } if len(candidates) > 0 { @@ -285,7 +276,7 @@ func (mds *memdbDatastore) CheckRevision(ctx context.Context, revision datastore // We need to know the highest possible revision time.Sleep(mds.simulatedLatency) - lastRaw, err := txn.Last(tableChangelog, indexID) + lastRaw, err := txn.Last(tableTransaction, indexID) if err != nil { return fmt.Errorf(errCheckRevision, err) } @@ -293,7 +284,7 @@ func (mds *memdbDatastore) CheckRevision(ctx context.Context, revision datastore return datastore.NewInvalidRevisionErr(revision, datastore.CouldNotDetermineRevision) } - highest := revisionFromVersion(lastRaw.(*tupleChangelog).id) + highest := revisionFromVersion(lastRaw.(*transaction).id) if revision.GreaterThan(highest) { return datastore.NewInvalidRevisionErr(revision, datastore.RevisionInFuture) @@ -301,7 +292,7 @@ func (mds *memdbDatastore) CheckRevision(ctx context.Context, revision datastore lowerBound := uint64(time.Now().Add(mds.gcWindowInverted).UnixNano()) time.Sleep(mds.simulatedLatency) - iter, err := txn.LowerBound(tableChangelog, indexTimestamp, lowerBound) + iter, err := txn.LowerBound(tableTransaction, indexTimestamp, lowerBound) if err != nil { return fmt.Errorf(errCheckRevision, err) } @@ -311,7 +302,7 @@ func (mds *memdbDatastore) CheckRevision(ctx context.Context, revision datastore return datastore.NewInvalidRevisionErr(revision, datastore.RevisionStale) } - if firstValid != nil && revision.LessThan(revisionFromVersion(firstValid.(*tupleChangelog).id)) { + if firstValid != nil && revision.LessThan(revisionFromVersion(firstValid.(*transaction).id)) { return datastore.NewInvalidRevisionErr(revision, datastore.RevisionStale) } @@ -320,7 +311,7 @@ func (mds *memdbDatastore) CheckRevision(ctx context.Context, revision datastore func relationshipFilterFilterFunc(filter *v1.RelationshipFilter) func(interface{}) bool { return func(tupleRaw interface{}) bool { - tuple := tupleRaw.(*tupleEntry) + tuple := tupleRaw.(*relationship) // If it's already dead, filter it. if tuple.deletedTxn != deletedTransactionID { @@ -354,9 +345,9 @@ func relationshipFilterFilterFunc(filter *v1.RelationshipFilter) func(interface{ } } -func findRelationship(txn *memdb.Txn, toFind *v1.Relationship) (*tupleEntry, error) { +func findRelationship(txn *memdb.Txn, toFind *v1.Relationship) (*relationship, error) { foundRaw, err := txn.First( - tableTuple, + tableRelationship, indexLive, toFind.Resource.ObjectType, toFind.Resource.ObjectId, @@ -374,18 +365,5 @@ func findRelationship(txn *memdb.Txn, toFind *v1.Relationship) (*tupleEntry, err return nil, nil } - return foundRaw.(*tupleEntry), nil -} - -func nextTupleChangelogID(txn *memdb.Txn) (uint64, error) { - lastChangeRaw, err := txn.Last(tableChangelog, indexID) - if err != nil { - return 0, err - } - - if lastChangeRaw == nil { - return 1, nil - } - - return lastChangeRaw.(*tupleChangelog).id + 1, nil + return foundRaw.(*relationship), nil } diff --git a/internal/datastore/memdb/watch.go b/internal/datastore/memdb/watch.go index 5453ac5804..9e62e7c586 100644 --- a/internal/datastore/memdb/watch.go +++ b/internal/datastore/memdb/watch.go @@ -4,9 +4,11 @@ import ( "context" "fmt" + v0 "github.com/authzed/authzed-go/proto/authzed/api/v0" "github.com/hashicorp/go-memdb" "github.com/authzed/spicedb/internal/datastore" + "github.com/authzed/spicedb/internal/datastore/common" ) const errWatchError = "watch error: %w" @@ -25,7 +27,7 @@ func (mds *memdbDatastore) Watch(ctx context.Context, afterRevision datastore.Re var stagedUpdates []*datastore.RevisionChanges var watchChan <-chan struct{} var err error - stagedUpdates, currentTxn, watchChan, err = mds.loadChanges(currentTxn) + stagedUpdates, currentTxn, watchChan, err = mds.loadChanges(ctx, currentTxn) if err != nil { errors <- err return @@ -61,29 +63,41 @@ func (mds *memdbDatastore) Watch(ctx context.Context, afterRevision datastore.Re return updates, errors } -func (mds *memdbDatastore) loadChanges(currentTxn uint64) ([]*datastore.RevisionChanges, uint64, <-chan struct{}, error) { +func (mds *memdbDatastore) loadChanges(ctx context.Context, currentTxn uint64) ([]*datastore.RevisionChanges, uint64, <-chan struct{}, error) { loadNewTxn := mds.db.Txn(false) defer loadNewTxn.Abort() - it, err := loadNewTxn.LowerBound(tableChangelog, indexID, currentTxn+1) + it, err := loadNewTxn.LowerBound(tableTransaction, indexID, currentTxn+1) if err != nil { return nil, 0, nil, fmt.Errorf(errWatchError, err) } - var stagedUpdates []*datastore.RevisionChanges + stagedChanges := make(common.Changes) for newChangeRaw := it.Next(); newChangeRaw != nil; newChangeRaw = it.Next() { - newChange := newChangeRaw.(*tupleChangelog) - stagedUpdates = append(stagedUpdates, &datastore.RevisionChanges{ - Revision: revisionFromVersion(newChange.id), - Changes: newChange.changes, - }) - currentTxn = newChange.id + currentTxn = newChangeRaw.(*transaction).id + createdIt, err := loadNewTxn.Get(tableRelationship, indexCreatedTxn, currentTxn) + if err != nil { + return nil, 0, nil, fmt.Errorf(errWatchError, err) + } + for rawCreated := createdIt.Next(); rawCreated != nil; rawCreated = createdIt.Next() { + created := rawCreated.(*relationship) + stagedChanges.AddChange(ctx, currentTxn, created.RelationTuple(), v0.RelationTupleUpdate_TOUCH) + } + + deletedIt, err := loadNewTxn.Get(tableRelationship, indexDeletedTxn, currentTxn) + if err != nil { + return nil, 0, nil, fmt.Errorf(errWatchError, err) + } + for rawDeleted := deletedIt.Next(); rawDeleted != nil; rawDeleted = deletedIt.Next() { + deleted := rawDeleted.(*relationship) + stagedChanges.AddChange(ctx, currentTxn, deleted.RelationTuple(), v0.RelationTupleUpdate_DELETE) + } } - watchChan, _, err := loadNewTxn.LastWatch(tableChangelog, indexID) + watchChan, _, err := loadNewTxn.LastWatch(tableTransaction, indexID) if err != nil { return nil, 0, nil, fmt.Errorf(errWatchError, err) } - return stagedUpdates, currentTxn, watchChan, nil + return stagedChanges.RevisionChanges(), currentTxn, watchChan, nil } From 9673916c24e3e4774c228faa32fcff518c6e9d26 Mon Sep 17 00:00:00 2001 From: Jake Moshenko Date: Mon, 6 Dec 2021 14:45:29 -0500 Subject: [PATCH 5/5] use canonical terms for memdb datastore --- internal/datastore/memdb/memdb.go | 136 +++++++++++----------- internal/datastore/memdb/query.go | 36 +++--- internal/datastore/memdb/reverse_query.go | 8 +- internal/datastore/memdb/tuple.go | 8 +- internal/datastore/memdb/watch.go | 2 +- 5 files changed, 95 insertions(+), 95 deletions(-) diff --git a/internal/datastore/memdb/memdb.go b/internal/datastore/memdb/memdb.go index 6a032c4e81..d94c8c8bd2 100644 --- a/internal/datastore/memdb/memdb.go +++ b/internal/datastore/memdb/memdb.go @@ -25,20 +25,20 @@ const ( tableTransaction = "transaction" tableNamespace = "namespaceConfig" - indexID = "id" - indexTimestamp = "timestamp" - indexLive = "live" - indexNamespace = "namespace" - indexNamespaceAndObjectID = "namespaceAndObjectID" - indexNamespaceAndRelation = "namespaceAndRelation" - indexNamespaceAndUsersetID = "namespaceAndUsersetID" - indexRelationAndUserset = "relationAndUserset" - indexRelationAndRelation = "relationAndRelation" - indexUsersetNamespace = "usersetNamespace" - indexUsersetRelation = "usersetRelation" - indexUserset = "userset" - indexCreatedTxn = "createdTxn" - indexDeletedTxn = "deletedTxn" + indexID = "id" + indexTimestamp = "timestamp" + indexLive = "live" + indexNamespace = "namespace" + indexNamespaceAndResourceID = "namespaceAndResourceID" + indexNamespaceAndRelation = "namespaceAndRelation" + indexNamespaceAndSubjectID = "namespaceAndSubjectID" + indexRelationAndSubject = "relationAndSubject" + indexRelationAndRelation = "relationAndRelation" + indexSubjectNamespace = "subjectNamespace" + indexSubjectRelation = "subjectRelation" + indexSubject = "subject" + indexCreatedTxn = "createdTxn" + indexDeletedTxn = "deletedTxn" defaultWatchBufferLength = 128 @@ -61,11 +61,11 @@ type transaction struct { type relationship struct { namespace string - objectID string + resourceID string relation string - usersetNamespace string - usersetObjectID string - usersetRelation string + subjectNamespace string + subjectObjectID string + subjectRelation string createdTxn uint64 deletedTxn uint64 } @@ -73,11 +73,11 @@ type relationship struct { func tupleEntryFromRelationship(r *v1.Relationship, created, deleted uint64) *relationship { return &relationship{ namespace: r.Resource.ObjectType, - objectID: r.Resource.ObjectId, + resourceID: r.Resource.ObjectId, relation: r.Relation, - usersetNamespace: r.Subject.Object.ObjectType, - usersetObjectID: r.Subject.Object.ObjectId, - usersetRelation: stringz.DefaultEmpty(r.Subject.OptionalRelation, "..."), + subjectNamespace: r.Subject.Object.ObjectType, + subjectObjectID: r.Subject.Object.ObjectId, + subjectRelation: stringz.DefaultEmpty(r.Subject.OptionalRelation, "..."), createdTxn: created, deletedTxn: deleted, } @@ -87,15 +87,15 @@ func (t relationship) Relationship() *v1.Relationship { return &v1.Relationship{ Resource: &v1.ObjectReference{ ObjectType: t.namespace, - ObjectId: t.objectID, + ObjectId: t.resourceID, }, Relation: t.relation, Subject: &v1.SubjectReference{ Object: &v1.ObjectReference{ - ObjectType: t.usersetNamespace, - ObjectId: t.usersetObjectID, + ObjectType: t.subjectNamespace, + ObjectId: t.subjectObjectID, }, - OptionalRelation: stringz.Default(t.usersetRelation, "", datastore.Ellipsis), + OptionalRelation: stringz.Default(t.subjectRelation, "", datastore.Ellipsis), }, } } @@ -104,13 +104,13 @@ func (t relationship) RelationTuple() *v0.RelationTuple { return &v0.RelationTuple{ ObjectAndRelation: &v0.ObjectAndRelation{ Namespace: t.namespace, - ObjectId: t.objectID, + ObjectId: t.resourceID, Relation: t.relation, }, User: &v0.User{UserOneof: &v0.User_Userset{Userset: &v0.ObjectAndRelation{ - Namespace: t.usersetNamespace, - ObjectId: t.usersetObjectID, - Relation: t.usersetRelation, + Namespace: t.subjectNamespace, + ObjectId: t.subjectObjectID, + Relation: t.subjectRelation, }}}, } } @@ -119,11 +119,11 @@ func (t relationship) String() string { return fmt.Sprintf( "%s:%s#%s@%s:%s#%s[%d-%d)", t.namespace, - t.objectID, + t.resourceID, t.relation, - t.usersetNamespace, - t.usersetObjectID, - t.usersetRelation, + t.subjectNamespace, + t.subjectObjectID, + t.subjectRelation, t.createdTxn, t.deletedTxn, ) @@ -185,11 +185,11 @@ var schema = &memdb.DBSchema{ Indexer: &memdb.CompoundIndex{ Indexes: []memdb.Indexer{ &memdb.StringFieldIndex{Field: "namespace"}, - &memdb.StringFieldIndex{Field: "objectID"}, + &memdb.StringFieldIndex{Field: "resourceID"}, &memdb.StringFieldIndex{Field: "relation"}, - &memdb.StringFieldIndex{Field: "usersetNamespace"}, - &memdb.StringFieldIndex{Field: "usersetObjectID"}, - &memdb.StringFieldIndex{Field: "usersetRelation"}, + &memdb.StringFieldIndex{Field: "subjectNamespace"}, + &memdb.StringFieldIndex{Field: "subjectObjectID"}, + &memdb.StringFieldIndex{Field: "subjectRelation"}, &memdb.UintFieldIndex{Field: "createdTxn"}, }, }, @@ -200,11 +200,11 @@ var schema = &memdb.DBSchema{ Indexer: &memdb.CompoundIndex{ Indexes: []memdb.Indexer{ &memdb.StringFieldIndex{Field: "namespace"}, - &memdb.StringFieldIndex{Field: "objectID"}, + &memdb.StringFieldIndex{Field: "resourceID"}, &memdb.StringFieldIndex{Field: "relation"}, - &memdb.StringFieldIndex{Field: "usersetNamespace"}, - &memdb.StringFieldIndex{Field: "usersetObjectID"}, - &memdb.StringFieldIndex{Field: "usersetRelation"}, + &memdb.StringFieldIndex{Field: "subjectNamespace"}, + &memdb.StringFieldIndex{Field: "subjectObjectID"}, + &memdb.StringFieldIndex{Field: "subjectRelation"}, &memdb.UintFieldIndex{Field: "deletedTxn"}, }, }, @@ -214,13 +214,13 @@ var schema = &memdb.DBSchema{ Unique: false, Indexer: &memdb.StringFieldIndex{Field: "namespace"}, }, - indexNamespaceAndObjectID: { - Name: indexNamespaceAndObjectID, + indexNamespaceAndResourceID: { + Name: indexNamespaceAndResourceID, Unique: false, Indexer: &memdb.CompoundIndex{ Indexes: []memdb.Indexer{ &memdb.StringFieldIndex{Field: "namespace"}, - &memdb.StringFieldIndex{Field: "objectID"}, + &memdb.StringFieldIndex{Field: "resourceID"}, }, }, }, @@ -234,57 +234,57 @@ var schema = &memdb.DBSchema{ }, }, }, - indexNamespaceAndUsersetID: { - Name: indexNamespaceAndUsersetID, + indexNamespaceAndSubjectID: { + Name: indexNamespaceAndSubjectID, Unique: false, Indexer: &memdb.CompoundIndex{ Indexes: []memdb.Indexer{ &memdb.StringFieldIndex{Field: "namespace"}, - &memdb.StringFieldIndex{Field: "usersetNamespace"}, - &memdb.StringFieldIndex{Field: "usersetObjectID"}, + &memdb.StringFieldIndex{Field: "subjectNamespace"}, + &memdb.StringFieldIndex{Field: "subjectObjectID"}, }, }, }, - indexRelationAndUserset: { - Name: indexRelationAndUserset, + indexRelationAndSubject: { + Name: indexRelationAndSubject, Unique: false, Indexer: &memdb.CompoundIndex{ Indexes: []memdb.Indexer{ - &memdb.StringFieldIndex{Field: "usersetNamespace"}, - &memdb.StringFieldIndex{Field: "usersetObjectID"}, - &memdb.StringFieldIndex{Field: "usersetRelation"}, + &memdb.StringFieldIndex{Field: "subjectNamespace"}, + &memdb.StringFieldIndex{Field: "subjectObjectID"}, + &memdb.StringFieldIndex{Field: "subjectRelation"}, &memdb.StringFieldIndex{Field: "namespace"}, &memdb.StringFieldIndex{Field: "relation"}, }, }, }, - indexUsersetRelation: { - Name: indexUsersetRelation, + indexSubjectRelation: { + Name: indexSubjectRelation, Unique: false, Indexer: &memdb.CompoundIndex{ Indexes: []memdb.Indexer{ - &memdb.StringFieldIndex{Field: "usersetNamespace"}, - &memdb.StringFieldIndex{Field: "usersetRelation"}, + &memdb.StringFieldIndex{Field: "subjectNamespace"}, + &memdb.StringFieldIndex{Field: "subjectRelation"}, }, }, }, - indexUserset: { - Name: indexUserset, + indexSubject: { + Name: indexSubject, Unique: false, Indexer: &memdb.CompoundIndex{ Indexes: []memdb.Indexer{ - &memdb.StringFieldIndex{Field: "usersetNamespace"}, - &memdb.StringFieldIndex{Field: "usersetObjectID"}, - &memdb.StringFieldIndex{Field: "usersetRelation"}, + &memdb.StringFieldIndex{Field: "subjectNamespace"}, + &memdb.StringFieldIndex{Field: "subjectObjectID"}, + &memdb.StringFieldIndex{Field: "subjectRelation"}, }, }, }, - indexUsersetNamespace: { - Name: indexUsersetNamespace, + indexSubjectNamespace: { + Name: indexSubjectNamespace, Unique: false, Indexer: &memdb.CompoundIndex{ Indexes: []memdb.Indexer{ - &memdb.StringFieldIndex{Field: "usersetNamespace"}, + &memdb.StringFieldIndex{Field: "subjectNamespace"}, }, }, }, @@ -293,8 +293,8 @@ var schema = &memdb.DBSchema{ Unique: false, Indexer: &memdb.CompoundIndex{ Indexes: []memdb.Indexer{ - &memdb.StringFieldIndex{Field: "usersetNamespace"}, - &memdb.StringFieldIndex{Field: "usersetRelation"}, + &memdb.StringFieldIndex{Field: "subjectNamespace"}, + &memdb.StringFieldIndex{Field: "subjectRelation"}, &memdb.StringFieldIndex{Field: "namespace"}, &memdb.StringFieldIndex{Field: "relation"}, }, diff --git a/internal/datastore/memdb/query.go b/internal/datastore/memdb/query.go index 76b46c59d7..ca3768f906 100644 --- a/internal/datastore/memdb/query.go +++ b/internal/datastore/memdb/query.go @@ -20,7 +20,7 @@ type memdbTupleQuery struct { resourceFilter *v1.RelationshipFilter optionalSubjectFilter *v1.SubjectFilter - usersetsFilter []*v0.ObjectAndRelation + subjectsFilter []*v0.ObjectAndRelation limit *uint64 simulatedLatency time.Duration @@ -40,7 +40,7 @@ func (mtq memdbTupleQuery) WithSubjectFilter(filter *v1.SubjectFilter) datastore panic("cannot call WithSubjectFilter after WithUsersets") } - if mtq.usersetsFilter != nil { + if mtq.subjectsFilter != nil { panic("called WithSubjectFilter twice") } @@ -48,20 +48,20 @@ func (mtq memdbTupleQuery) WithSubjectFilter(filter *v1.SubjectFilter) datastore return mtq } -func (mtq memdbTupleQuery) WithUsersets(usersets []*v0.ObjectAndRelation) datastore.TupleQuery { +func (mtq memdbTupleQuery) WithUsersets(subjects []*v0.ObjectAndRelation) datastore.TupleQuery { if mtq.optionalSubjectFilter != nil { panic("cannot call WithUsersets after WithSubjectFilter") } - if mtq.usersetsFilter != nil { + if mtq.subjectsFilter != nil { panic("called WithUsersets twice") } - if len(usersets) == 0 { - panic("Given nil or empty usersets") + if len(subjects) == 0 { + panic("Given nil or empty subjects") } - mtq.usersetsFilter = usersets + mtq.subjectsFilter = subjects return mtq } @@ -70,14 +70,14 @@ func iteratorForFilter(txn *memdb.Txn, filter *v1.RelationshipFilter) (memdb.Res case filter.OptionalResourceId != "": return txn.Get( tableRelationship, - indexNamespaceAndObjectID, + indexNamespaceAndResourceID, filter.ResourceType, filter.OptionalResourceId, ) case filter.OptionalSubjectFilter != nil && filter.OptionalSubjectFilter.OptionalSubjectId != "": return txn.Get( tableRelationship, - indexNamespaceAndUsersetID, + indexNamespaceAndSubjectID, filter.ResourceType, filter.OptionalSubjectFilter.SubjectType, filter.OptionalSubjectFilter.OptionalSubjectId, @@ -122,7 +122,7 @@ func (mtq memdbTupleQuery) Execute(ctx context.Context) (datastore.TupleIterator filter := relationshipFilter switch { - case filter.OptionalResourceId != "" && filter.OptionalResourceId != tuple.objectID: + case filter.OptionalResourceId != "" && filter.OptionalResourceId != tuple.resourceID: return true case filter.OptionalRelation != "" && filter.OptionalRelation != tuple.relation: return true @@ -130,21 +130,21 @@ func (mtq memdbTupleQuery) Execute(ctx context.Context) (datastore.TupleIterator if subjectFilter := filter.OptionalSubjectFilter; subjectFilter != nil { switch { - case subjectFilter.SubjectType != tuple.usersetNamespace: + case subjectFilter.SubjectType != tuple.subjectNamespace: return true - case subjectFilter.OptionalSubjectId != "" && subjectFilter.OptionalSubjectId != tuple.usersetObjectID: + case subjectFilter.OptionalSubjectId != "" && subjectFilter.OptionalSubjectId != tuple.subjectObjectID: return true - case subjectFilter.OptionalRelation != nil && stringz.DefaultEmpty(subjectFilter.OptionalRelation.Relation, datastore.Ellipsis) != tuple.usersetRelation: + case subjectFilter.OptionalRelation != nil && stringz.DefaultEmpty(subjectFilter.OptionalRelation.Relation, datastore.Ellipsis) != tuple.subjectRelation: return true } } - if len(mtq.usersetsFilter) > 0 { + if len(mtq.subjectsFilter) > 0 { found := false - for _, filter := range mtq.usersetsFilter { - if filter.Namespace == tuple.usersetNamespace && - filter.ObjectId == tuple.usersetObjectID && - filter.Relation == tuple.usersetRelation { + for _, filter := range mtq.subjectsFilter { + if filter.Namespace == tuple.subjectNamespace && + filter.ObjectId == tuple.subjectObjectID && + filter.Relation == tuple.subjectRelation { found = true break } diff --git a/internal/datastore/memdb/reverse_query.go b/internal/datastore/memdb/reverse_query.go index 5de45f6000..0f40b28059 100644 --- a/internal/datastore/memdb/reverse_query.go +++ b/internal/datastore/memdb/reverse_query.go @@ -54,7 +54,7 @@ func (mtq memdbReverseTupleQuery) Execute(ctx context.Context) (datastore.TupleI if mtq.subObjectID != "" { bestIterator, err = txn.Get( tableRelationship, - indexRelationAndUserset, + indexRelationAndSubject, mtq.subNamespaceName, mtq.subObjectID, mtq.subRelationName, @@ -74,7 +74,7 @@ func (mtq memdbReverseTupleQuery) Execute(ctx context.Context) (datastore.TupleI } else if mtq.subObjectID != "" { bestIterator, err = txn.Get( tableRelationship, - indexUserset, + indexSubject, mtq.subNamespaceName, mtq.subObjectID, mtq.subRelationName, @@ -82,14 +82,14 @@ func (mtq memdbReverseTupleQuery) Execute(ctx context.Context) (datastore.TupleI } else if mtq.subRelationName != "" { bestIterator, err = txn.Get( tableRelationship, - indexUsersetRelation, + indexSubjectRelation, mtq.subNamespaceName, mtq.subRelationName, ) } else { bestIterator, err = txn.Get( tableRelationship, - indexUsersetNamespace, + indexSubjectNamespace, mtq.subNamespaceName, ) } diff --git a/internal/datastore/memdb/tuple.go b/internal/datastore/memdb/tuple.go index 5d9877e7c6..88c0cb2aff 100644 --- a/internal/datastore/memdb/tuple.go +++ b/internal/datastore/memdb/tuple.go @@ -322,7 +322,7 @@ func relationshipFilterFilterFunc(filter *v1.RelationshipFilter) func(interface{ switch { case filter.ResourceType != tuple.namespace: return true - case filter.OptionalResourceId != "" && filter.OptionalResourceId != tuple.objectID: + case filter.OptionalResourceId != "" && filter.OptionalResourceId != tuple.resourceID: return true case filter.OptionalRelation != "" && filter.OptionalRelation != tuple.relation: return true @@ -331,12 +331,12 @@ func relationshipFilterFilterFunc(filter *v1.RelationshipFilter) func(interface{ // If it doesn't match one of the subject filters, filter it. if subjectFilter := filter.OptionalSubjectFilter; subjectFilter != nil { switch { - case subjectFilter.SubjectType != tuple.usersetNamespace: + case subjectFilter.SubjectType != tuple.subjectNamespace: return true - case subjectFilter.OptionalSubjectId != "" && subjectFilter.OptionalSubjectId != tuple.usersetObjectID: + case subjectFilter.OptionalSubjectId != "" && subjectFilter.OptionalSubjectId != tuple.subjectObjectID: return true case subjectFilter.OptionalRelation != nil && - stringz.DefaultEmpty(subjectFilter.OptionalRelation.Relation, datastore.Ellipsis) != tuple.usersetRelation: + stringz.DefaultEmpty(subjectFilter.OptionalRelation.Relation, datastore.Ellipsis) != tuple.subjectRelation: return true } } diff --git a/internal/datastore/memdb/watch.go b/internal/datastore/memdb/watch.go index 9e62e7c586..e34e983254 100644 --- a/internal/datastore/memdb/watch.go +++ b/internal/datastore/memdb/watch.go @@ -99,5 +99,5 @@ func (mds *memdbDatastore) loadChanges(ctx context.Context, currentTxn uint64) ( return nil, 0, nil, fmt.Errorf(errWatchError, err) } - return stagedChanges.RevisionChanges(), currentTxn, watchChan, nil + return stagedChanges.AsRevisionChanges(), currentTxn, watchChan, nil }