diff --git a/internal/dashboard/dashboard.go b/internal/dashboard/dashboard.go index b003e794bd..58c7eed725 100644 --- a/internal/dashboard/dashboard.go +++ b/internal/dashboard/dashboard.go @@ -131,7 +131,14 @@ func NewHandler(grpcAddr string, grpcTLSEnabled bool, datastoreEngine string, ds userFound := false resourceFound := false - nsDefs, err := ds.ListNamespaces(r.Context()) + syncRevision, err := ds.SyncRevision(r.Context()) + if err != nil { + log.Ctx(r.Context()).Error().Err(err).Msg("Got error when computing datastore revision") + fmt.Fprintf(w, "Internal Error") + return + } + + nsDefs, err := ds.ListNamespaces(r.Context(), syncRevision) if err != nil { log.Ctx(r.Context()).Error().AnErr("datastore-error", err).Msg("Got error when trying to load namespaces") fmt.Fprintf(w, "Internal Error") diff --git a/internal/datastore/crdb/namespace.go b/internal/datastore/crdb/namespace.go index a7d6f6aab6..d9add775bc 100644 --- a/internal/datastore/crdb/namespace.go +++ b/internal/datastore/crdb/namespace.go @@ -64,7 +64,11 @@ func (cds *crdbDatastore) WriteNamespace(ctx context.Context, newConfig *v0.Name return hlcNow, nil } -func (cds *crdbDatastore) ReadNamespace(ctx context.Context, nsName string) (*v0.NamespaceDefinition, datastore.Revision, error) { +func (cds *crdbDatastore) ReadNamespace( + ctx context.Context, + nsName string, + revision datastore.Revision, +) (*v0.NamespaceDefinition, datastore.Revision, error) { ctx = datastore.SeparateContextWithTracing(ctx) tx, err := cds.conn.BeginTx(ctx, pgx.TxOptions{AccessMode: pgx.ReadOnly}) @@ -73,6 +77,10 @@ func (cds *crdbDatastore) ReadNamespace(ctx context.Context, nsName string) (*v0 } defer tx.Rollback(ctx) + if err := cds.prepareTransaction(ctx, tx, revision); err != nil { + return nil, datastore.NoRevision, fmt.Errorf(errUnableToReadConfig, err) + } + config, timestamp, err := loadNamespace(ctx, tx, nsName) if err != nil { if errors.As(err, &datastore.ErrNamespaceNotFound{}) { @@ -87,10 +95,10 @@ func (cds *crdbDatastore) ReadNamespace(ctx context.Context, nsName string) (*v0 func (cds *crdbDatastore) DeleteNamespace(ctx context.Context, nsName string) (datastore.Revision, error) { ctx = datastore.SeparateContextWithTracing(ctx) - var timestamp time.Time + var hlcNow decimal.Decimal if err := cds.execute(ctx, cds.conn, pgx.TxOptions{}, func(tx pgx.Tx) error { var err error - _, timestamp, err = loadNamespace(ctx, tx, nsName) + _, timestamp, err := loadNamespace(ctx, tx, nsName) if err != nil { return err } @@ -111,14 +119,14 @@ func (cds *crdbDatastore) DeleteNamespace(ctx context.Context, nsName string) (d } deleteTupleSQL, deleteTupleArgs, err := queryDeleteTuples. + Suffix(queryReturningTimestamp). Where(sq.Eq{colNamespace: nsName}). ToSql() if err != nil { return err } - _, err = tx.Exec(ctx, deleteTupleSQL, deleteTupleArgs...) - return err + return tx.QueryRow(ctx, deleteTupleSQL, deleteTupleArgs...).Scan(&hlcNow) }); err != nil { if errors.As(err, &datastore.ErrNamespaceNotFound{}) { return datastore.NoRevision, err @@ -126,7 +134,7 @@ func (cds *crdbDatastore) DeleteNamespace(ctx context.Context, nsName string) (d return datastore.NoRevision, fmt.Errorf(errUnableToDeleteConfig, err) } - return revisionFromTimestamp(timestamp), nil + return hlcNow, nil } func loadNamespace(ctx context.Context, tx pgx.Tx, nsName string) (*v0.NamespaceDefinition, time.Time, error) { @@ -155,7 +163,7 @@ func loadNamespace(ctx context.Context, tx pgx.Tx, nsName string) (*v0.Namespace return loaded, timestamp, nil } -func (cds *crdbDatastore) ListNamespaces(ctx context.Context) ([]*v0.NamespaceDefinition, error) { +func (cds *crdbDatastore) ListNamespaces(ctx context.Context, revision datastore.Revision) ([]*v0.NamespaceDefinition, error) { ctx = datastore.SeparateContextWithTracing(ctx) tx, err := cds.conn.Begin(ctx) @@ -164,6 +172,10 @@ func (cds *crdbDatastore) ListNamespaces(ctx context.Context) ([]*v0.NamespaceDe } defer tx.Rollback(ctx) + if err := cds.prepareTransaction(ctx, tx, revision); err != nil { + return nil, fmt.Errorf(errUnableToListNamespaces, err) + } + query := queryReadNamespace sql, args, err := query.ToSql() diff --git a/internal/datastore/datastore.go b/internal/datastore/datastore.go index e35cd747b5..f0d1801fb1 100644 --- a/internal/datastore/datastore.go +++ b/internal/datastore/datastore.go @@ -47,15 +47,15 @@ type Datastore interface { // returning the version of the namespace that was created. WriteNamespace(ctx context.Context, newConfig *v0.NamespaceDefinition) (Revision, error) - // ReadNamespace reads a namespace definition and version and returns it if - // found. - ReadNamespace(ctx context.Context, nsName string) (*v0.NamespaceDefinition, Revision, error) + // ReadNamespace reads a namespace definition and version and returns it, and the revision at + // which it was created or last written, if found. + ReadNamespace(ctx context.Context, nsName string, revision Revision) (ns *v0.NamespaceDefinition, lastWritten Revision, err error) // DeleteNamespace deletes a namespace and any associated tuples. DeleteNamespace(ctx context.Context, nsName string) (Revision, error) // ListNamespaces lists all namespaces defined. - ListNamespaces(ctx context.Context) ([]*v0.NamespaceDefinition, error) + ListNamespaces(ctx context.Context, revision Revision) ([]*v0.NamespaceDefinition, error) // IsReady returns whether the datastore is ready to accept data. Datastores that require // database schema creation will return false until the migrations have been run to create diff --git a/internal/datastore/memdb/memdb.go b/internal/datastore/memdb/memdb.go index d94c8c8bd2..b109a56601 100644 --- a/internal/datastore/memdb/memdb.go +++ b/internal/datastore/memdb/memdb.go @@ -26,6 +26,7 @@ const ( tableNamespace = "namespaceConfig" indexID = "id" + indexUnique = "unique" indexTimestamp = "timestamp" indexLive = "live" indexNamespace = "namespace" @@ -47,6 +48,11 @@ const ( errUnableToInstantiateTuplestore = "unable to instantiate datastore: %w" ) +type hasLifetime interface { + getCreatedTxn() uint64 + getDeletedTxn() uint64 +} + type namespace struct { name string configBytes []byte @@ -54,6 +60,16 @@ type namespace struct { deletedTxn uint64 } +func (ns namespace) getCreatedTxn() uint64 { + return ns.createdTxn +} + +func (ns namespace) getDeletedTxn() uint64 { + return ns.deletedTxn +} + +var _ hasLifetime = &namespace{} + type transaction struct { id uint64 timestamp uint64 @@ -70,6 +86,16 @@ type relationship struct { deletedTxn uint64 } +func (r relationship) getCreatedTxn() uint64 { + return r.createdTxn +} + +func (r relationship) getDeletedTxn() uint64 { + return r.deletedTxn +} + +var _ hasLifetime = &relationship{} + func tupleEntryFromRelationship(r *v1.Relationship, created, deleted uint64) *relationship { return &relationship{ namespace: r.Resource.ObjectType, @@ -83,49 +109,49 @@ func tupleEntryFromRelationship(r *v1.Relationship, created, deleted uint64) *re } } -func (t relationship) Relationship() *v1.Relationship { +func (r relationship) Relationship() *v1.Relationship { return &v1.Relationship{ Resource: &v1.ObjectReference{ - ObjectType: t.namespace, - ObjectId: t.resourceID, + ObjectType: r.namespace, + ObjectId: r.resourceID, }, - Relation: t.relation, + Relation: r.relation, Subject: &v1.SubjectReference{ Object: &v1.ObjectReference{ - ObjectType: t.subjectNamespace, - ObjectId: t.subjectObjectID, + ObjectType: r.subjectNamespace, + ObjectId: r.subjectObjectID, }, - OptionalRelation: stringz.Default(t.subjectRelation, "", datastore.Ellipsis), + OptionalRelation: stringz.Default(r.subjectRelation, "", datastore.Ellipsis), }, } } -func (t relationship) RelationTuple() *v0.RelationTuple { +func (r relationship) RelationTuple() *v0.RelationTuple { return &v0.RelationTuple{ ObjectAndRelation: &v0.ObjectAndRelation{ - Namespace: t.namespace, - ObjectId: t.resourceID, - Relation: t.relation, + Namespace: r.namespace, + ObjectId: r.resourceID, + Relation: r.relation, }, User: &v0.User{UserOneof: &v0.User_Userset{Userset: &v0.ObjectAndRelation{ - Namespace: t.subjectNamespace, - ObjectId: t.subjectObjectID, - Relation: t.subjectRelation, + Namespace: r.subjectNamespace, + ObjectId: r.subjectObjectID, + Relation: r.subjectRelation, }}}, } } -func (t relationship) String() string { +func (r relationship) String() string { return fmt.Sprintf( "%s:%s#%s@%s:%s#%s[%d-%d)", - t.namespace, - t.resourceID, - t.relation, - t.subjectNamespace, - t.subjectObjectID, - t.subjectRelation, - t.createdTxn, - t.deletedTxn, + r.namespace, + r.resourceID, + r.relation, + r.subjectNamespace, + r.subjectObjectID, + r.subjectRelation, + r.createdTxn, + r.deletedTxn, ) } @@ -137,6 +163,15 @@ var schema = &memdb.DBSchema{ indexID: { Name: indexID, Unique: true, + Indexer: &memdb.CompoundIndex{ + Indexes: []memdb.Indexer{ + &memdb.StringFieldIndex{Field: "name"}, + }, + }, + }, + indexUnique: { + Name: indexUnique, + Unique: true, Indexer: &memdb.CompoundIndex{ Indexes: []memdb.Indexer{ &memdb.StringFieldIndex{Field: "name"}, @@ -406,3 +441,12 @@ func createNewTransaction(txn *memdb.Txn) (uint64, error) { return newTransactionID, nil } + +// filterToLiveObjects creates a memdb.FilterFunc which returns true for the items to remove, +// which is opposite of most filter implementations. +func filterToLiveObjects(revision datastore.Revision) memdb.FilterFunc { + return func(hasLifetimeRaw interface{}) bool { + obj := hasLifetimeRaw.(hasLifetime) + return uint64(revision.IntPart()) < obj.getCreatedTxn() || uint64(revision.IntPart()) >= obj.getDeletedTxn() + } +} diff --git a/internal/datastore/memdb/namespace.go b/internal/datastore/memdb/namespace.go index 369eb0aad6..a481d9bb8e 100644 --- a/internal/datastore/memdb/namespace.go +++ b/internal/datastore/memdb/namespace.go @@ -7,6 +7,7 @@ import ( v0 "github.com/authzed/authzed-go/proto/authzed/api/v0" v1 "github.com/authzed/authzed-go/proto/authzed/api/v1" + "github.com/hashicorp/go-memdb" "google.golang.org/protobuf/proto" "github.com/authzed/spicedb/internal/datastore" @@ -18,7 +19,10 @@ const ( errUnableToDeleteConfig = "unable to delete namespace config: %w" ) -func (mds *memdbDatastore) WriteNamespace(ctx context.Context, newConfig *v0.NamespaceDefinition) (datastore.Revision, error) { +func (mds *memdbDatastore) WriteNamespace( + ctx context.Context, + newConfig *v0.NamespaceDefinition, +) (datastore.Revision, error) { db := mds.db if db == nil { return datastore.NoRevision, fmt.Errorf("memdb closed") @@ -70,7 +74,11 @@ func (mds *memdbDatastore) WriteNamespace(ctx context.Context, newConfig *v0.Nam } // ReadNamespace reads a namespace definition and version and returns it if found. -func (mds *memdbDatastore) ReadNamespace(ctx context.Context, nsName string) (*v0.NamespaceDefinition, datastore.Revision, error) { +func (mds *memdbDatastore) ReadNamespace( + ctx context.Context, + nsName string, + revision datastore.Revision, +) (*v0.NamespaceDefinition, datastore.Revision, error) { db := mds.db if db == nil { return nil, datastore.NoRevision, fmt.Errorf("memdb closed") @@ -80,11 +88,13 @@ func (mds *memdbDatastore) ReadNamespace(ctx context.Context, nsName string) (*v defer txn.Abort() time.Sleep(mds.simulatedLatency) - foundRaw, err := txn.First(tableNamespace, indexLive, nsName, deletedTransactionID) + foundIter, err := txn.Get(tableNamespace, indexID, nsName) if err != nil { return nil, datastore.NoRevision, fmt.Errorf(errUnableToReadConfig, err) } + foundFiltered := memdb.NewFilterIterator(foundIter, filterToLiveObjects(revision)) + foundRaw := foundFiltered.Next() if foundRaw == nil { return nil, datastore.NoRevision, datastore.NewNamespaceNotFoundErr(nsName) } @@ -150,7 +160,10 @@ func (mds *memdbDatastore) DeleteNamespace(ctx context.Context, nsName string) ( return revisionFromVersion(writeTxnID), nil } -func (mds *memdbDatastore) ListNamespaces(ctx context.Context) ([]*v0.NamespaceDefinition, error) { +func (mds *memdbDatastore) ListNamespaces( + ctx context.Context, + revision datastore.Revision, +) ([]*v0.NamespaceDefinition, error) { db := mds.db if db == nil { return nil, fmt.Errorf("memdb closed") @@ -161,11 +174,13 @@ func (mds *memdbDatastore) ListNamespaces(ctx context.Context) ([]*v0.NamespaceD txn := db.Txn(false) defer txn.Abort() - it, err := txn.Get(tableNamespace, indexDeletedTxn, deletedTransactionID) + aliveAndDead, err := txn.LowerBound(tableNamespace, indexID) if err != nil { - return nsDefs, err + return nil, fmt.Errorf(errUnableToReadConfig, err) } + it := memdb.NewFilterIterator(aliveAndDead, filterToLiveObjects(revision)) + for { foundRaw := it.Next() if foundRaw == nil { diff --git a/internal/datastore/memdb/query.go b/internal/datastore/memdb/query.go index ca3768f906..c97d7c94b4 100644 --- a/internal/datastore/memdb/query.go +++ b/internal/datastore/memdb/query.go @@ -152,15 +152,14 @@ func (mtq memdbTupleQuery) Execute(ctx context.Context) (datastore.TupleIterator return !found } - if uint64(mtq.revision.IntPart()) < tuple.createdTxn || uint64(mtq.revision.IntPart()) >= tuple.deletedTxn { - return true - } return false }) + filteredAlive := memdb.NewFilterIterator(filteredIterator, filterToLiveObjects(mtq.revision)) + iter := &memdbTupleIterator{ txn: txn, - it: filteredIterator, + it: filteredAlive, limit: mtq.limit, } diff --git a/internal/datastore/memdb/reverse_query.go b/internal/datastore/memdb/reverse_query.go index 0f40b28059..dd828ef3e5 100644 --- a/internal/datastore/memdb/reverse_query.go +++ b/internal/datastore/memdb/reverse_query.go @@ -99,13 +99,7 @@ func (mtq memdbReverseTupleQuery) Execute(ctx context.Context) (datastore.TupleI return nil, fmt.Errorf(errUnableToQueryTuples, err) } - filteredIterator := memdb.NewFilterIterator(bestIterator, func(tupleRaw interface{}) bool { - tuple := tupleRaw.(*relationship) - if uint64(mtq.revision.IntPart()) < tuple.createdTxn || uint64(mtq.revision.IntPart()) >= tuple.deletedTxn { - return true - } - return false - }) + filteredIterator := memdb.NewFilterIterator(bestIterator, filterToLiveObjects(mtq.revision)) iter := &memdbTupleIterator{ txn: txn, diff --git a/internal/datastore/postgres/namespace.go b/internal/datastore/postgres/namespace.go index 512bd1d39b..7b4d52263a 100644 --- a/internal/datastore/postgres/namespace.go +++ b/internal/datastore/postgres/namespace.go @@ -30,10 +30,7 @@ var ( colCreatedTxn, ) - readNamespace = psql.Select(colConfig, colCreatedTxn). - From(tableNamespace). - Where(sq.Eq{colDeletedTxn: liveDeletedTxnID}) - + readNamespace = psql.Select(colConfig, colCreatedTxn).From(tableNamespace) deleteNamespace = psql.Update(tableNamespace).Where(sq.Eq{colDeletedTxn: liveDeletedTxnID}) deleteNamespaceTuples = psql.Update(tableTuple).Where(sq.Eq{colDeletedTxn: liveDeletedTxnID}) @@ -99,7 +96,7 @@ func (pgd *pgDatastore) WriteNamespace(ctx context.Context, newConfig *v0.Namesp return revisionFromTransaction(newTxnID), nil } -func (pgd *pgDatastore) ReadNamespace(ctx context.Context, nsName string) (*v0.NamespaceDefinition, datastore.Revision, error) { +func (pgd *pgDatastore) ReadNamespace(ctx context.Context, nsName string, revision datastore.Revision) (*v0.NamespaceDefinition, datastore.Revision, error) { ctx, span := tracer.Start(ctx, "ReadNamespace", trace.WithAttributes( attribute.String("name", nsName), )) @@ -111,7 +108,7 @@ func (pgd *pgDatastore) ReadNamespace(ctx context.Context, nsName string) (*v0.N } defer tx.Rollback(ctx) - loaded, version, err := loadNamespace(ctx, nsName, tx) + loaded, version, err := loadNamespace(ctx, nsName, tx, filterToLivingObjects(readNamespace, revision)) switch { case errors.As(err, &datastore.ErrNamespaceNotFound{}): return nil, datastore.NoRevision, err @@ -131,7 +128,8 @@ func (pgd *pgDatastore) DeleteNamespace(ctx context.Context, nsName string) (dat } defer tx.Rollback(ctx) - _, version, err := loadNamespace(ctx, nsName, tx) + baseQuery := readNamespace.Where(sq.Eq{colDeletedTxn: liveDeletedTxnID}) + _, createdAt, err := loadNamespace(ctx, nsName, tx, baseQuery) switch { case errors.As(err, &datastore.ErrNamespaceNotFound{}): return datastore.NoRevision, err @@ -148,7 +146,7 @@ func (pgd *pgDatastore) DeleteNamespace(ctx context.Context, nsName string) (dat delSQL, delArgs, err := deleteNamespace. Set(colDeletedTxn, newTxnID). - Where(sq.Eq{colNamespace: nsName, colCreatedTxn: version}). + Where(sq.Eq{colNamespace: nsName, colCreatedTxn: createdAt}). ToSql() if err != nil { return datastore.NoRevision, fmt.Errorf(errUnableToDeleteConfig, err) @@ -177,16 +175,16 @@ func (pgd *pgDatastore) DeleteNamespace(ctx context.Context, nsName string) (dat return datastore.NoRevision, fmt.Errorf(errUnableToDeleteConfig, err) } - return version, nil + return revisionFromTransaction(newTxnID), nil } -func loadNamespace(ctx context.Context, namespace string, tx pgx.Tx) (*v0.NamespaceDefinition, datastore.Revision, error) { +func loadNamespace(ctx context.Context, namespace string, tx pgx.Tx, baseQuery sq.SelectBuilder) (*v0.NamespaceDefinition, datastore.Revision, error) { ctx = datastore.SeparateContextWithTracing(ctx) ctx, span := tracer.Start(ctx, "loadNamespace") defer span.End() - sql, args, err := readNamespace.Where(sq.Eq{colNamespace: namespace}).ToSql() + sql, args, err := baseQuery.Where(sq.Eq{colNamespace: namespace}).ToSql() if err != nil { return nil, datastore.NoRevision, err } @@ -210,7 +208,7 @@ func loadNamespace(ctx context.Context, namespace string, tx pgx.Tx) (*v0.Namesp return loaded, version, nil } -func (pgd *pgDatastore) ListNamespaces(ctx context.Context) ([]*v0.NamespaceDefinition, error) { +func (pgd *pgDatastore) ListNamespaces(ctx context.Context, revision datastore.Revision) ([]*v0.NamespaceDefinition, error) { ctx = datastore.SeparateContextWithTracing(ctx) tx, err := pgd.dbpool.Begin(ctx) @@ -219,9 +217,7 @@ func (pgd *pgDatastore) ListNamespaces(ctx context.Context) ([]*v0.NamespaceDefi } defer tx.Rollback(ctx) - query := readNamespace - - sql, args, err := query.ToSql() + sql, args, err := filterToLivingObjects(readNamespace, revision).ToSql() if err != nil { return nil, err } diff --git a/internal/datastore/postgres/postgres.go b/internal/datastore/postgres/postgres.go index de2da21ed0..9b6797491e 100644 --- a/internal/datastore/postgres/postgres.go +++ b/internal/datastore/postgres/postgres.go @@ -552,3 +552,11 @@ func revisionFromTransaction(txID uint64) datastore.Revision { func transactionFromRevision(revision datastore.Revision) uint64 { return uint64(revision.IntPart()) } + +func filterToLivingObjects(original sq.SelectBuilder, revision datastore.Revision) sq.SelectBuilder { + return original.Where(sq.LtOrEq{colCreatedTxn: transactionFromRevision(revision)}). + Where(sq.Or{ + sq.Eq{colDeletedTxn: liveDeletedTxnID}, + sq.Gt{colDeletedTxn: revision}, + }) +} diff --git a/internal/datastore/postgres/query.go b/internal/datastore/postgres/query.go index 014b4ca1ed..97bb373345 100644 --- a/internal/datastore/postgres/query.go +++ b/internal/datastore/postgres/query.go @@ -28,13 +28,8 @@ var schema = common.SchemaInformation{ } func (pgd *pgDatastore) QueryTuples(filter datastore.TupleQueryResourceFilter, revision datastore.Revision) datastore.TupleQuery { - initialQuery := queryTuples. - Where(sq.Eq{colNamespace: filter.ResourceType}). - Where(sq.LtOrEq{colCreatedTxn: transactionFromRevision(revision)}). - Where(sq.Or{ - sq.Eq{colDeletedTxn: liveDeletedTxnID}, - sq.Gt{colDeletedTxn: revision}, - }) + initialQuery := filterToLivingObjects(queryTuples, revision). + Where(sq.Eq{colNamespace: filter.ResourceType}) tracerAttributes := []attribute.KeyValue{common.ObjNamespaceNameKey.String(filter.ResourceType)} diff --git a/internal/datastore/proxy/hedging.go b/internal/datastore/proxy/hedging.go index 64ac26cadf..17a43613c2 100644 --- a/internal/datastore/proxy/hedging.go +++ b/internal/datastore/proxy/hedging.go @@ -203,13 +203,13 @@ func (hp hedgingProxy) SyncRevision(ctx context.Context) (rev datastore.Revision return } -func (hp hedgingProxy) ReadNamespace(ctx context.Context, nsName string) (ns *v0.NamespaceDefinition, rev datastore.Revision, err error) { +func (hp hedgingProxy) ReadNamespace(ctx context.Context, nsName string, revision datastore.Revision) (ns *v0.NamespaceDefinition, createdAt datastore.Revision, err error) { var once sync.Once subreq := func(ctx context.Context, responseReady chan<- struct{}) { - delegatedNs, delegatedRev, delegatedErr := hp.delegate.ReadNamespace(ctx, nsName) + delegatedNs, delegatedRev, delegatedErr := hp.delegate.ReadNamespace(ctx, nsName, revision) once.Do(func() { ns = delegatedNs - rev = delegatedRev + createdAt = delegatedRev err = delegatedErr }) responseReady <- struct{}{} @@ -280,8 +280,8 @@ func (hp hedgingProxy) CheckRevision(ctx context.Context, revision datastore.Rev return hp.delegate.CheckRevision(ctx, revision) } -func (hp hedgingProxy) ListNamespaces(ctx context.Context) ([]*v0.NamespaceDefinition, error) { - return hp.delegate.ListNamespaces(ctx) +func (hp hedgingProxy) ListNamespaces(ctx context.Context, revision datastore.Revision) ([]*v0.NamespaceDefinition, error) { + return hp.delegate.ListNamespaces(ctx, revision) } type hedgingTupleQuery struct { diff --git a/internal/datastore/proxy/hedging_test.go b/internal/datastore/proxy/hedging_test.go index 401e9d751c..676a8c0e82 100644 --- a/internal/datastore/proxy/hedging_test.go +++ b/internal/datastore/proxy/hedging_test.go @@ -42,12 +42,12 @@ func TestDatastoreRequestHedging(t *testing.T) { }{ { "ReadNamespace", - []interface{}{mock.Anything, nsKnown}, + []interface{}{mock.Anything, nsKnown, datastore.NoRevision}, []interface{}{&v0.NamespaceDefinition{}, revisionKnown, errKnown}, []interface{}{&v0.NamespaceDefinition{}, anotherRevisionKnown, errKnown}, func(t *testing.T, proxy datastore.Datastore, expectFirst bool) { require := require.New(t) - _, rev, err := proxy.ReadNamespace(context.Background(), nsKnown) + _, rev, err := proxy.ReadNamespace(context.Background(), nsKnown, datastore.NoRevision) require.ErrorIs(errKnown, err) if expectFirst { require.Equal(revisionKnown, rev) diff --git a/internal/datastore/proxy/mapping.go b/internal/datastore/proxy/mapping.go index 74c5468bfe..34f2bc719a 100644 --- a/internal/datastore/proxy/mapping.go +++ b/internal/datastore/proxy/mapping.go @@ -150,13 +150,13 @@ func (mp mappingProxy) WriteNamespace(ctx context.Context, newConfig *v0.Namespa }) } -func (mp mappingProxy) ReadNamespace(ctx context.Context, nsName string) (*v0.NamespaceDefinition, datastore.Revision, error) { +func (mp mappingProxy) ReadNamespace(ctx context.Context, nsName string, revision datastore.Revision) (*v0.NamespaceDefinition, datastore.Revision, error) { storedNamespaceName, err := mp.mapper.Encode(nsName) if err != nil { return nil, datastore.NoRevision, fmt.Errorf(errTranslation, err) } - ns, rev, err := mp.delegate.ReadNamespace(ctx, storedNamespaceName) + ns, rev, err := mp.delegate.ReadNamespace(ctx, storedNamespaceName, revision) if err != nil { return ns, rev, err } @@ -216,8 +216,8 @@ func (mp mappingProxy) CheckRevision(ctx context.Context, revision datastore.Rev return mp.delegate.CheckRevision(ctx, revision) } -func (mp mappingProxy) ListNamespaces(ctx context.Context) ([]*v0.NamespaceDefinition, error) { - nsDefs, err := mp.delegate.ListNamespaces(ctx) +func (mp mappingProxy) ListNamespaces(ctx context.Context, revision datastore.Revision) ([]*v0.NamespaceDefinition, error) { + nsDefs, err := mp.delegate.ListNamespaces(ctx, revision) if err != nil { return nil, err } diff --git a/internal/datastore/proxy/readonly.go b/internal/datastore/proxy/readonly.go index 6d4e9943cb..418b89c0af 100644 --- a/internal/datastore/proxy/readonly.go +++ b/internal/datastore/proxy/readonly.go @@ -53,8 +53,8 @@ func (rd roDatastore) WriteNamespace(ctx context.Context, newConfig *v0.Namespac return datastore.NoRevision, errReadOnly } -func (rd roDatastore) ReadNamespace(ctx context.Context, nsName string) (*v0.NamespaceDefinition, datastore.Revision, error) { - return rd.delegate.ReadNamespace(ctx, nsName) +func (rd roDatastore) ReadNamespace(ctx context.Context, nsName string, revision datastore.Revision) (*v0.NamespaceDefinition, datastore.Revision, error) { + return rd.delegate.ReadNamespace(ctx, nsName, revision) } func (rd roDatastore) DeleteNamespace(ctx context.Context, nsName string) (datastore.Revision, error) { @@ -81,6 +81,6 @@ func (rd roDatastore) CheckRevision(ctx context.Context, revision datastore.Revi return rd.delegate.CheckRevision(ctx, revision) } -func (rd roDatastore) ListNamespaces(ctx context.Context) ([]*v0.NamespaceDefinition, error) { - return rd.delegate.ListNamespaces(ctx) +func (rd roDatastore) ListNamespaces(ctx context.Context, revision datastore.Revision) ([]*v0.NamespaceDefinition, error) { + return rd.delegate.ListNamespaces(ctx, revision) } diff --git a/internal/datastore/proxy/readonly_test.go b/internal/datastore/proxy/readonly_test.go index bbcec9bcb0..e4674fd392 100644 --- a/internal/datastore/proxy/readonly_test.go +++ b/internal/datastore/proxy/readonly_test.go @@ -129,9 +129,9 @@ func TestReadNamespacePassthrough(t *testing.T) { ds := NewReadonlyDatastore(delegate) ctx := context.Background() - delegate.On("ReadNamespace", "test").Return(&v0.NamespaceDefinition{}, expectedRevision, nil).Times(1) + delegate.On("ReadNamespace", "test", expectedRevision).Return(&v0.NamespaceDefinition{}, expectedRevision, nil).Times(1) - ns, revision, err := ds.ReadNamespace(ctx, "test") + ns, revision, err := ds.ReadNamespace(ctx, "test", expectedRevision) require.Equal(&v0.NamespaceDefinition{}, ns) require.Equal(expectedRevision, revision) require.NoError(err) @@ -197,9 +197,9 @@ func TestListNamespacesPassthrough(t *testing.T) { ds := NewReadonlyDatastore(delegate) ctx := context.Background() - delegate.On("ListNamespaces").Return([]*v0.NamespaceDefinition{}, nil).Times(1) + delegate.On("ListNamespaces", expectedRevision).Return([]*v0.NamespaceDefinition{}, nil).Times(1) - nsDefs, err := ds.ListNamespaces(ctx) + nsDefs, err := ds.ListNamespaces(ctx, expectedRevision) require.Equal([]*v0.NamespaceDefinition{}, nsDefs) require.NoError(err) delegate.AssertExpectations(t) @@ -237,8 +237,8 @@ func (dm *delegateMock) WriteNamespace(ctx context.Context, newConfig *v0.Namesp panic("shouldn't ever call write method on delegate") } -func (dm *delegateMock) ReadNamespace(ctx context.Context, nsName string) (*v0.NamespaceDefinition, datastore.Revision, error) { - args := dm.Called(nsName) +func (dm *delegateMock) ReadNamespace(ctx context.Context, nsName string, revision datastore.Revision) (*v0.NamespaceDefinition, datastore.Revision, error) { + args := dm.Called(nsName, revision) return args.Get(0).(*v0.NamespaceDefinition), args.Get(1).(datastore.Revision), args.Error(2) } @@ -271,8 +271,8 @@ func (dm *delegateMock) CheckRevision(ctx context.Context, revision datastore.Re return args.Error(0) } -func (dm *delegateMock) ListNamespaces(ctx context.Context) ([]*v0.NamespaceDefinition, error) { - args := dm.Called() +func (dm *delegateMock) ListNamespaces(ctx context.Context, revision datastore.Revision) ([]*v0.NamespaceDefinition, error) { + args := dm.Called(revision) return args.Get(0).([]*v0.NamespaceDefinition), args.Error(1) } diff --git a/internal/datastore/test/mock.go b/internal/datastore/test/mock.go index 0ab9f3a3bb..01d3ae4c58 100644 --- a/internal/datastore/test/mock.go +++ b/internal/datastore/test/mock.go @@ -55,8 +55,8 @@ func (md *MockedDatastore) WriteNamespace(ctx context.Context, newConfig *v0.Nam return args.Get(0).(datastore.Revision), args.Error(1) } -func (md *MockedDatastore) ReadNamespace(ctx context.Context, nsName string) (*v0.NamespaceDefinition, datastore.Revision, error) { - args := md.Called(ctx, nsName) +func (md *MockedDatastore) ReadNamespace(ctx context.Context, nsName string, revision datastore.Revision) (*v0.NamespaceDefinition, datastore.Revision, error) { + args := md.Called(ctx, nsName, revision) return args.Get(0).(*v0.NamespaceDefinition), args.Get(1).(datastore.Revision), args.Error(2) } @@ -90,8 +90,8 @@ func (md *MockedDatastore) CheckRevision(ctx context.Context, revision datastore return args.Error(0) } -func (md *MockedDatastore) ListNamespaces(ctx context.Context) ([]*v0.NamespaceDefinition, error) { - args := md.Called(ctx) +func (md *MockedDatastore) ListNamespaces(ctx context.Context, revision datastore.Revision) ([]*v0.NamespaceDefinition, error) { + args := md.Called(ctx, revision) return args.Get(0).([]*v0.NamespaceDefinition), args.Error(1) } diff --git a/internal/datastore/test/namespace.go b/internal/datastore/test/namespace.go index e459bd0604..c85e00c6f2 100644 --- a/internal/datastore/test/namespace.go +++ b/internal/datastore/test/namespace.go @@ -36,37 +36,66 @@ func NamespaceWriteTest(t *testing.T, tester DatastoreTester) { ctx := context.Background() - nsDefs, err := ds.ListNamespaces(ctx) + startRevision, err := ds.SyncRevision(ctx) + require.NoError(err) + require.True(startRevision.GreaterThanOrEqual(datastore.NoRevision)) + + nsDefs, err := ds.ListNamespaces(ctx, startRevision) require.NoError(err) require.Equal(0, len(nsDefs)) - _, err = ds.WriteNamespace(ctx, testUserNS) + writtenRev, err := ds.WriteNamespace(ctx, testUserNS) require.NoError(err) + require.True(writtenRev.GreaterThan(startRevision)) - nsDefs, err = ds.ListNamespaces(ctx) + nsDefs, err = ds.ListNamespaces(ctx, writtenRev) require.NoError(err) require.Equal(1, len(nsDefs)) require.Equal(testUserNS.Name, nsDefs[0].Name) - _, err = ds.WriteNamespace(ctx, testNamespace) + secondWritten, err := ds.WriteNamespace(ctx, testNamespace) require.NoError(err) + require.True(secondWritten.GreaterThan(writtenRev)) - nsDefs, err = ds.ListNamespaces(ctx) + nsDefs, err = ds.ListNamespaces(ctx, secondWritten) require.NoError(err) require.Equal(2, len(nsDefs)) - found, _, err := ds.ReadNamespace(ctx, testNamespace.Name) + _, _, err = ds.ReadNamespace(ctx, testNamespace.Name, writtenRev) + require.Error(err) + + nsDefs, err = ds.ListNamespaces(ctx, writtenRev) + require.NoError(err) + require.Equal(1, len(nsDefs)) + + found, createdRev, err := ds.ReadNamespace(ctx, testNamespace.Name, secondWritten) require.NoError(err) + require.True(createdRev.LessThanOrEqual(secondWritten)) + require.True(createdRev.GreaterThan(startRevision)) foundDiff := cmp.Diff(testNamespace, found, protocmp.Transform()) require.Empty(foundDiff) - _, err = ds.WriteNamespace(ctx, updatedNamespace) + updatedRevision, err := ds.WriteNamespace(ctx, updatedNamespace) require.NoError(err) - checkUpdated, _, err := ds.ReadNamespace(ctx, testNamespace.Name) + checkUpdated, createdRev, err := ds.ReadNamespace(ctx, testNamespace.Name, updatedRevision) require.NoError(err) + require.True(createdRev.LessThanOrEqual(updatedRevision)) + require.True(createdRev.GreaterThan(startRevision)) foundUpdated := cmp.Diff(updatedNamespace, checkUpdated, protocmp.Transform()) require.Empty(foundUpdated) + + checkOld, createdRev, err := ds.ReadNamespace(ctx, testUserNamespace, writtenRev) + require.NoError(err) + require.True(createdRev.LessThanOrEqual(writtenRev)) + require.True(createdRev.GreaterThan(startRevision)) + require.Empty(cmp.Diff(testUserNS, checkOld, protocmp.Transform())) + + checkOldList, err := ds.ListNamespaces(ctx, writtenRev) + require.NoError(err) + require.Equal(1, len(checkOldList)) + require.Equal(testUserNS.Name, checkOldList[0].Name) + require.Empty(cmp.Diff(testUserNS, checkOldList[0], protocmp.Transform())) } // NamespaceDeleteTest tests whether or not the requirements for deleting @@ -91,17 +120,17 @@ func NamespaceDeleteTest(t *testing.T, tester DatastoreTester) { deletedRev, err := ds.DeleteNamespace(ctx, testfixtures.DocumentNS.Name) require.NoError(err) - require.True(deletedRev.GreaterThan(datastore.NoRevision)) + require.True(deletedRev.GreaterThan(revision)) - _, _, err = ds.ReadNamespace(ctx, testfixtures.DocumentNS.Name) + _, _, err = ds.ReadNamespace(ctx, testfixtures.DocumentNS.Name, deletedRev) require.True(errors.As(err, &datastore.ErrNamespaceNotFound{})) - found, ver, err := ds.ReadNamespace(ctx, testfixtures.FolderNS.Name) + found, nsCreatedRev, err := ds.ReadNamespace(ctx, testfixtures.FolderNS.Name, deletedRev) require.NotNil(found) - require.True(ver.GreaterThan(datastore.NoRevision)) + require.True(nsCreatedRev.LessThan(deletedRev)) require.NoError(err) - allNamespaces, err := ds.ListNamespaces(ctx) + allNamespaces, err := ds.ListNamespaces(ctx, deletedRev) require.NoError(err) for _, ns := range allNamespaces { require.NotEqual(testfixtures.DocumentNS.Name, ns.Name, "deleted namespace '%s' should not be in namespace list", ns.Name) diff --git a/internal/dispatch/graph/graph.go b/internal/dispatch/graph/graph.go index 006a5488be..e995bc9d80 100644 --- a/internal/dispatch/graph/graph.go +++ b/internal/dispatch/graph/graph.go @@ -6,6 +6,7 @@ import ( "fmt" v0 "github.com/authzed/authzed-go/proto/authzed/api/v0" + "github.com/shopspring/decimal" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" @@ -59,9 +60,9 @@ type localDispatcher struct { nsm namespace.Manager } -func (ld *localDispatcher) loadRelation(ctx context.Context, nsName, relationName string) (*v0.Relation, error) { +func (ld *localDispatcher) loadRelation(ctx context.Context, nsName, relationName string, revision decimal.Decimal) (*v0.Relation, error) { // Load namespace and relation from the datastore - ns, _, err := ld.nsm.ReadNamespace(ctx, nsName) + ns, err := ld.nsm.ReadNamespace(ctx, nsName, revision) if err != nil { return nil, rewriteError(err) } @@ -110,12 +111,22 @@ func (ld *localDispatcher) DispatchCheck(ctx context.Context, req *v1.DispatchCh return &v1.DispatchCheckResponse{Metadata: emptyMetadata}, err } - relation, err := ld.loadRelation(ctx, req.ObjectAndRelation.Namespace, req.ObjectAndRelation.Relation) + revision, err := decimal.NewFromString(req.Metadata.AtRevision) if err != nil { return &v1.DispatchCheckResponse{Metadata: emptyMetadata}, err } - return ld.checker.Check(ctx, req, relation) + relation, err := ld.loadRelation(ctx, req.ObjectAndRelation.Namespace, req.ObjectAndRelation.Relation, revision) + if err != nil { + return &v1.DispatchCheckResponse{Metadata: emptyMetadata}, err + } + + validatedReq := graph.ValidatedCheckRequest{ + DispatchCheckRequest: req, + Revision: revision, + } + + return ld.checker.Check(ctx, validatedReq, relation) } // DispatchExpand implements dispatch.Expand interface @@ -130,12 +141,22 @@ func (ld *localDispatcher) DispatchExpand(ctx context.Context, req *v1.DispatchE return &v1.DispatchExpandResponse{Metadata: emptyMetadata}, err } - relation, err := ld.loadRelation(ctx, req.ObjectAndRelation.Namespace, req.ObjectAndRelation.Relation) + revision, err := decimal.NewFromString(req.Metadata.AtRevision) + if err != nil { + return &v1.DispatchExpandResponse{Metadata: emptyMetadata}, err + } + + relation, err := ld.loadRelation(ctx, req.ObjectAndRelation.Namespace, req.ObjectAndRelation.Relation, revision) if err != nil { return &v1.DispatchExpandResponse{Metadata: emptyMetadata}, err } - return ld.expander.Expand(ctx, req, relation) + validatedReq := graph.ValidatedExpandRequest{ + DispatchExpandRequest: req, + Revision: revision, + } + + return ld.expander.Expand(ctx, validatedReq, relation) } // DispatchLookup implements dispatch.Lookup interface @@ -152,11 +173,21 @@ func (ld *localDispatcher) DispatchLookup(ctx context.Context, req *v1.DispatchL return &v1.DispatchLookupResponse{Metadata: emptyMetadata}, err } + revision, err := decimal.NewFromString(req.Metadata.AtRevision) + if err != nil { + return &v1.DispatchLookupResponse{Metadata: emptyMetadata}, err + } + if req.Limit <= 0 { return &v1.DispatchLookupResponse{Metadata: emptyMetadata, ResolvedOnrs: []*v0.ObjectAndRelation{}}, nil } - return ld.lookupHandler.Lookup(ctx, req) + validatedReq := graph.ValidatedLookupRequest{ + DispatchLookupRequest: req, + Revision: revision, + } + + return ld.lookupHandler.Lookup(ctx, validatedReq) } func (ld *localDispatcher) Close() error { diff --git a/internal/graph/check.go b/internal/graph/check.go index 9e284a2b30..35dcffc2e4 100644 --- a/internal/graph/check.go +++ b/internal/graph/check.go @@ -32,8 +32,15 @@ func onrEqual(lhs, rhs *v0.ObjectAndRelation) bool { return lhs.ObjectId == rhs.ObjectId && lhs.Relation == rhs.Relation && lhs.Namespace == rhs.Namespace } +// ValidatedCheckRequest represents a request after it has been validated and parsed for internal +// consumption. +type ValidatedCheckRequest struct { + *v1.DispatchCheckRequest + Revision decimal.Decimal +} + // Check performs a check request with the provided request and context -func (cc *ConcurrentChecker) Check(ctx context.Context, req *v1.DispatchCheckRequest, relation *v0.Relation) (*v1.DispatchCheckResponse, error) { +func (cc *ConcurrentChecker) Check(ctx context.Context, req ValidatedCheckRequest, relation *v0.Relation) (*v1.DispatchCheckResponse, error) { var directFunc ReduceableCheckFunc if onrEqual(req.Subject, req.ObjectAndRelation) { @@ -50,28 +57,22 @@ func (cc *ConcurrentChecker) Check(ctx context.Context, req *v1.DispatchCheckReq return resolved.Resp, resolved.Err } -func (cc *ConcurrentChecker) dispatch(req *v1.DispatchCheckRequest) ReduceableCheckFunc { +func (cc *ConcurrentChecker) dispatch(req ValidatedCheckRequest) ReduceableCheckFunc { return func(ctx context.Context, resultChan chan<- CheckResult) { log.Ctx(ctx).Trace().Object("dispatch", req).Send() - result, err := cc.d.DispatchCheck(ctx, req) + result, err := cc.d.DispatchCheck(ctx, req.DispatchCheckRequest) resultChan <- CheckResult{result, err} } } -func (cc *ConcurrentChecker) checkDirect(ctx context.Context, req *v1.DispatchCheckRequest) ReduceableCheckFunc { +func (cc *ConcurrentChecker) checkDirect(ctx context.Context, req ValidatedCheckRequest) ReduceableCheckFunc { return func(ctx context.Context, resultChan chan<- CheckResult) { - requestRevision, err := decimal.NewFromString(req.Metadata.AtRevision) - if err != nil { - resultChan <- checkResultError(NewCheckFailureErr(err), emptyMetadata) - return - } - log.Ctx(ctx).Trace().Object("direct", req).Send() it, err := cc.ds.QueryTuples(datastore.TupleQueryResourceFilter{ ResourceType: req.ObjectAndRelation.Namespace, OptionalResourceID: req.ObjectAndRelation.ObjectId, OptionalResourceRelation: req.ObjectAndRelation.Relation, - }, requestRevision).Execute(ctx) + }, req.Revision).Execute(ctx) if err != nil { resultChan <- checkResultError(NewCheckFailureErr(err), emptyMetadata) return @@ -87,11 +88,14 @@ func (cc *ConcurrentChecker) checkDirect(ctx context.Context, req *v1.DispatchCh } if tplUserset.Relation != Ellipsis { // We need to recursively call check here, potentially changing namespaces - requestsToDispatch = append(requestsToDispatch, cc.dispatch(&v1.DispatchCheckRequest{ - ObjectAndRelation: tplUserset, - Subject: req.Subject, - - Metadata: decrementDepth(req.Metadata), + requestsToDispatch = append(requestsToDispatch, cc.dispatch(ValidatedCheckRequest{ + &v1.DispatchCheckRequest{ + ObjectAndRelation: tplUserset, + Subject: req.Subject, + + Metadata: decrementDepth(req.Metadata), + }, + req.Revision, })) } } @@ -103,7 +107,7 @@ func (cc *ConcurrentChecker) checkDirect(ctx context.Context, req *v1.DispatchCh } } -func (cc *ConcurrentChecker) checkUsersetRewrite(ctx context.Context, req *v1.DispatchCheckRequest, usr *v0.UsersetRewrite) ReduceableCheckFunc { +func (cc *ConcurrentChecker) checkUsersetRewrite(ctx context.Context, req ValidatedCheckRequest, usr *v0.UsersetRewrite) ReduceableCheckFunc { switch rw := usr.RewriteOperation.(type) { case *v0.UsersetRewrite_Union: return cc.checkSetOperation(ctx, req, rw.Union, any) @@ -116,7 +120,7 @@ func (cc *ConcurrentChecker) checkUsersetRewrite(ctx context.Context, req *v1.Di } } -func (cc *ConcurrentChecker) checkSetOperation(ctx context.Context, req *v1.DispatchCheckRequest, so *v0.SetOperation, reducer Reducer) ReduceableCheckFunc { +func (cc *ConcurrentChecker) checkSetOperation(ctx context.Context, req ValidatedCheckRequest, so *v0.SetOperation, reducer Reducer) ReduceableCheckFunc { var requests []ReduceableCheckFunc for _, childOneof := range so.Child { switch child := childOneof.ChildType.(type) { @@ -136,7 +140,7 @@ func (cc *ConcurrentChecker) checkSetOperation(ctx context.Context, req *v1.Disp } } -func (cc *ConcurrentChecker) checkComputedUserset(ctx context.Context, req *v1.DispatchCheckRequest, cu *v0.ComputedUserset, tpl *v0.RelationTuple) ReduceableCheckFunc { +func (cc *ConcurrentChecker) checkComputedUserset(ctx context.Context, req ValidatedCheckRequest, cu *v0.ComputedUserset, tpl *v0.RelationTuple) ReduceableCheckFunc { var start *v0.ObjectAndRelation if cu.Object == v0.ComputedUserset_TUPLE_USERSET_OBJECT { if tpl == nil { @@ -164,7 +168,7 @@ func (cc *ConcurrentChecker) checkComputedUserset(ctx context.Context, req *v1.D } // Check if the target relation exists. If not, return nothing. - err := cc.nsm.CheckNamespaceAndRelation(ctx, start.Namespace, cu.Relation, true) + err := cc.nsm.CheckNamespaceAndRelation(ctx, start.Namespace, cu.Relation, true, req.Revision) if err != nil { if errors.As(err, &namespace.ErrRelationNotFound{}) { return notMember() @@ -173,27 +177,24 @@ func (cc *ConcurrentChecker) checkComputedUserset(ctx context.Context, req *v1.D return checkError(err) } - return cc.dispatch(&v1.DispatchCheckRequest{ - ObjectAndRelation: targetOnr, - Subject: req.Subject, - Metadata: decrementDepth(req.Metadata), + return cc.dispatch(ValidatedCheckRequest{ + &v1.DispatchCheckRequest{ + ObjectAndRelation: targetOnr, + Subject: req.Subject, + Metadata: decrementDepth(req.Metadata), + }, + req.Revision, }) } -func (cc *ConcurrentChecker) checkTupleToUserset(ctx context.Context, req *v1.DispatchCheckRequest, ttu *v0.TupleToUserset) ReduceableCheckFunc { +func (cc *ConcurrentChecker) checkTupleToUserset(ctx context.Context, req ValidatedCheckRequest, ttu *v0.TupleToUserset) ReduceableCheckFunc { return func(ctx context.Context, resultChan chan<- CheckResult) { - requestRevision, err := decimal.NewFromString(req.Metadata.AtRevision) - if err != nil { - resultChan <- checkResultError(NewCheckFailureErr(err), emptyMetadata) - return - } - log.Ctx(ctx).Trace().Object("ttu", req).Send() it, err := cc.ds.QueryTuples(datastore.TupleQueryResourceFilter{ ResourceType: req.ObjectAndRelation.Namespace, OptionalResourceID: req.ObjectAndRelation.ObjectId, OptionalResourceRelation: ttu.Tupleset.Relation, - }, requestRevision).Execute(ctx) + }, req.Revision).Execute(ctx) if err != nil { resultChan <- checkResultError(NewCheckFailureErr(err), emptyMetadata) return diff --git a/internal/graph/expand.go b/internal/graph/expand.go index 68bac49cbc..991d131819 100644 --- a/internal/graph/expand.go +++ b/internal/graph/expand.go @@ -34,8 +34,15 @@ type ConcurrentExpander struct { nsm namespace.Manager } +// ValidatedExpandRequest represents a request after it has been validated and parsed for internal +// consumption. +type ValidatedExpandRequest struct { + *v1.DispatchExpandRequest + Revision decimal.Decimal +} + // Expand performs an expand request with the provided request and context. -func (ce *ConcurrentExpander) Expand(ctx context.Context, req *v1.DispatchExpandRequest, relation *v0.Relation) (*v1.DispatchExpandResponse, error) { +func (ce *ConcurrentExpander) Expand(ctx context.Context, req ValidatedExpandRequest, relation *v0.Relation) (*v1.DispatchExpandResponse, error) { log.Ctx(ctx).Trace().Object("expand", req).Send() var directFunc ReduceableExpandFunc @@ -52,23 +59,17 @@ func (ce *ConcurrentExpander) Expand(ctx context.Context, req *v1.DispatchExpand func (ce *ConcurrentExpander) expandDirect( ctx context.Context, - req *v1.DispatchExpandRequest, + req ValidatedExpandRequest, startBehavior startInclusion, ) ReduceableExpandFunc { log.Ctx(ctx).Trace().Object("direct", req).Send() return func(ctx context.Context, resultChan chan<- ExpandResult) { - requestRevision, err := decimal.NewFromString(req.Metadata.AtRevision) - if err != nil { - resultChan <- expandResultError(NewExpansionFailureErr(err), emptyMetadata) - return - } - it, err := ce.ds.QueryTuples(datastore.TupleQueryResourceFilter{ ResourceType: req.ObjectAndRelation.Namespace, OptionalResourceID: req.ObjectAndRelation.ObjectId, OptionalResourceRelation: req.ObjectAndRelation.Relation, - }, requestRevision).Execute(ctx) + }, req.Revision).Execute(ctx) if err != nil { resultChan <- expandResultError(NewExpansionFailureErr(err), emptyMetadata) return @@ -116,10 +117,13 @@ func (ce *ConcurrentExpander) expandDirect( // found terminals together. var requestsToDispatch []ReduceableExpandFunc for _, nonTerminalUser := range foundNonTerminalUsersets { - requestsToDispatch = append(requestsToDispatch, ce.dispatch(&v1.DispatchExpandRequest{ - ObjectAndRelation: nonTerminalUser.GetUserset(), - Metadata: decrementDepth(req.Metadata), - ExpansionMode: req.ExpansionMode, + requestsToDispatch = append(requestsToDispatch, ce.dispatch(ValidatedExpandRequest{ + &v1.DispatchExpandRequest{ + ObjectAndRelation: nonTerminalUser.GetUserset(), + Metadata: decrementDepth(req.Metadata), + ExpansionMode: req.ExpansionMode, + }, + req.Revision, })) } @@ -142,7 +146,7 @@ func (ce *ConcurrentExpander) expandDirect( } } -func (ce *ConcurrentExpander) expandUsersetRewrite(ctx context.Context, req *v1.DispatchExpandRequest, usr *v0.UsersetRewrite) ReduceableExpandFunc { +func (ce *ConcurrentExpander) expandUsersetRewrite(ctx context.Context, req ValidatedExpandRequest, usr *v0.UsersetRewrite) ReduceableExpandFunc { switch rw := usr.RewriteOperation.(type) { case *v0.UsersetRewrite_Union: log.Ctx(ctx).Trace().Msg("union") @@ -158,7 +162,7 @@ func (ce *ConcurrentExpander) expandUsersetRewrite(ctx context.Context, req *v1. } } -func (ce *ConcurrentExpander) expandSetOperation(ctx context.Context, req *v1.DispatchExpandRequest, so *v0.SetOperation, reducer ExpandReducer) ReduceableExpandFunc { +func (ce *ConcurrentExpander) expandSetOperation(ctx context.Context, req ValidatedExpandRequest, so *v0.SetOperation, reducer ExpandReducer) ReduceableExpandFunc { var requests []ReduceableExpandFunc for _, childOneof := range so.Child { switch child := childOneof.ChildType.(type) { @@ -177,15 +181,15 @@ func (ce *ConcurrentExpander) expandSetOperation(ctx context.Context, req *v1.Di } } -func (ce *ConcurrentExpander) dispatch(req *v1.DispatchExpandRequest) ReduceableExpandFunc { +func (ce *ConcurrentExpander) dispatch(req ValidatedExpandRequest) ReduceableExpandFunc { return func(ctx context.Context, resultChan chan<- ExpandResult) { log.Ctx(ctx).Trace().Object("dispatch expand", req).Send() - result, err := ce.d.DispatchExpand(ctx, req) + result, err := ce.d.DispatchExpand(ctx, req.DispatchExpandRequest) resultChan <- ExpandResult{result, err} } } -func (ce *ConcurrentExpander) expandComputedUserset(ctx context.Context, req *v1.DispatchExpandRequest, cu *v0.ComputedUserset, tpl *v0.RelationTuple) ReduceableExpandFunc { +func (ce *ConcurrentExpander) expandComputedUserset(ctx context.Context, req ValidatedExpandRequest, cu *v0.ComputedUserset, tpl *v0.RelationTuple) ReduceableExpandFunc { log.Ctx(ctx).Trace().Str("relation", cu.Relation).Msg("computed userset") var start *v0.ObjectAndRelation if cu.Object == v0.ComputedUserset_TUPLE_USERSET_OBJECT { @@ -204,7 +208,7 @@ func (ce *ConcurrentExpander) expandComputedUserset(ctx context.Context, req *v1 } // Check if the target relation exists. If not, return nothing. - err := ce.nsm.CheckNamespaceAndRelation(ctx, start.Namespace, cu.Relation, true) + err := ce.nsm.CheckNamespaceAndRelation(ctx, start.Namespace, cu.Relation, true, req.Revision) if err != nil { if errors.As(err, &namespace.ErrRelationNotFound{}) { return emptyExpansion(req.ObjectAndRelation) @@ -213,30 +217,28 @@ func (ce *ConcurrentExpander) expandComputedUserset(ctx context.Context, req *v1 return expandError(err) } - return ce.dispatch(&v1.DispatchExpandRequest{ - ObjectAndRelation: &v0.ObjectAndRelation{ - Namespace: start.Namespace, - ObjectId: start.ObjectId, - Relation: cu.Relation, + return ce.dispatch(ValidatedExpandRequest{ + + &v1.DispatchExpandRequest{ + ObjectAndRelation: &v0.ObjectAndRelation{ + Namespace: start.Namespace, + ObjectId: start.ObjectId, + Relation: cu.Relation, + }, + Metadata: decrementDepth(req.Metadata), + ExpansionMode: req.ExpansionMode, }, - Metadata: decrementDepth(req.Metadata), - ExpansionMode: req.ExpansionMode, + req.Revision, }) } -func (ce *ConcurrentExpander) expandTupleToUserset(ctx context.Context, req *v1.DispatchExpandRequest, ttu *v0.TupleToUserset) ReduceableExpandFunc { +func (ce *ConcurrentExpander) expandTupleToUserset(ctx context.Context, req ValidatedExpandRequest, ttu *v0.TupleToUserset) ReduceableExpandFunc { return func(ctx context.Context, resultChan chan<- ExpandResult) { - requestRevision, err := decimal.NewFromString(req.Metadata.AtRevision) - if err != nil { - resultChan <- expandResultError(NewExpansionFailureErr(err), emptyMetadata) - return - } - it, err := ce.ds.QueryTuples(datastore.TupleQueryResourceFilter{ ResourceType: req.ObjectAndRelation.Namespace, OptionalResourceID: req.ObjectAndRelation.ObjectId, OptionalResourceRelation: ttu.Tupleset.Relation, - }, requestRevision).Execute(ctx) + }, req.Revision).Execute(ctx) if err != nil { resultChan <- expandResultError(NewExpansionFailureErr(err), emptyMetadata) return diff --git a/internal/graph/graph.go b/internal/graph/graph.go index e49d4ef77d..d2eea9bbd7 100644 --- a/internal/graph/graph.go +++ b/internal/graph/graph.go @@ -59,7 +59,7 @@ type ExpandReducer func( type ReduceableLookupFunc func(ctx context.Context, resultChan chan<- LookupResult) // LookupReducer is a type for the functions which combine lookup results. -type LookupReducer func(ctx context.Context, parentReq *v1.DispatchLookupRequest, limit uint32, requests []ReduceableLookupFunc) LookupResult +type LookupReducer func(ctx context.Context, parentReq ValidatedLookupRequest, limit uint32, requests []ReduceableLookupFunc) LookupResult func decrementDepth(md *v1.ResolverMeta) *v1.ResolverMeta { return &v1.ResolverMeta{ diff --git a/internal/graph/lookup.go b/internal/graph/lookup.go index 4c79c6dbe3..8c23f41ed3 100644 --- a/internal/graph/lookup.go +++ b/internal/graph/lookup.go @@ -28,6 +28,13 @@ type ConcurrentLookup struct { nsm namespace.Manager } +// ValidatedLookupRequest represents a request after it has been validated and parsed for internal +// consumption. +type ValidatedLookupRequest struct { + *v1.DispatchLookupRequest + Revision decimal.Decimal +} + // Calculate the maximum int value to allow us to effectively set no limit on certain recursive // lookup calls. const ( @@ -35,7 +42,7 @@ const ( ) // Lookup performs a lookup request with the provided request and context. -func (cl *ConcurrentLookup) Lookup(ctx context.Context, req *v1.DispatchLookupRequest) (*v1.DispatchLookupResponse, error) { +func (cl *ConcurrentLookup) Lookup(ctx context.Context, req ValidatedLookupRequest) (*v1.DispatchLookupResponse, error) { funcToResolve := cl.lookupInternal(ctx, req) resolved := lookupOne(ctx, req, funcToResolve) @@ -56,7 +63,7 @@ func (cl *ConcurrentLookup) Lookup(ctx context.Context, req *v1.DispatchLookupRe return resolved.Resp, resolved.Err } -func (cl *ConcurrentLookup) lookupInternal(ctx context.Context, req *v1.DispatchLookupRequest) ReduceableLookupFunc { +func (cl *ConcurrentLookup) lookupInternal(ctx context.Context, req ValidatedLookupRequest) ReduceableLookupFunc { log.Ctx(ctx).Trace().Object("lookup", req).Send() objSet := tuple.NewONRSet() @@ -67,7 +74,7 @@ func (cl *ConcurrentLookup) lookupInternal(ctx context.Context, req *v1.Dispatch objSet.Add(req.Subject) } - nsdef, typeSystem, _, err := cl.nsm.ReadNamespaceAndTypes(ctx, req.ObjectRelation.Namespace) + nsdef, typeSystem, err := cl.nsm.ReadNamespaceAndTypes(ctx, req.ObjectRelation.Namespace, req.Revision) if err != nil { return returnResult(lookupResultError(req, err, emptyMetadata)) } @@ -112,13 +119,16 @@ func (cl *ConcurrentLookup) lookupInternal(ctx context.Context, req *v1.Dispatch continue } - requests = append(requests, cl.dispatch(&v1.DispatchLookupRequest{ - Subject: obj, - ObjectRelation: req.ObjectRelation, - Limit: req.Limit - objSet.Length(), - Metadata: decrementDepth(req.Metadata), - DirectStack: req.DirectStack, - TtuStack: req.TtuStack, + requests = append(requests, cl.dispatch(ValidatedLookupRequest{ + &v1.DispatchLookupRequest{ + Subject: obj, + ObjectRelation: req.ObjectRelation, + Limit: req.Limit - objSet.Length(), + Metadata: decrementDepth(req.Metadata), + DirectStack: req.DirectStack, + TtuStack: req.TtuStack, + }, + req.Revision, })) } @@ -142,7 +152,7 @@ func (cl *ConcurrentLookup) lookupInternal(ctx context.Context, req *v1.Dispatch return returnResult(lookupResult(req, limitedSlice(objSet.AsSlice(), req.Limit), responseMetadata)) } -func (cl *ConcurrentLookup) lookupDirect(ctx context.Context, req *v1.DispatchLookupRequest, typeSystem *namespace.NamespaceTypeSystem) ReduceableLookupFunc { +func (cl *ConcurrentLookup) lookupDirect(ctx context.Context, req ValidatedLookupRequest, typeSystem *namespace.NamespaceTypeSystem) ReduceableLookupFunc { requests := []ReduceableLookupFunc{} // Ensure type informatione exists on the relation. @@ -160,15 +170,10 @@ func (cl *ConcurrentLookup) lookupDirect(ctx context.Context, req *v1.DispatchLo return returnResult(lookupResultError(req, err, emptyMetadata)) } - requestRevision, err := decimal.NewFromString(req.Metadata.AtRevision) - if err != nil { - return returnResult(lookupResultError(req, err, emptyMetadata)) - } - if isDirectAllowed == namespace.DirectRelationValid { requests = append(requests, func(ctx context.Context, resultChan chan<- LookupResult) { objects := tuple.NewONRSet() - it, err := cl.ds.ReverseQueryTuplesFromSubject(req.Subject, requestRevision). + it, err := cl.ds.ReverseQueryTuplesFromSubject(req.Subject, req.Revision). WithObjectRelation(req.ObjectRelation.Namespace, req.ObjectRelation.Relation). Execute(ctx) if err != nil { @@ -231,16 +236,19 @@ func (cl *ConcurrentLookup) lookupDirect(ctx context.Context, req *v1.DispatchLo requests = append(requests, func(ctx context.Context, resultChan chan<- LookupResult) { // Dispatch on the inferred relation. - inferredRequest := cl.dispatch(&v1.DispatchLookupRequest{ - Subject: req.Subject, - ObjectRelation: &v0.RelationReference{ - Namespace: allowedDirectType.Namespace, - Relation: allowedDirectType.Relation, + inferredRequest := cl.dispatch(ValidatedLookupRequest{ + &v1.DispatchLookupRequest{ + Subject: req.Subject, + ObjectRelation: &v0.RelationReference{ + Namespace: allowedDirectType.Namespace, + Relation: allowedDirectType.Relation, + }, + Limit: noLimit, // Since this is an inferred lookup, we can't limit. + Metadata: decrementDepth(req.Metadata), + DirectStack: directStack, + TtuStack: req.TtuStack, }, - Limit: noLimit, // Since this is an inferred lookup, we can't limit. - Metadata: decrementDepth(req.Metadata), - DirectStack: directStack, - TtuStack: req.TtuStack, + req.Revision, }) result := lookupAny(ctx, req, noLimit, []ReduceableLookupFunc{inferredRequest}) @@ -255,7 +263,7 @@ func (cl *ConcurrentLookup) lookupDirect(ctx context.Context, req *v1.DispatchLo it, err := cl.ds.QueryTuples(datastore.TupleQueryResourceFilter{ ResourceType: req.ObjectRelation.Namespace, OptionalResourceRelation: req.ObjectRelation.Relation, - }, requestRevision).WithUsersets(result.Resp.ResolvedOnrs).Limit(uint64(req.Limit)).Execute(ctx) + }, req.Revision).WithUsersets(result.Resp.ResolvedOnrs).Limit(uint64(req.Limit)).Execute(ctx) if err != nil { resultChan <- lookupResultError(req, err, emptyMetadata) return @@ -284,7 +292,7 @@ func (cl *ConcurrentLookup) lookupDirect(ctx context.Context, req *v1.DispatchLo } } -func (cl *ConcurrentLookup) processRewrite(ctx context.Context, req *v1.DispatchLookupRequest, nsdef *v0.NamespaceDefinition, typeSystem *namespace.NamespaceTypeSystem, usr *v0.UsersetRewrite) ReduceableLookupFunc { +func (cl *ConcurrentLookup) processRewrite(ctx context.Context, req ValidatedLookupRequest, nsdef *v0.NamespaceDefinition, typeSystem *namespace.NamespaceTypeSystem, usr *v0.UsersetRewrite) ReduceableLookupFunc { switch rw := usr.RewriteOperation.(type) { case *v0.UsersetRewrite_Union: return cl.processSetOperation(ctx, req, nsdef, typeSystem, rw.Union, lookupAny) @@ -297,7 +305,7 @@ func (cl *ConcurrentLookup) processRewrite(ctx context.Context, req *v1.Dispatch } } -func (cl *ConcurrentLookup) processSetOperation(ctx context.Context, req *v1.DispatchLookupRequest, nsdef *v0.NamespaceDefinition, typeSystem *namespace.NamespaceTypeSystem, so *v0.SetOperation, reducer LookupReducer) ReduceableLookupFunc { +func (cl *ConcurrentLookup) processSetOperation(ctx context.Context, req ValidatedLookupRequest, nsdef *v0.NamespaceDefinition, typeSystem *namespace.NamespaceTypeSystem, so *v0.SetOperation, reducer LookupReducer) ReduceableLookupFunc { var requests []ReduceableLookupFunc for _, childOneof := range so.Child { @@ -330,7 +338,7 @@ func findRelation(nsdef *v0.NamespaceDefinition, relationName string) (*v0.Relat return nil, false } -func (cl *ConcurrentLookup) processTupleToUserset(ctx context.Context, req *v1.DispatchLookupRequest, nsdef *v0.NamespaceDefinition, typeSystem *namespace.NamespaceTypeSystem, ttu *v0.TupleToUserset) ReduceableLookupFunc { +func (cl *ConcurrentLookup) processTupleToUserset(ctx context.Context, req ValidatedLookupRequest, nsdef *v0.NamespaceDefinition, typeSystem *namespace.NamespaceTypeSystem, ttu *v0.TupleToUserset) ReduceableLookupFunc { // Ensure that we don't process TTUs recursively, as that can cause an infinite loop. nr := &v0.RelationReference{ Namespace: req.ObjectRelation.Namespace, @@ -347,11 +355,6 @@ func (cl *ConcurrentLookup) processTupleToUserset(ctx context.Context, req *v1.D return returnResult(lookupResultError(req, err, emptyMetadata)) } - requestRevision, err := decimal.NewFromString(req.Metadata.AtRevision) - if err != nil { - return returnResult(lookupResultError(req, err, emptyMetadata)) - } - // Dispatch to all the accessible namespaces for the computed userset. requests := []ReduceableLookupFunc{} namespaces := map[string]bool{} @@ -362,7 +365,7 @@ func (cl *ConcurrentLookup) processTupleToUserset(ctx context.Context, req *v1.D continue } - _, directRelTypeSystem, _, err := cl.nsm.ReadNamespaceAndTypes(ctx, directRelation.Namespace) + _, directRelTypeSystem, err := cl.nsm.ReadNamespaceAndTypes(ctx, directRelation.Namespace, req.Revision) if err != nil { return returnResult(lookupResultError(req, err, emptyMetadata)) } @@ -378,16 +381,19 @@ func (cl *ConcurrentLookup) processTupleToUserset(ctx context.Context, req *v1.D requests = append(requests, func(ctx context.Context, resultChan chan<- LookupResult) { // Dispatch a request to perform the computed userset lookup. - computedUsersetRequest := cl.dispatch(&v1.DispatchLookupRequest{ - Subject: req.Subject, - ObjectRelation: &v0.RelationReference{ - Namespace: directRelation.Namespace, - Relation: ttu.ComputedUserset.Relation, + computedUsersetRequest := cl.dispatch(ValidatedLookupRequest{ + &v1.DispatchLookupRequest{ + Subject: req.Subject, + ObjectRelation: &v0.RelationReference{ + Namespace: directRelation.Namespace, + Relation: ttu.ComputedUserset.Relation, + }, + Limit: noLimit, // Since this is a step in the lookup. + Metadata: decrementDepth(req.Metadata), + DirectStack: req.DirectStack, + TtuStack: append(req.TtuStack, nr), }, - Limit: noLimit, // Since this is a step in the lookup. - Metadata: decrementDepth(req.Metadata), - DirectStack: req.DirectStack, - TtuStack: append(req.TtuStack, nr), + req.Revision, }) result := lookupAny(ctx, req, noLimit, []ReduceableLookupFunc{computedUsersetRequest}) @@ -439,7 +445,7 @@ func (cl *ConcurrentLookup) processTupleToUserset(ctx context.Context, req *v1.D it, err := cl.ds.QueryTuples(datastore.TupleQueryResourceFilter{ ResourceType: req.ObjectRelation.Namespace, OptionalResourceRelation: ttu.Tupleset.Relation, - }, requestRevision).WithUsersets(usersets).Limit(uint64(req.Limit)).Execute(ctx) + }, req.Revision).WithUsersets(usersets).Limit(uint64(req.Limit)).Execute(ctx) if err != nil { resultChan <- lookupResultError(req, err, result.Resp.Metadata) return @@ -493,20 +499,24 @@ func (cl *ConcurrentLookup) processTupleToUserset(ctx context.Context, req *v1.D } } -func (cl *ConcurrentLookup) lookupComputed(ctx context.Context, req *v1.DispatchLookupRequest, cu *v0.ComputedUserset) ReduceableLookupFunc { - result := lookupOne(ctx, req, cl.dispatch(&v1.DispatchLookupRequest{ - Subject: req.Subject, - ObjectRelation: &v0.RelationReference{ - Namespace: req.ObjectRelation.Namespace, - Relation: cu.Relation, +func (cl *ConcurrentLookup) lookupComputed(ctx context.Context, req ValidatedLookupRequest, cu *v0.ComputedUserset) ReduceableLookupFunc { + result := lookupOne(ctx, req, cl.dispatch(ValidatedLookupRequest{ + + &v1.DispatchLookupRequest{ + Subject: req.Subject, + ObjectRelation: &v0.RelationReference{ + Namespace: req.ObjectRelation.Namespace, + Relation: cu.Relation, + }, + Limit: req.Limit, + Metadata: decrementDepth(req.Metadata), + DirectStack: append(req.DirectStack, &v0.RelationReference{ + Namespace: req.ObjectRelation.Namespace, + Relation: req.ObjectRelation.Relation, + }), + TtuStack: req.TtuStack, }, - Limit: req.Limit, - Metadata: decrementDepth(req.Metadata), - DirectStack: append(req.DirectStack, &v0.RelationReference{ - Namespace: req.ObjectRelation.Namespace, - Relation: req.ObjectRelation.Relation, - }), - TtuStack: req.TtuStack, + req.Revision, })) if result.Err != nil { @@ -538,15 +548,15 @@ func (cl *ConcurrentLookup) lookupComputed(ctx context.Context, req *v1.Dispatch return returnResult(lookupResult(req, rewrittenResolved, result.Resp.Metadata)) } -func (cl *ConcurrentLookup) dispatch(req *v1.DispatchLookupRequest) ReduceableLookupFunc { +func (cl *ConcurrentLookup) dispatch(req ValidatedLookupRequest) ReduceableLookupFunc { return func(ctx context.Context, resultChan chan<- LookupResult) { log.Ctx(ctx).Trace().Object("dispatch lookup", req).Send() - result, err := cl.d.DispatchLookup(ctx, req) + result, err := cl.d.DispatchLookup(ctx, req.DispatchLookupRequest) resultChan <- LookupResult{result, err} } } -func lookupOne(ctx context.Context, parentReq *v1.DispatchLookupRequest, request ReduceableLookupFunc) LookupResult { +func lookupOne(ctx context.Context, parentReq ValidatedLookupRequest, request ReduceableLookupFunc) LookupResult { childCtx, cancelFn := context.WithCancel(ctx) defer cancelFn() @@ -561,13 +571,13 @@ func lookupOne(ctx context.Context, parentReq *v1.DispatchLookupRequest, request } } -func lookupAnyWithExcludedDirect(ctx context.Context, parentReq *v1.DispatchLookupRequest, limit uint32, requests []ReduceableLookupFunc, excludedDirect []*v0.RelationReference) LookupResult { +func lookupAnyWithExcludedDirect(ctx context.Context, parentReq ValidatedLookupRequest, limit uint32, requests []ReduceableLookupFunc, excludedDirect []*v0.RelationReference) LookupResult { result := lookupAny(ctx, parentReq, limit, requests) result.Resp.Metadata.LookupExcludedDirect = excludedDirect return result } -func lookupAny(ctx context.Context, parentReq *v1.DispatchLookupRequest, limit uint32, requests []ReduceableLookupFunc) LookupResult { +func lookupAny(ctx context.Context, parentReq ValidatedLookupRequest, limit uint32, requests []ReduceableLookupFunc) LookupResult { childCtx, cancelFn := context.WithCancel(ctx) defer cancelFn() @@ -602,7 +612,7 @@ func lookupAny(ctx context.Context, parentReq *v1.DispatchLookupRequest, limit u return lookupResult(parentReq, limitedSlice(objects.AsSlice(), limit), responseMetadata) } -func lookupAll(ctx context.Context, parentReq *v1.DispatchLookupRequest, limit uint32, requests []ReduceableLookupFunc) LookupResult { +func lookupAll(ctx context.Context, parentReq ValidatedLookupRequest, limit uint32, requests []ReduceableLookupFunc) LookupResult { if len(requests) == 0 { return lookupResult(parentReq, []*v0.ObjectAndRelation{}, emptyMetadata) } @@ -647,7 +657,7 @@ func lookupAll(ctx context.Context, parentReq *v1.DispatchLookupRequest, limit u return lookupResult(parentReq, objSet.AsSlice(), responseMetadata) } -func lookupExclude(ctx context.Context, parentReq *v1.DispatchLookupRequest, limit uint32, requests []ReduceableLookupFunc) LookupResult { +func lookupExclude(ctx context.Context, parentReq ValidatedLookupRequest, limit uint32, requests []ReduceableLookupFunc) LookupResult { childCtx, cancelFn := context.WithCancel(ctx) defer cancelFn() @@ -704,7 +714,7 @@ func limitedSlice(slice []*v0.ObjectAndRelation, limit uint32) []*v0.ObjectAndRe return slice } -func lookupResult(req *v1.DispatchLookupRequest, resolvedONRs []*v0.ObjectAndRelation, subProblemMetadata *v1.ResponseMeta) LookupResult { +func lookupResult(req ValidatedLookupRequest, resolvedONRs []*v0.ObjectAndRelation, subProblemMetadata *v1.ResponseMeta) LookupResult { return LookupResult{ &v1.DispatchLookupResponse{ Metadata: ensureMetadata(subProblemMetadata), @@ -714,7 +724,7 @@ func lookupResult(req *v1.DispatchLookupRequest, resolvedONRs []*v0.ObjectAndRel } } -func lookupResultError(req *v1.DispatchLookupRequest, err error, subProblemMetadata *v1.ResponseMeta) LookupResult { +func lookupResultError(req ValidatedLookupRequest, err error, subProblemMetadata *v1.ResponseMeta) LookupResult { return LookupResult{ &v1.DispatchLookupResponse{ Metadata: ensureMetadata(subProblemMetadata), diff --git a/internal/namespace/caching.go b/internal/namespace/caching.go index 28fa5dd608..3d361910d6 100644 --- a/internal/namespace/caching.go +++ b/internal/namespace/caching.go @@ -25,10 +25,8 @@ type cachingManager struct { c *ristretto.Cache } -type cacheEntry struct { - definition *v0.NamespaceDefinition - version decimal.Decimal - expiration time.Time +func cacheKey(nsName string, revision decimal.Decimal) string { + return fmt.Sprintf("%s@%s", nsName, revision) } func NewCachingNamespaceManager( @@ -56,59 +54,49 @@ func NewCachingNamespaceManager( }, nil } -func (nsc cachingManager) ReadNamespaceAndTypes(ctx context.Context, nsName string) (*v0.NamespaceDefinition, *NamespaceTypeSystem, decimal.Decimal, error) { - nsDef, rev, err := nsc.ReadNamespace(ctx, nsName) +func (nsc cachingManager) ReadNamespaceAndTypes(ctx context.Context, nsName string, revision decimal.Decimal) (*v0.NamespaceDefinition, *NamespaceTypeSystem, error) { + nsDef, err := nsc.ReadNamespace(ctx, nsName, revision) if err != nil { - return nsDef, nil, rev, err + return nsDef, nil, err } // TODO(jschorr): Cache the type system too - ts, terr := BuildNamespaceTypeSystemForManager(nsDef, nsc) - return nsDef, ts, rev, terr + ts, terr := BuildNamespaceTypeSystemForManager(nsDef, nsc, revision) + return nsDef, ts, terr } -func (nsc cachingManager) ReadNamespace(ctx context.Context, nsName string) (*v0.NamespaceDefinition, decimal.Decimal, error) { +func (nsc cachingManager) ReadNamespace(ctx context.Context, nsName string, revision decimal.Decimal) (*v0.NamespaceDefinition, error) { ctx, span := tracer.Start(ctx, "ReadNamespace") defer span.End() // Check the cache. - now := time.Now() - value, found := nsc.c.Get(nsName) + value, found := nsc.c.Get(cacheKey(nsName, revision)) if found { - foundEntry := value.(cacheEntry) - if foundEntry.expiration.After(now) { - span.AddEvent("Returning namespace from cache") - return foundEntry.definition, foundEntry.version, nil - } + return value.(*v0.NamespaceDefinition), nil } // We couldn't use the cached entry, load one - loaded, version, err := nsc.delegate.ReadNamespace(ctx, nsName) + loaded, _, err := nsc.delegate.ReadNamespace(ctx, nsName, revision) if errors.As(err, &datastore.ErrNamespaceNotFound{}) { - return nil, decimal.Zero, NewNamespaceNotFoundErr(nsName) + return nil, NewNamespaceNotFoundErr(nsName) } if err != nil { - return nil, decimal.Zero, err + return nil, err } // Remove user-defined metadata. loaded = namespace.FilterUserDefinedMetadata(loaded) // Save it to the cache - newEntry := cacheEntry{ - definition: loaded, - version: version, - expiration: now.Add(nsc.expiration), - } - nsc.c.Set(nsName, newEntry, int64(proto.Size(loaded))) + nsc.c.Set(cacheKey(nsName, revision), loaded, int64(proto.Size(loaded))) span.AddEvent("Saved to cache") - return loaded, version, nil + return loaded, nil } -func (nsc cachingManager) CheckNamespaceAndRelation(ctx context.Context, namespace, relation string, allowEllipsis bool) error { - config, _, err := nsc.ReadNamespace(ctx, namespace) +func (nsc cachingManager) CheckNamespaceAndRelation(ctx context.Context, namespace, relation string, allowEllipsis bool, revision decimal.Decimal) error { + config, err := nsc.ReadNamespace(ctx, namespace, revision) if err != nil { return err } diff --git a/internal/namespace/manager.go b/internal/namespace/manager.go index 714d39724b..b9e9d12227 100644 --- a/internal/namespace/manager.go +++ b/internal/namespace/manager.go @@ -16,7 +16,7 @@ type Manager interface { // // Returns ErrNamespaceNotFound if the namespace cannot be found. // Returns the direct downstream error for all other unknown error. - ReadNamespace(ctx context.Context, nsName string) (*v0.NamespaceDefinition, decimal.Decimal, error) + ReadNamespace(ctx context.Context, nsName string, revision decimal.Decimal) (*v0.NamespaceDefinition, error) // CheckNamespaceAndRelation checks that the specified namespace and relation exist in the // datastore. @@ -24,10 +24,10 @@ type Manager interface { // Returns ErrNamespaceNotFound if the namespace cannot be found. // Returns ErrRelationNotFound if the relation was not found in the namespace. // Returns the direct downstream error for all other unknown error. - CheckNamespaceAndRelation(ctx context.Context, namespace, relation string, allowEllipsis bool) error + CheckNamespaceAndRelation(ctx context.Context, namespace, relation string, allowEllipsis bool, revision decimal.Decimal) error // ReadNamespaceAndTypes reads a namespace definition, version, and type system and returns it if found. - ReadNamespaceAndTypes(ctx context.Context, nsName string) (*v0.NamespaceDefinition, *NamespaceTypeSystem, decimal.Decimal, error) + ReadNamespaceAndTypes(ctx context.Context, nsName string, revision decimal.Decimal) (*v0.NamespaceDefinition, *NamespaceTypeSystem, error) // Closes the namespace manager, disposing of any resources. // diff --git a/internal/namespace/typesystem.go b/internal/namespace/typesystem.go index 64d3023139..d180d65c49 100644 --- a/internal/namespace/typesystem.go +++ b/internal/namespace/typesystem.go @@ -5,6 +5,7 @@ import ( "fmt" v0 "github.com/authzed/authzed-go/proto/authzed/api/v0" + "github.com/shopspring/decimal" iv1 "github.com/authzed/spicedb/internal/proto/impl/v1" "github.com/authzed/spicedb/pkg/graph" @@ -32,7 +33,7 @@ type LookupNamespace func(ctx context.Context, name string) (*v0.NamespaceDefini // BuildNamespaceTypeSystemWithFallback constructs a type system view of a namespace definition, with automatic lookup // via the additional defs first, and then the namespace manager as a fallback. -func BuildNamespaceTypeSystemWithFallback(nsDef *v0.NamespaceDefinition, manager Manager, additionalDefs []*v0.NamespaceDefinition) (*NamespaceTypeSystem, error) { +func BuildNamespaceTypeSystemWithFallback(nsDef *v0.NamespaceDefinition, manager Manager, additionalDefs []*v0.NamespaceDefinition, revision decimal.Decimal) (*NamespaceTypeSystem, error) { return BuildNamespaceTypeSystem(nsDef, func(ctx context.Context, namespaceName string) (*v0.NamespaceDefinition, error) { // NOTE: Order is important here: We always check the new definitions before the existing // ones. @@ -45,16 +46,16 @@ func BuildNamespaceTypeSystemWithFallback(nsDef *v0.NamespaceDefinition, manager } // Otherwise, check already defined namespaces. - otherNamespaceDef, _, err := manager.ReadNamespace(ctx, namespaceName) + otherNamespaceDef, err := manager.ReadNamespace(ctx, namespaceName, revision) return otherNamespaceDef, err }) } // BuildNamespaceTypeSystemForManager constructs a type system view of a namespace definition, with automatic lookup // via the namespace manager. -func BuildNamespaceTypeSystemForManager(nsDef *v0.NamespaceDefinition, manager Manager) (*NamespaceTypeSystem, error) { +func BuildNamespaceTypeSystemForManager(nsDef *v0.NamespaceDefinition, manager Manager, revision decimal.Decimal) (*NamespaceTypeSystem, error) { return BuildNamespaceTypeSystem(nsDef, func(ctx context.Context, nsName string) (*v0.NamespaceDefinition, error) { - nsDef, _, err := manager.ReadNamespace(ctx, nsName) + nsDef, err := manager.ReadNamespace(ctx, nsName, revision) return nsDef, err }) } diff --git a/internal/namespace/typesystem_test.go b/internal/namespace/typesystem_test.go index eb98b5e0cc..606aa81b32 100644 --- a/internal/namespace/typesystem_test.go +++ b/internal/namespace/typesystem_test.go @@ -6,6 +6,7 @@ import ( "time" v0 "github.com/authzed/authzed-go/proto/authzed/api/v0" + "github.com/shopspring/decimal" "github.com/stretchr/testify/require" "github.com/authzed/spicedb/internal/datastore/memdb" @@ -178,12 +179,14 @@ func TestTypeSystem(t *testing.T) { nsm, err := NewCachingNamespaceManager(ds, 0*time.Second, nil) require.NoError(err) + var lastRevision decimal.Decimal for _, otherNS := range tc.otherNamespaces { - _, err := ds.WriteNamespace(context.Background(), otherNS) + var err error + lastRevision, err = ds.WriteNamespace(context.Background(), otherNS) require.NoError(err) } - ts, err := BuildNamespaceTypeSystemForManager(tc.toCheck, nsm) + ts, err := BuildNamespaceTypeSystemForManager(tc.toCheck, nsm, lastRevision) require.NoError(err) terr := ts.Validate(context.Background()) diff --git a/internal/services/consistency_test.go b/internal/services/consistency_test.go index 780f2ab82f..978c5a637b 100644 --- a/internal/services/consistency_test.go +++ b/internal/services/consistency_test.go @@ -74,7 +74,7 @@ func TestConsistency(t *testing.T) { // Validate the type system for each namespace. for _, nsDef := range fullyResolved.NamespaceDefinitions { - _, ts, _, err := ns.ReadNamespaceAndTypes(context.Background(), nsDef.Name) + _, ts, err := ns.ReadNamespaceAndTypes(context.Background(), nsDef.Name, revision) lrequire.NoError(err) err = ts.Validate(context.Background()) diff --git a/internal/services/shared/schema.go b/internal/services/shared/schema.go index fa112a5e98..cd0885cdfc 100644 --- a/internal/services/shared/schema.go +++ b/internal/services/shared/schema.go @@ -5,6 +5,7 @@ import ( "errors" v0 "github.com/authzed/authzed-go/proto/authzed/api/v0" + "github.com/shopspring/decimal" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -42,12 +43,12 @@ func EnsureNoRelationshipsExist(ctx context.Context, ds datastore.Datastore, nam // SanityCheckExistingRelationships ensures that a namespace definition being written does not result // in relationships without associated defined schema object definitions and relations. -func SanityCheckExistingRelationships(ctx context.Context, ds datastore.Datastore, nsdef *v0.NamespaceDefinition) error { +func SanityCheckExistingRelationships(ctx context.Context, ds datastore.Datastore, nsdef *v0.NamespaceDefinition, revision decimal.Decimal) error { // Ensure that the updated namespace does not break the existing tuple data. // // NOTE: We use the datastore here to read the namespace, rather than the namespace manager, // to ensure there is no caching being used. - existing, _, err := ds.ReadNamespace(ctx, nsdef.Name) + existing, _, err := ds.ReadNamespace(ctx, nsdef.Name, revision) if err != nil && !errors.As(err, &datastore.ErrNamespaceNotFound{}) { return err } diff --git a/internal/services/v0/acl.go b/internal/services/v0/acl.go index aa488e175c..f090e94254 100644 --- a/internal/services/v0/acl.go +++ b/internal/services/v0/acl.go @@ -66,8 +66,13 @@ func NewACLServer(ds datastore.Datastore, nsm namespace.Manager, dispatch dispat } func (as *aclServer) Write(ctx context.Context, req *v0.WriteRequest) (*v0.WriteResponse, error) { + atRevision, err := as.ds.SyncRevision(ctx) + if err != nil { + return nil, rewriteACLError(ctx, err) + } + for _, mutation := range req.Updates { - err := validateTupleWrite(ctx, mutation.Tuple, as.nsm) + err := validateTupleWrite(ctx, mutation.Tuple, as.nsm, atRevision) if err != nil { return nil, rewriteACLError(ctx, err) } @@ -97,6 +102,24 @@ func (as *aclServer) Write(ctx context.Context, req *v0.WriteRequest) (*v0.Write } func (as *aclServer) Read(ctx context.Context, req *v0.ReadRequest) (*v0.ReadResponse, error) { + var atRevision decimal.Decimal + if req.AtRevision != nil { + // Read should attempt to use the exact revision requested + decoded, err := zookie.DecodeRevision(req.AtRevision) + if err != nil { + return nil, status.Errorf(codes.InvalidArgument, "bad request revision: %s", err) + } + + atRevision = decoded + } else { + // No revision provided, we'll pick one + var err error + atRevision, err = as.ds.Revision(ctx) + if err != nil { + return nil, status.Errorf(codes.Internal, "unable to pick request revision: %s", err) + } + } + for _, tuplesetFilter := range req.Tuplesets { checkedRelation := false for _, filter := range tuplesetFilter.Filters { @@ -120,6 +143,7 @@ func (as *aclServer) Read(ctx context.Context, req *v0.ReadRequest) (*v0.ReadRes tuplesetFilter.Namespace, tuplesetFilter.Relation, false, // Disallow ellipsis + atRevision, ); err != nil { return nil, rewriteACLError(ctx, err) } @@ -146,30 +170,13 @@ func (as *aclServer) Read(ctx context.Context, req *v0.ReadRequest) (*v0.ReadRes tuplesetFilter.Namespace, datastore.Ellipsis, true, // Allow ellipsis + atRevision, ); err != nil { return nil, rewriteACLError(ctx, err) } } } - var atRevision decimal.Decimal - if req.AtRevision != nil { - // Read should attempt to use the exact revision requested - decoded, err := zookie.DecodeRevision(req.AtRevision) - if err != nil { - return nil, status.Errorf(codes.InvalidArgument, "bad request revision: %s", err) - } - - atRevision = decoded - } else { - // No revision provided, we'll pick one - var err error - atRevision, err = as.ds.Revision(ctx) - if err != nil { - return nil, status.Errorf(codes.Internal, "unable to pick request revision: %s", err) - } - } - err := as.ds.CheckRevision(ctx, atRevision) if err != nil { return nil, rewriteACLError(ctx, err) @@ -248,12 +255,12 @@ func (as *aclServer) commonCheck( start *v0.ObjectAndRelation, goal *v0.ObjectAndRelation, ) (*v0.CheckResponse, error) { - err := as.nsm.CheckNamespaceAndRelation(ctx, start.Namespace, start.Relation, false) + err := as.nsm.CheckNamespaceAndRelation(ctx, start.Namespace, start.Relation, false, atRevision) if err != nil { return nil, rewriteACLError(ctx, err) } - err = as.nsm.CheckNamespaceAndRelation(ctx, goal.Namespace, goal.Relation, true) + err = as.nsm.CheckNamespaceAndRelation(ctx, goal.Namespace, goal.Relation, true, atRevision) if err != nil { return nil, rewriteACLError(ctx, err) } @@ -290,12 +297,12 @@ func (as *aclServer) commonCheck( } func (as *aclServer) Expand(ctx context.Context, req *v0.ExpandRequest) (*v0.ExpandResponse, error) { - err := as.nsm.CheckNamespaceAndRelation(ctx, req.Userset.Namespace, req.Userset.Relation, false) + atRevision, err := as.pickBestRevision(ctx, req.AtRevision) if err != nil { return nil, rewriteACLError(ctx, err) } - atRevision, err := as.pickBestRevision(ctx, req.AtRevision) + err = as.nsm.CheckNamespaceAndRelation(ctx, req.Userset.Namespace, req.Userset.Relation, false, atRevision) if err != nil { return nil, rewriteACLError(ctx, err) } @@ -320,17 +327,17 @@ func (as *aclServer) Expand(ctx context.Context, req *v0.ExpandRequest) (*v0.Exp } func (as *aclServer) Lookup(ctx context.Context, req *v0.LookupRequest) (*v0.LookupResponse, error) { - err := as.nsm.CheckNamespaceAndRelation(ctx, req.User.Namespace, req.User.Relation, true) + atRevision, err := as.pickBestRevision(ctx, req.AtRevision) if err != nil { return nil, rewriteACLError(ctx, err) } - err = as.nsm.CheckNamespaceAndRelation(ctx, req.ObjectRelation.Namespace, req.ObjectRelation.Relation, false) + err = as.nsm.CheckNamespaceAndRelation(ctx, req.User.Namespace, req.User.Relation, true, atRevision) if err != nil { return nil, rewriteACLError(ctx, err) } - atRevision, err := as.pickBestRevision(ctx, req.AtRevision) + err = as.nsm.CheckNamespaceAndRelation(ctx, req.ObjectRelation.Namespace, req.ObjectRelation.Relation, false, atRevision) if err != nil { return nil, rewriteACLError(ctx, err) } diff --git a/internal/services/v0/devcontext.go b/internal/services/v0/devcontext.go index f72d2b4d37..8f7f696831 100644 --- a/internal/services/v0/devcontext.go +++ b/internal/services/v0/devcontext.go @@ -72,7 +72,9 @@ func newDevContext(ctx context.Context, requestContext *v0.RequestContext, ds da return &DevContext{Ctx: ctx, NamespaceManager: nsm, RequestErrors: []*v0.DeveloperError{devError}}, false, nil } - requestErrors, err := loadNamespaces(ctx, namespaces, nsm, ds) + var currentRevision decimal.Decimal + var requestErrors []*v0.DeveloperError + requestErrors, currentRevision, err = loadNamespaces(ctx, namespaces, nsm, ds) if err != nil { return &DevContext{Ctx: ctx, NamespaceManager: nsm}, false, err } @@ -82,7 +84,7 @@ func newDevContext(ctx context.Context, requestContext *v0.RequestContext, ds da } if len(requestContext.LegacyNsConfigs) > 0 { - requestErrors, err := loadNamespaces(ctx, requestContext.LegacyNsConfigs, nsm, ds) + requestErrors, currentRevision, err = loadNamespaces(ctx, requestContext.LegacyNsConfigs, nsm, ds) if err != nil { return &DevContext{Ctx: ctx, NamespaceManager: nsm}, false, err } @@ -92,7 +94,7 @@ func newDevContext(ctx context.Context, requestContext *v0.RequestContext, ds da } } - revision, requestErrors, err := loadTuples(ctx, requestContext.Relationships, nsm, ds) + revision, requestErrors, err := loadTuples(ctx, requestContext.Relationships, nsm, ds, currentRevision) if err != nil { return &DevContext{Ctx: ctx, NamespaceManager: nsm, Namespaces: namespaces}, false, err } @@ -162,7 +164,7 @@ func compile(schema string) ([]*v0.NamespaceDefinition, *v0.DeveloperError, erro return namespaces, nil, nil } -func loadTuples(ctx context.Context, tuples []*v0.RelationTuple, nsm namespace.Manager, ds datastore.Datastore) (decimal.Decimal, []*v0.DeveloperError, error) { +func loadTuples(ctx context.Context, tuples []*v0.RelationTuple, nsm namespace.Manager, ds datastore.Datastore, revision decimal.Decimal) (decimal.Decimal, []*v0.DeveloperError, error) { var errors []*v0.DeveloperError var updates []*v1.RelationshipUpdate for _, tpl := range tuples { @@ -177,7 +179,7 @@ func loadTuples(ctx context.Context, tuples []*v0.RelationTuple, nsm namespace.M continue } - err := validateTupleWrite(ctx, tpl, nsm) + err := validateTupleWrite(ctx, tpl, nsm, revision) if err != nil { verrs, wireErr := rewriteGraphError(ctx, v0.DeveloperError_RELATIONSHIP, 0, 0, tuple.String(tpl), err) if wireErr == nil { @@ -198,19 +200,26 @@ func loadTuples(ctx context.Context, tuples []*v0.RelationTuple, nsm namespace.M return revision, errors, err } -func loadNamespaces(ctx context.Context, namespaces []*v0.NamespaceDefinition, nsm namespace.Manager, ds datastore.Datastore) ([]*v0.DeveloperError, error) { +func loadNamespaces( + ctx context.Context, + namespaces []*v0.NamespaceDefinition, + nsm namespace.Manager, + ds datastore.Datastore, +) ([]*v0.DeveloperError, decimal.Decimal, error) { var errors []*v0.DeveloperError + var lastRevision decimal.Decimal for _, nsDef := range namespaces { ts, terr := namespace.BuildNamespaceTypeSystemForDefs(nsDef, namespaces) if terr != nil { - return errors, terr + return errors, lastRevision, terr } tverr := ts.Validate(ctx) if tverr == nil { - _, err := ds.WriteNamespace(ctx, nsDef) + var err error + lastRevision, err = ds.WriteNamespace(ctx, nsDef) if err != nil { - return errors, err + return errors, lastRevision, err } continue } @@ -223,5 +232,5 @@ func loadNamespaces(ctx context.Context, namespaces []*v0.NamespaceDefinition, n }) } - return errors, nil + return errors, lastRevision, nil } diff --git a/internal/services/v0/namespace.go b/internal/services/v0/namespace.go index 43cac1f411..04a32d66f1 100644 --- a/internal/services/v0/namespace.go +++ b/internal/services/v0/namespace.go @@ -45,9 +45,14 @@ func (nss *nsServer) WriteConfig(ctx context.Context, req *v0.WriteConfigRequest return nil, rewriteNamespaceError(ctx, err) } + readRevision, err := nss.ds.SyncRevision(ctx) + if err != nil { + return nil, rewriteNamespaceError(ctx, err) + } + for _, config := range req.Configs { // Validate the type system for the updated namespace. - ts, terr := namespace.BuildNamespaceTypeSystemWithFallback(config, nsm, req.Configs) + ts, terr := namespace.BuildNamespaceTypeSystemWithFallback(config, nsm, req.Configs, readRevision) if terr != nil { return nil, rewriteNamespaceError(ctx, terr) } @@ -61,7 +66,7 @@ func (nss *nsServer) WriteConfig(ctx context.Context, req *v0.WriteConfigRequest // // NOTE: We use the datastore here to read the namespace, rather than the namespace manager, // to ensure there is no caching being used. - existing, _, err := nss.ds.ReadNamespace(ctx, config.Name) + existing, _, err := nss.ds.ReadNamespace(ctx, config.Name, readRevision) if err != nil && !errors.As(err, &datastore.ErrNamespaceNotFound{}) { return nil, rewriteNamespaceError(ctx, err) } @@ -128,7 +133,12 @@ func (nss *nsServer) WriteConfig(ctx context.Context, req *v0.WriteConfigRequest } func (nss *nsServer) ReadConfig(ctx context.Context, req *v0.ReadConfigRequest) (*v0.ReadConfigResponse, error) { - found, version, err := nss.ds.ReadNamespace(ctx, req.Namespace) + readRevision, err := nss.ds.SyncRevision(ctx) + if err != nil { + return nil, rewriteNamespaceError(ctx, err) + } + + found, _, err := nss.ds.ReadNamespace(ctx, req.Namespace, readRevision) if err != nil { return nil, rewriteNamespaceError(ctx, err) } @@ -136,7 +146,7 @@ func (nss *nsServer) ReadConfig(ctx context.Context, req *v0.ReadConfigRequest) return &v0.ReadConfigResponse{ Namespace: req.Namespace, Config: found, - Revision: zookie.NewFromRevision(version), + Revision: zookie.NewFromRevision(readRevision), }, nil } @@ -149,7 +159,7 @@ func (nss *nsServer) DeleteConfigs(ctx context.Context, req *v0.DeleteConfigsReq // Ensure that all the specified namespaces can be deleted. for _, nsName := range req.Namespaces { // Ensure the namespace exists. - _, _, err := nss.ds.ReadNamespace(ctx, nsName) + _, _, err := nss.ds.ReadNamespace(ctx, nsName, syncRevision) if err != nil { return nil, rewriteNamespaceError(ctx, err) } diff --git a/internal/services/v0/validation.go b/internal/services/v0/validation.go index 474d52ebbf..a9c26cf0db 100644 --- a/internal/services/v0/validation.go +++ b/internal/services/v0/validation.go @@ -5,6 +5,7 @@ import ( "fmt" v0 "github.com/authzed/authzed-go/proto/authzed/api/v0" + "github.com/shopspring/decimal" "github.com/authzed/spicedb/internal/namespace" ) @@ -15,12 +16,18 @@ type invalidRelationError struct { onr *v0.ObjectAndRelation } -func validateTupleWrite(ctx context.Context, tpl *v0.RelationTuple, nsm namespace.Manager) error { +func validateTupleWrite( + ctx context.Context, + tpl *v0.RelationTuple, + nsm namespace.Manager, + revision decimal.Decimal, +) error { if err := nsm.CheckNamespaceAndRelation( ctx, tpl.ObjectAndRelation.Namespace, tpl.ObjectAndRelation.Relation, false, // Disallow ellipsis + revision, ); err != nil { return err } @@ -30,11 +37,12 @@ func validateTupleWrite(ctx context.Context, tpl *v0.RelationTuple, nsm namespac tpl.User.GetUserset().Namespace, tpl.User.GetUserset().Relation, true, // Allow Ellipsis + revision, ); err != nil { return err } - _, ts, _, err := nsm.ReadNamespaceAndTypes(ctx, tpl.ObjectAndRelation.Namespace) + _, ts, err := nsm.ReadNamespaceAndTypes(ctx, tpl.ObjectAndRelation.Namespace, revision) if err != nil { return err } diff --git a/internal/services/v0/watch.go b/internal/services/v0/watch.go index cfb56ffbcc..933ad1f916 100644 --- a/internal/services/v0/watch.go +++ b/internal/services/v0/watch.go @@ -35,17 +35,6 @@ func NewWatchServer(ds datastore.Datastore, nsm namespace.Manager) v0.WatchServi } func (ws *watchServer) Watch(req *v0.WatchRequest, stream v0.WatchService_WatchServer) error { - namespaceMap := make(map[string]struct{}) - for _, ns := range req.Namespaces { - err := ws.nsm.CheckNamespaceAndRelation(stream.Context(), ns, datastore.Ellipsis, true) - if err != nil { - return status.Errorf(codes.FailedPrecondition, "invalid namespace: %s", err) - } - - namespaceMap[ns] = struct{}{} - } - filter := namespaceFilter{namespaces: namespaceMap} - var afterRevision decimal.Decimal if req.StartRevision != nil && req.StartRevision.Token != "" { decodedRevision, err := zookie.DecodeRevision(req.StartRevision) @@ -62,6 +51,17 @@ func (ws *watchServer) Watch(req *v0.WatchRequest, stream v0.WatchService_WatchS } } + namespaceMap := make(map[string]struct{}) + for _, ns := range req.Namespaces { + err := ws.nsm.CheckNamespaceAndRelation(stream.Context(), ns, datastore.Ellipsis, true, afterRevision) + if err != nil { + return status.Errorf(codes.FailedPrecondition, "invalid namespace: %s", err) + } + + namespaceMap[ns] = struct{}{} + } + filter := namespaceFilter{namespaces: namespaceMap} + updates, errchan := ws.ds.Watch(stream.Context(), afterRevision) for { select { diff --git a/internal/services/v1/permissions.go b/internal/services/v1/permissions.go index b38abd1142..520fd30086 100644 --- a/internal/services/v1/permissions.go +++ b/internal/services/v1/permissions.go @@ -17,7 +17,7 @@ import ( func (ps *permissionServer) CheckPermission(ctx context.Context, req *v1.CheckPermissionRequest) (*v1.CheckPermissionResponse, error) { atRevision, checkedAt := consistency.MustRevisionFromContext(ctx) - err := ps.nsm.CheckNamespaceAndRelation(ctx, req.Resource.ObjectType, req.Permission, false) + err := ps.nsm.CheckNamespaceAndRelation(ctx, req.Resource.ObjectType, req.Permission, false, atRevision) if err != nil { return nil, rewritePermissionsError(ctx, err) } @@ -25,7 +25,9 @@ func (ps *permissionServer) CheckPermission(ctx context.Context, req *v1.CheckPe err = ps.nsm.CheckNamespaceAndRelation(ctx, req.Subject.Object.ObjectType, normalizeSubjectRelation(req.Subject), - true) + true, + atRevision, + ) if err != nil { return nil, rewritePermissionsError(ctx, err) } @@ -70,7 +72,7 @@ func (ps *permissionServer) CheckPermission(ctx context.Context, req *v1.CheckPe func (ps *permissionServer) ExpandPermissionTree(ctx context.Context, req *v1.ExpandPermissionTreeRequest) (*v1.ExpandPermissionTreeResponse, error) { atRevision, expandedAt := consistency.MustRevisionFromContext(ctx) - err := ps.nsm.CheckNamespaceAndRelation(ctx, req.Resource.ObjectType, req.Permission, false) + err := ps.nsm.CheckNamespaceAndRelation(ctx, req.Resource.ObjectType, req.Permission, false, atRevision) if err != nil { return nil, rewritePermissionsError(ctx, err) } @@ -252,13 +254,18 @@ func (ps *permissionServer) LookupResources(req *v1.LookupResourcesRequest, resp ctx := resp.Context() atRevision, revisionReadAt := consistency.MustRevisionFromContext(ctx) - err := ps.nsm.CheckNamespaceAndRelation(ctx, req.Subject.Object.ObjectType, - normalizeSubjectRelation(req.Subject), true) + err := ps.nsm.CheckNamespaceAndRelation( + ctx, + req.Subject.Object.ObjectType, + normalizeSubjectRelation(req.Subject), + true, + atRevision, + ) if err != nil { return rewritePermissionsError(ctx, err) } - err = ps.nsm.CheckNamespaceAndRelation(ctx, req.ResourceObjectType, req.Permission, false) + err = ps.nsm.CheckNamespaceAndRelation(ctx, req.ResourceObjectType, req.Permission, false, atRevision) if err != nil { return rewritePermissionsError(ctx, err) } diff --git a/internal/services/v1/relationships.go b/internal/services/v1/relationships.go index 1e99f2fedc..86d895aa90 100644 --- a/internal/services/v1/relationships.go +++ b/internal/services/v1/relationships.go @@ -9,6 +9,7 @@ import ( grpcvalidate "github.com/grpc-ecosystem/go-grpc-middleware/validator" "github.com/jzelinskie/stringz" "github.com/rs/zerolog/log" + "github.com/shopspring/decimal" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -61,14 +62,14 @@ type permissionServer struct { defaultDepth uint32 } -func (ps *permissionServer) checkFilterComponent(ctx context.Context, objectType, optionalRelation string) error { +func (ps *permissionServer) checkFilterComponent(ctx context.Context, objectType, optionalRelation string, revision decimal.Decimal) error { relationToTest := stringz.DefaultEmpty(optionalRelation, datastore.Ellipsis) allowEllipsis := optionalRelation == "" - return ps.nsm.CheckNamespaceAndRelation(ctx, objectType, relationToTest, allowEllipsis) + return ps.nsm.CheckNamespaceAndRelation(ctx, objectType, relationToTest, allowEllipsis, revision) } -func (ps *permissionServer) checkFilterNamespaces(ctx context.Context, filter *v1.RelationshipFilter) error { - if err := ps.checkFilterComponent(ctx, filter.ResourceType, filter.OptionalRelation); err != nil { +func (ps *permissionServer) checkFilterNamespaces(ctx context.Context, filter *v1.RelationshipFilter, revision decimal.Decimal) error { + if err := ps.checkFilterComponent(ctx, filter.ResourceType, filter.OptionalRelation, revision); err != nil { return err } @@ -77,7 +78,7 @@ func (ps *permissionServer) checkFilterNamespaces(ctx context.Context, filter *v if subjectFilter.OptionalRelation != nil { subjectRelation = subjectFilter.OptionalRelation.Relation } - if err := ps.checkFilterComponent(ctx, subjectFilter.SubjectType, subjectRelation); err != nil { + if err := ps.checkFilterComponent(ctx, subjectFilter.SubjectType, subjectRelation, revision); err != nil { return err } } @@ -90,7 +91,7 @@ func (ps *permissionServer) ReadRelationships(req *v1.ReadRelationshipsRequest, atRevision, revisionReadAt := consistency.MustRevisionFromContext(ctx) - if err := ps.checkFilterNamespaces(ctx, req.RelationshipFilter); err != nil { + if err := ps.checkFilterNamespaces(ctx, req.RelationshipFilter, atRevision); err != nil { return rewritePermissionsError(ctx, err) } @@ -146,8 +147,13 @@ func (ps *permissionServer) ReadRelationships(req *v1.ReadRelationshipsRequest, } func (ps *permissionServer) WriteRelationships(ctx context.Context, req *v1.WriteRelationshipsRequest) (*v1.WriteRelationshipsResponse, error) { + readRevision, err := ps.ds.SyncRevision(ctx) + if err != nil { + return nil, rewritePermissionsError(ctx, err) + } + for _, precond := range req.OptionalPreconditions { - if err := ps.checkFilterNamespaces(ctx, precond.Filter); err != nil { + if err := ps.checkFilterNamespaces(ctx, precond.Filter, readRevision); err != nil { return nil, rewritePermissionsError(ctx, err) } } @@ -158,6 +164,7 @@ func (ps *permissionServer) WriteRelationships(ctx context.Context, req *v1.Writ update.Relationship.Resource.ObjectType, update.Relationship.Relation, false, + readRevision, ); err != nil { return nil, rewritePermissionsError(ctx, err) } @@ -167,11 +174,12 @@ func (ps *permissionServer) WriteRelationships(ctx context.Context, req *v1.Writ update.Relationship.Subject.Object.ObjectType, stringz.DefaultEmpty(update.Relationship.Subject.OptionalRelation, datastore.Ellipsis), true, + readRevision, ); err != nil { return nil, rewritePermissionsError(ctx, err) } - _, ts, _, err := ps.nsm.ReadNamespaceAndTypes(ctx, update.Relationship.Resource.ObjectType) + _, ts, err := ps.nsm.ReadNamespaceAndTypes(ctx, update.Relationship.Resource.ObjectType, readRevision) if err != nil { return nil, err } @@ -214,7 +222,12 @@ func (ps *permissionServer) WriteRelationships(ctx context.Context, req *v1.Writ } func (ps *permissionServer) DeleteRelationships(ctx context.Context, req *v1.DeleteRelationshipsRequest) (*v1.DeleteRelationshipsResponse, error) { - if err := ps.checkFilterNamespaces(ctx, req.RelationshipFilter); err != nil { + readRevision, err := ps.ds.SyncRevision(ctx) + if err != nil { + return nil, rewritePermissionsError(ctx, err) + } + + if err := ps.checkFilterNamespaces(ctx, req.RelationshipFilter, readRevision); err != nil { return nil, rewritePermissionsError(ctx, err) } diff --git a/internal/services/v1/schema.go b/internal/services/v1/schema.go index 0bd64c47af..88844ec1db 100644 --- a/internal/services/v1/schema.go +++ b/internal/services/v1/schema.go @@ -40,7 +40,12 @@ type schemaServer struct { } func (ss *schemaServer) ReadSchema(ctx context.Context, in *v1.ReadSchemaRequest) (*v1.ReadSchemaResponse, error) { - nsDefs, err := ss.ds.ListNamespaces(ctx) + readRevision, err := ss.ds.SyncRevision(ctx) + if err != nil { + return nil, rewritePermissionsError(ctx, err) + } + + nsDefs, err := ss.ds.ListNamespaces(ctx, readRevision) if err != nil { return nil, rewriteSchemaError(ctx, err) } @@ -62,13 +67,19 @@ func (ss *schemaServer) ReadSchema(ctx context.Context, in *v1.ReadSchemaRequest func (ss *schemaServer) WriteSchema(ctx context.Context, in *v1.WriteSchemaRequest) (*v1.WriteSchemaResponse, error) { log.Ctx(ctx).Trace().Str("schema", in.GetSchema()).Msg("requested Schema to be written") + + readRevision, err := ss.ds.SyncRevision(ctx) + if err != nil { + return nil, rewritePermissionsError(ctx, err) + } + inputSchema := compiler.InputSchema{ Source: input.InputSource("schema"), SchemaString: in.GetSchema(), } // Build a map of existing definitions to determine those being removed, if any. - existingDefs, err := ss.ds.ListNamespaces(ctx) + existingDefs, err := ss.ds.ListNamespaces(ctx, readRevision) if err != nil { return nil, rewriteSchemaError(ctx, err) } @@ -98,7 +109,7 @@ func (ss *schemaServer) WriteSchema(ctx context.Context, in *v1.WriteSchemaReque return nil, rewriteSchemaError(ctx, err) } - if err := shared.SanityCheckExistingRelationships(ctx, ss.ds, nsdef); err != nil { + if err := shared.SanityCheckExistingRelationships(ctx, ss.ds, nsdef, readRevision); err != nil { return nil, rewriteSchemaError(ctx, err) } diff --git a/internal/services/v1alpha1/schema.go b/internal/services/v1alpha1/schema.go index a80765884c..49dca0c8c6 100644 --- a/internal/services/v1alpha1/schema.go +++ b/internal/services/v1alpha1/schema.go @@ -59,21 +59,26 @@ func NewSchemaServer(ds datastore.Datastore, prefixRequired PrefixRequiredOption } func (ss *schemaServiceServer) ReadSchema(ctx context.Context, in *v1alpha1.ReadSchemaRequest) (*v1alpha1.ReadSchemaResponse, error) { + syncRevision, err := ss.ds.SyncRevision(ctx) + if err != nil { + return nil, rewriteError(ctx, err) + } + var objectDefs []string - revisions := make(map[string]datastore.Revision, len(in.GetObjectDefinitionsNames())) + createdRevisions := make(map[string]datastore.Revision, len(in.GetObjectDefinitionsNames())) for _, objectDefName := range in.GetObjectDefinitionsNames() { - found, revision, err := ss.ds.ReadNamespace(ctx, objectDefName) + found, createdAt, err := ss.ds.ReadNamespace(ctx, objectDefName, syncRevision) if err != nil { return nil, rewriteError(ctx, err) } - revisions[objectDefName] = revision + createdRevisions[objectDefName] = createdAt objectDef, _ := generator.GenerateSource(found) objectDefs = append(objectDefs, objectDef) } - computedRevision, err := nspkg.ComputeV1Alpha1Revision(revisions) + computedRevision, err := nspkg.ComputeV1Alpha1Revision(createdRevisions) if err != nil { return nil, rewriteError(ctx, err) } @@ -107,10 +112,15 @@ func (ss *schemaServiceServer) WriteSchema(ctx context.Context, in *v1alpha1.Wri return nil, rewriteError(ctx, err) } + syncRevision, err := ss.ds.SyncRevision(ctx) + if err != nil { + return nil, rewriteError(ctx, err) + } + log.Ctx(ctx).Trace().Interface("namespace definitions", nsdefs).Msg("compiled namespace definitions") for _, nsdef := range nsdefs { - ts, err := namespace.BuildNamespaceTypeSystemWithFallback(nsdef, nsm, nsdefs) + ts, err := namespace.BuildNamespaceTypeSystemWithFallback(nsdef, nsm, nsdefs, syncRevision) if err != nil { return nil, rewriteError(ctx, err) } @@ -119,7 +129,7 @@ func (ss *schemaServiceServer) WriteSchema(ctx context.Context, in *v1alpha1.Wri return nil, rewriteError(ctx, err) } - if err := shared.SanityCheckExistingRelationships(ctx, ss.ds, nsdef); err != nil { + if err := shared.SanityCheckExistingRelationships(ctx, ss.ds, nsdef, syncRevision); err != nil { return nil, rewriteError(ctx, err) } } @@ -134,7 +144,7 @@ func (ss *schemaServiceServer) WriteSchema(ctx context.Context, in *v1alpha1.Wri } for nsName, existingRevision := range decoded { - _, revision, err := ss.ds.ReadNamespace(ctx, nsName) + _, createdAt, err := ss.ds.ReadNamespace(ctx, nsName, syncRevision) if err != nil { var nsNotFoundError sharederrors.UnknownNamespaceError if errors.As(err, &nsNotFoundError) { @@ -146,7 +156,7 @@ func (ss *schemaServiceServer) WriteSchema(ctx context.Context, in *v1alpha1.Wri return nil, rewriteError(ctx, err) } - if !revision.Equal(existingRevision) { + if !createdAt.Equal(existingRevision) { return nil, rewriteError(ctx, &writeSchemaPreconditionFailure{ errors.New("current schema differs from the revision specified"), }) diff --git a/internal/testfixtures/validating.go b/internal/testfixtures/validating.go index 82cc005464..b68c9b6842 100644 --- a/internal/testfixtures/validating.go +++ b/internal/testfixtures/validating.go @@ -82,14 +82,18 @@ func (vd validatingDatastore) WriteNamespace(ctx context.Context, newConfig *v0. return vd.delegate.WriteNamespace(ctx, newConfig) } -func (vd validatingDatastore) ReadNamespace(ctx context.Context, nsName string) (*v0.NamespaceDefinition, datastore.Revision, error) { - read, rev, err := vd.delegate.ReadNamespace(ctx, nsName) +func (vd validatingDatastore) ReadNamespace( + ctx context.Context, + nsName string, + revision datastore.Revision, +) (*v0.NamespaceDefinition, datastore.Revision, error) { + read, createdAt, err := vd.delegate.ReadNamespace(ctx, nsName, revision) if err != nil { - return read, rev, err + return read, createdAt, err } err = read.Validate() - return read, rev, err + return read, createdAt, err } func (vd validatingDatastore) DeleteNamespace(ctx context.Context, nsName string) (datastore.Revision, error) { @@ -110,7 +114,7 @@ func (vd validatingDatastore) ReverseQueryTuplesFromSubjectNamespace(subjectName nil, vd.delegate.ReverseQueryTuplesFromSubjectNamespace(subjectNamespace, revision), nil, - fmt.Errorf("Empty subject namespace given to ReverseQueryTuplesFromSubjectNamespace"), + fmt.Errorf("empty subject namespace given to ReverseQueryTuplesFromSubjectNamespace"), } } @@ -128,7 +132,7 @@ func (vd validatingDatastore) ReverseQueryTuplesFromSubjectRelation(subjectNames nil, vd.delegate.ReverseQueryTuplesFromSubjectNamespace(subjectNamespace, revision), nil, - fmt.Errorf("Empty subject namespace given to ReverseQueryTuplesFromSubjectRelation"), + fmt.Errorf("empty subject namespace given to ReverseQueryTuplesFromSubjectRelation"), } } @@ -137,7 +141,7 @@ func (vd validatingDatastore) ReverseQueryTuplesFromSubjectRelation(subjectNames nil, vd.delegate.ReverseQueryTuplesFromSubjectNamespace(subjectNamespace, revision), nil, - fmt.Errorf("Empty subject relation given to ReverseQueryTuplesFromSubjectRelation"), + fmt.Errorf("empty subject relation given to ReverseQueryTuplesFromSubjectRelation"), } } @@ -148,8 +152,11 @@ func (vd validatingDatastore) CheckRevision(ctx context.Context, revision datast return vd.delegate.CheckRevision(ctx, revision) } -func (vd validatingDatastore) ListNamespaces(ctx context.Context) ([]*v0.NamespaceDefinition, error) { - read, err := vd.delegate.ListNamespaces(ctx) +func (vd validatingDatastore) ListNamespaces( + ctx context.Context, + revision datastore.Revision, +) ([]*v0.NamespaceDefinition, error) { + read, err := vd.delegate.ListNamespaces(ctx, revision) if err != nil { return read, err } @@ -207,7 +214,7 @@ func (vd validatingTupleQuery) WithObjectRelation(namespace string, relation str return validatingTupleQuery{ wrapped: nil, wrappedReverse: vd.wrappedReverse, - foundErr: fmt.Errorf("Empty namespace given to WithObjectRelation"), + foundErr: fmt.Errorf("empty namespace given to WithObjectRelation"), } } @@ -215,7 +222,7 @@ func (vd validatingTupleQuery) WithObjectRelation(namespace string, relation str return validatingTupleQuery{ wrapped: nil, wrappedReverse: vd.wrappedReverse, - foundErr: fmt.Errorf("Empty relation given to WithObjectRelation"), + foundErr: fmt.Errorf("empty relation given to WithObjectRelation"), } } diff --git a/pkg/cmd/serve/serve.go b/pkg/cmd/serve/serve.go index d670540c83..f5442212be 100644 --- a/pkg/cmd/serve/serve.go +++ b/pkg/cmd/serve/serve.go @@ -105,7 +105,12 @@ func serveRun(cmd *cobra.Command, args []string, datastoreOpts *cmdutil.Datastor bootstrapFilePaths := cobrautil.MustGetStringSlice(cmd, "datastore-bootstrap-files") if len(bootstrapFilePaths) > 0 { bootstrapOverwrite := cobrautil.MustGetBool(cmd, "datastore-bootstrap-overwrite") - nsDefs, err := ds.ListNamespaces(context.Background()) + revision, err := ds.SyncRevision(context.Background()) + if err != nil { + log.Fatal().Err(err).Msg("unable to determine datastore state before applying bootstrap data") + } + + nsDefs, err := ds.ListNamespaces(context.Background(), revision) if err != nil { log.Fatal().Err(err).Msg("unable to determine datastore state before applying bootstrap data") }