Skip to content

Commit

Permalink
user revisions for loading namespaces in handlers
Browse files Browse the repository at this point in the history
  • Loading branch information
jakedt committed Dec 9, 2021
1 parent 5f98335 commit eb2e715
Show file tree
Hide file tree
Showing 13 changed files with 173 additions and 85 deletions.
9 changes: 8 additions & 1 deletion internal/dashboard/dashboard.go
Expand Up @@ -131,7 +131,14 @@ func NewHandler(grpcAddr string, grpcTLSEnabled bool, datastoreEngine string, ds
userFound := false
resourceFound := false

nsDefs, err := ds.ListNamespaces(r.Context())
syncRevision, err := ds.SyncRevision(r.Context())
if err != nil {
log.Ctx(r.Context()).Error().Err(err).Msg("Got error when computing datastore revision")
fmt.Fprintf(w, "Internal Error")
return
}

nsDefs, err := ds.ListNamespaces(r.Context(), syncRevision)
if err != nil {
log.Ctx(r.Context()).Error().AnErr("datastore-error", err).Msg("Got error when trying to load namespaces")
fmt.Fprintf(w, "Internal Error")
Expand Down
2 changes: 1 addition & 1 deletion internal/services/consistency_test.go
Expand Up @@ -74,7 +74,7 @@ func TestConsistency(t *testing.T) {

// Validate the type system for each namespace.
for _, nsDef := range fullyResolved.NamespaceDefinitions {
_, ts, _, err := ns.ReadNamespaceAndTypes(context.Background(), nsDef.Name)
_, ts, err := ns.ReadNamespaceAndTypes(context.Background(), nsDef.Name, revision)
lrequire.NoError(err)

err = ts.Validate(context.Background())
Expand Down
5 changes: 3 additions & 2 deletions internal/services/shared/schema.go
Expand Up @@ -5,6 +5,7 @@ import (
"errors"

v0 "github.com/authzed/authzed-go/proto/authzed/api/v0"
"github.com/shopspring/decimal"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

Expand Down Expand Up @@ -42,12 +43,12 @@ func EnsureNoRelationshipsExist(ctx context.Context, ds datastore.Datastore, nam

// SanityCheckExistingRelationships ensures that a namespace definition being written does not result
// in relationships without associated defined schema object definitions and relations.
func SanityCheckExistingRelationships(ctx context.Context, ds datastore.Datastore, nsdef *v0.NamespaceDefinition) error {
func SanityCheckExistingRelationships(ctx context.Context, ds datastore.Datastore, nsdef *v0.NamespaceDefinition, revision decimal.Decimal) error {
// Ensure that the updated namespace does not break the existing tuple data.
//
// NOTE: We use the datastore here to read the namespace, rather than the namespace manager,
// to ensure there is no caching being used.
existing, _, err := ds.ReadNamespace(ctx, nsdef.Name)
existing, _, err := ds.ReadNamespace(ctx, nsdef.Name, revision)
if err != nil && !errors.As(err, &datastore.ErrNamespaceNotFound{}) {
return err
}
Expand Down
59 changes: 33 additions & 26 deletions internal/services/v0/acl.go
Expand Up @@ -66,8 +66,13 @@ func NewACLServer(ds datastore.Datastore, nsm namespace.Manager, dispatch dispat
}

func (as *aclServer) Write(ctx context.Context, req *v0.WriteRequest) (*v0.WriteResponse, error) {
atRevision, err := as.ds.SyncRevision(ctx)
if err != nil {
return nil, rewriteACLError(ctx, err)
}

for _, mutation := range req.Updates {
err := validateTupleWrite(ctx, mutation.Tuple, as.nsm)
err := validateTupleWrite(ctx, mutation.Tuple, as.nsm, atRevision)
if err != nil {
return nil, rewriteACLError(ctx, err)
}
Expand Down Expand Up @@ -97,6 +102,24 @@ func (as *aclServer) Write(ctx context.Context, req *v0.WriteRequest) (*v0.Write
}

func (as *aclServer) Read(ctx context.Context, req *v0.ReadRequest) (*v0.ReadResponse, error) {
var atRevision decimal.Decimal
if req.AtRevision != nil {
// Read should attempt to use the exact revision requested
decoded, err := zookie.DecodeRevision(req.AtRevision)
if err != nil {
return nil, status.Errorf(codes.InvalidArgument, "bad request revision: %s", err)
}

atRevision = decoded
} else {
// No revision provided, we'll pick one
var err error
atRevision, err = as.ds.Revision(ctx)
if err != nil {
return nil, status.Errorf(codes.Internal, "unable to pick request revision: %s", err)
}
}

for _, tuplesetFilter := range req.Tuplesets {
checkedRelation := false
for _, filter := range tuplesetFilter.Filters {
Expand All @@ -120,6 +143,7 @@ func (as *aclServer) Read(ctx context.Context, req *v0.ReadRequest) (*v0.ReadRes
tuplesetFilter.Namespace,
tuplesetFilter.Relation,
false, // Disallow ellipsis
atRevision,
); err != nil {
return nil, rewriteACLError(ctx, err)
}
Expand All @@ -146,30 +170,13 @@ func (as *aclServer) Read(ctx context.Context, req *v0.ReadRequest) (*v0.ReadRes
tuplesetFilter.Namespace,
datastore.Ellipsis,
true, // Allow ellipsis
atRevision,
); err != nil {
return nil, rewriteACLError(ctx, err)
}
}
}

var atRevision decimal.Decimal
if req.AtRevision != nil {
// Read should attempt to use the exact revision requested
decoded, err := zookie.DecodeRevision(req.AtRevision)
if err != nil {
return nil, status.Errorf(codes.InvalidArgument, "bad request revision: %s", err)
}

atRevision = decoded
} else {
// No revision provided, we'll pick one
var err error
atRevision, err = as.ds.Revision(ctx)
if err != nil {
return nil, status.Errorf(codes.Internal, "unable to pick request revision: %s", err)
}
}

err := as.ds.CheckRevision(ctx, atRevision)
if err != nil {
return nil, rewriteACLError(ctx, err)
Expand Down Expand Up @@ -248,12 +255,12 @@ func (as *aclServer) commonCheck(
start *v0.ObjectAndRelation,
goal *v0.ObjectAndRelation,
) (*v0.CheckResponse, error) {
err := as.nsm.CheckNamespaceAndRelation(ctx, start.Namespace, start.Relation, false)
err := as.nsm.CheckNamespaceAndRelation(ctx, start.Namespace, start.Relation, false, atRevision)
if err != nil {
return nil, rewriteACLError(ctx, err)
}

err = as.nsm.CheckNamespaceAndRelation(ctx, goal.Namespace, goal.Relation, true)
err = as.nsm.CheckNamespaceAndRelation(ctx, goal.Namespace, goal.Relation, true, atRevision)
if err != nil {
return nil, rewriteACLError(ctx, err)
}
Expand Down Expand Up @@ -290,12 +297,12 @@ func (as *aclServer) commonCheck(
}

func (as *aclServer) Expand(ctx context.Context, req *v0.ExpandRequest) (*v0.ExpandResponse, error) {
err := as.nsm.CheckNamespaceAndRelation(ctx, req.Userset.Namespace, req.Userset.Relation, false)
atRevision, err := as.pickBestRevision(ctx, req.AtRevision)
if err != nil {
return nil, rewriteACLError(ctx, err)
}

atRevision, err := as.pickBestRevision(ctx, req.AtRevision)
err = as.nsm.CheckNamespaceAndRelation(ctx, req.Userset.Namespace, req.Userset.Relation, false, atRevision)
if err != nil {
return nil, rewriteACLError(ctx, err)
}
Expand All @@ -320,17 +327,17 @@ func (as *aclServer) Expand(ctx context.Context, req *v0.ExpandRequest) (*v0.Exp
}

func (as *aclServer) Lookup(ctx context.Context, req *v0.LookupRequest) (*v0.LookupResponse, error) {
err := as.nsm.CheckNamespaceAndRelation(ctx, req.User.Namespace, req.User.Relation, true)
atRevision, err := as.pickBestRevision(ctx, req.AtRevision)
if err != nil {
return nil, rewriteACLError(ctx, err)
}

err = as.nsm.CheckNamespaceAndRelation(ctx, req.ObjectRelation.Namespace, req.ObjectRelation.Relation, false)
err = as.nsm.CheckNamespaceAndRelation(ctx, req.User.Namespace, req.User.Relation, true, atRevision)
if err != nil {
return nil, rewriteACLError(ctx, err)
}

atRevision, err := as.pickBestRevision(ctx, req.AtRevision)
err = as.nsm.CheckNamespaceAndRelation(ctx, req.ObjectRelation.Namespace, req.ObjectRelation.Relation, false, atRevision)
if err != nil {
return nil, rewriteACLError(ctx, err)
}
Expand Down
29 changes: 19 additions & 10 deletions internal/services/v0/devcontext.go
Expand Up @@ -72,7 +72,9 @@ func newDevContext(ctx context.Context, requestContext *v0.RequestContext, ds da
return &DevContext{Ctx: ctx, NamespaceManager: nsm, RequestErrors: []*v0.DeveloperError{devError}}, false, nil
}

requestErrors, err := loadNamespaces(ctx, namespaces, nsm, ds)
var currentRevision decimal.Decimal
var requestErrors []*v0.DeveloperError
requestErrors, currentRevision, err = loadNamespaces(ctx, namespaces, nsm, ds)
if err != nil {
return &DevContext{Ctx: ctx, NamespaceManager: nsm}, false, err
}
Expand All @@ -82,7 +84,7 @@ func newDevContext(ctx context.Context, requestContext *v0.RequestContext, ds da
}

if len(requestContext.LegacyNsConfigs) > 0 {
requestErrors, err := loadNamespaces(ctx, requestContext.LegacyNsConfigs, nsm, ds)
requestErrors, currentRevision, err = loadNamespaces(ctx, requestContext.LegacyNsConfigs, nsm, ds)
if err != nil {
return &DevContext{Ctx: ctx, NamespaceManager: nsm}, false, err
}
Expand All @@ -92,7 +94,7 @@ func newDevContext(ctx context.Context, requestContext *v0.RequestContext, ds da
}
}

revision, requestErrors, err := loadTuples(ctx, requestContext.Relationships, nsm, ds)
revision, requestErrors, err := loadTuples(ctx, requestContext.Relationships, nsm, ds, currentRevision)
if err != nil {
return &DevContext{Ctx: ctx, NamespaceManager: nsm, Namespaces: namespaces}, false, err
}
Expand Down Expand Up @@ -162,7 +164,7 @@ func compile(schema string) ([]*v0.NamespaceDefinition, *v0.DeveloperError, erro
return namespaces, nil, nil
}

func loadTuples(ctx context.Context, tuples []*v0.RelationTuple, nsm namespace.Manager, ds datastore.Datastore) (decimal.Decimal, []*v0.DeveloperError, error) {
func loadTuples(ctx context.Context, tuples []*v0.RelationTuple, nsm namespace.Manager, ds datastore.Datastore, revision decimal.Decimal) (decimal.Decimal, []*v0.DeveloperError, error) {
var errors []*v0.DeveloperError
var updates []*v1.RelationshipUpdate
for _, tpl := range tuples {
Expand All @@ -177,7 +179,7 @@ func loadTuples(ctx context.Context, tuples []*v0.RelationTuple, nsm namespace.M
continue
}

err := validateTupleWrite(ctx, tpl, nsm)
err := validateTupleWrite(ctx, tpl, nsm, revision)
if err != nil {
verrs, wireErr := rewriteGraphError(ctx, v0.DeveloperError_RELATIONSHIP, 0, 0, tuple.String(tpl), err)
if wireErr == nil {
Expand All @@ -198,19 +200,26 @@ func loadTuples(ctx context.Context, tuples []*v0.RelationTuple, nsm namespace.M
return revision, errors, err
}

func loadNamespaces(ctx context.Context, namespaces []*v0.NamespaceDefinition, nsm namespace.Manager, ds datastore.Datastore) ([]*v0.DeveloperError, error) {
func loadNamespaces(
ctx context.Context,
namespaces []*v0.NamespaceDefinition,
nsm namespace.Manager,
ds datastore.Datastore,
) ([]*v0.DeveloperError, decimal.Decimal, error) {
var errors []*v0.DeveloperError
var lastRevision decimal.Decimal
for _, nsDef := range namespaces {
ts, terr := namespace.BuildNamespaceTypeSystemForDefs(nsDef, namespaces)
if terr != nil {
return errors, terr
return errors, lastRevision, terr
}

tverr := ts.Validate(ctx)
if tverr == nil {
_, err := ds.WriteNamespace(ctx, nsDef)
var err error
lastRevision, err = ds.WriteNamespace(ctx, nsDef)
if err != nil {
return errors, err
return errors, lastRevision, err
}
continue
}
Expand All @@ -223,5 +232,5 @@ func loadNamespaces(ctx context.Context, namespaces []*v0.NamespaceDefinition, n
})
}

return errors, nil
return errors, lastRevision, nil
}
20 changes: 15 additions & 5 deletions internal/services/v0/namespace.go
Expand Up @@ -45,9 +45,14 @@ func (nss *nsServer) WriteConfig(ctx context.Context, req *v0.WriteConfigRequest
return nil, rewriteNamespaceError(ctx, err)
}

readRevision, err := nss.ds.SyncRevision(ctx)
if err != nil {
return nil, rewriteNamespaceError(ctx, err)
}

for _, config := range req.Configs {
// Validate the type system for the updated namespace.
ts, terr := namespace.BuildNamespaceTypeSystemWithFallback(config, nsm, req.Configs)
ts, terr := namespace.BuildNamespaceTypeSystemWithFallback(config, nsm, req.Configs, readRevision)
if terr != nil {
return nil, rewriteNamespaceError(ctx, terr)
}
Expand All @@ -61,7 +66,7 @@ func (nss *nsServer) WriteConfig(ctx context.Context, req *v0.WriteConfigRequest
//
// NOTE: We use the datastore here to read the namespace, rather than the namespace manager,
// to ensure there is no caching being used.
existing, _, err := nss.ds.ReadNamespace(ctx, config.Name)
existing, _, err := nss.ds.ReadNamespace(ctx, config.Name, readRevision)
if err != nil && !errors.As(err, &datastore.ErrNamespaceNotFound{}) {
return nil, rewriteNamespaceError(ctx, err)
}
Expand Down Expand Up @@ -128,15 +133,20 @@ func (nss *nsServer) WriteConfig(ctx context.Context, req *v0.WriteConfigRequest
}

func (nss *nsServer) ReadConfig(ctx context.Context, req *v0.ReadConfigRequest) (*v0.ReadConfigResponse, error) {
found, version, err := nss.ds.ReadNamespace(ctx, req.Namespace)
readRevision, err := nss.ds.SyncRevision(ctx)
if err != nil {
return nil, rewriteNamespaceError(ctx, err)
}

found, _, err := nss.ds.ReadNamespace(ctx, req.Namespace, readRevision)
if err != nil {
return nil, rewriteNamespaceError(ctx, err)
}

return &v0.ReadConfigResponse{
Namespace: req.Namespace,
Config: found,
Revision: zookie.NewFromRevision(version),
Revision: zookie.NewFromRevision(readRevision),
}, nil
}

Expand All @@ -149,7 +159,7 @@ func (nss *nsServer) DeleteConfigs(ctx context.Context, req *v0.DeleteConfigsReq
// Ensure that all the specified namespaces can be deleted.
for _, nsName := range req.Namespaces {
// Ensure the namespace exists.
_, _, err := nss.ds.ReadNamespace(ctx, nsName)
_, _, err := nss.ds.ReadNamespace(ctx, nsName, syncRevision)
if err != nil {
return nil, rewriteNamespaceError(ctx, err)
}
Expand Down
12 changes: 10 additions & 2 deletions internal/services/v0/validation.go
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"

v0 "github.com/authzed/authzed-go/proto/authzed/api/v0"
"github.com/shopspring/decimal"

"github.com/authzed/spicedb/internal/namespace"
)
Expand All @@ -15,12 +16,18 @@ type invalidRelationError struct {
onr *v0.ObjectAndRelation
}

func validateTupleWrite(ctx context.Context, tpl *v0.RelationTuple, nsm namespace.Manager) error {
func validateTupleWrite(
ctx context.Context,
tpl *v0.RelationTuple,
nsm namespace.Manager,
revision decimal.Decimal,
) error {
if err := nsm.CheckNamespaceAndRelation(
ctx,
tpl.ObjectAndRelation.Namespace,
tpl.ObjectAndRelation.Relation,
false, // Disallow ellipsis
revision,
); err != nil {
return err
}
Expand All @@ -30,11 +37,12 @@ func validateTupleWrite(ctx context.Context, tpl *v0.RelationTuple, nsm namespac
tpl.User.GetUserset().Namespace,
tpl.User.GetUserset().Relation,
true, // Allow Ellipsis
revision,
); err != nil {
return err
}

_, ts, _, err := nsm.ReadNamespaceAndTypes(ctx, tpl.ObjectAndRelation.Namespace)
_, ts, err := nsm.ReadNamespaceAndTypes(ctx, tpl.ObjectAndRelation.Namespace, revision)
if err != nil {
return err
}
Expand Down
22 changes: 11 additions & 11 deletions internal/services/v0/watch.go
Expand Up @@ -35,17 +35,6 @@ func NewWatchServer(ds datastore.Datastore, nsm namespace.Manager) v0.WatchServi
}

func (ws *watchServer) Watch(req *v0.WatchRequest, stream v0.WatchService_WatchServer) error {
namespaceMap := make(map[string]struct{})
for _, ns := range req.Namespaces {
err := ws.nsm.CheckNamespaceAndRelation(stream.Context(), ns, datastore.Ellipsis, true)
if err != nil {
return status.Errorf(codes.FailedPrecondition, "invalid namespace: %s", err)
}

namespaceMap[ns] = struct{}{}
}
filter := namespaceFilter{namespaces: namespaceMap}

var afterRevision decimal.Decimal
if req.StartRevision != nil && req.StartRevision.Token != "" {
decodedRevision, err := zookie.DecodeRevision(req.StartRevision)
Expand All @@ -62,6 +51,17 @@ func (ws *watchServer) Watch(req *v0.WatchRequest, stream v0.WatchService_WatchS
}
}

namespaceMap := make(map[string]struct{})
for _, ns := range req.Namespaces {
err := ws.nsm.CheckNamespaceAndRelation(stream.Context(), ns, datastore.Ellipsis, true, afterRevision)
if err != nil {
return status.Errorf(codes.FailedPrecondition, "invalid namespace: %s", err)
}

namespaceMap[ns] = struct{}{}
}
filter := namespaceFilter{namespaces: namespaceMap}

updates, errchan := ws.ds.Watch(stream.Context(), afterRevision)
for {
select {
Expand Down

0 comments on commit eb2e715

Please sign in to comment.