Skip to content

Commit

Permalink
disable readonly mode for namespace read
Browse files Browse the repository at this point in the history
crdb can (seemingly incorrectly) throw an error when querying at a time
that we've gotten from a previous write if we read with readonly mode
  • Loading branch information
ecordell committed Jan 3, 2022
1 parent b2736dd commit 4f69871
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 37 deletions.
51 changes: 22 additions & 29 deletions e2e/newenemy/newenemy_test.go
Expand Up @@ -145,56 +145,51 @@ func TestNoNewEnemy(t *testing.T) {
// seems to make the test converge faster
schemaData := generateSchemaData(4000, 500)
fillSchema(t, schemaTpl, schemaData, vulnerableSpiceDb[1].Client().V1Alpha1().Schema())

t.Log("determining a prefix with a leader on the slow node")
slowNodeId, err := crdb[1].NodeID(testCtx)
require.NoError(t, err)
slowPrefix := prefixForNode(ctx, crdb[1].Conn(), schemaData, slowNodeId)
t.Logf("using prefix %s for slow node %d", slowPrefix, slowNodeId)

t.Log("filling with data to span multiple ranges")
fill(t, vulnerableSpiceDb[0].Client().V0().ACL(), slowPrefix, 4000, 1000)

t.Log("modifying time")
require.NoError(t, crdb.TimeDelay(ctx, e2e.MustFile(ctx, t, "timeattack-1.log"), 1, -150*time.Millisecond))

t.Run("protected from data newenemy", func(t *testing.T) {
t.Run("protected from schema newenemy", func(t *testing.T) {
vulnerableFn := func(t *testing.T) int {
protected := true
var attempts int
for protected {
// cap attempts at 100, retry with a different prefix
protected, attempts = checkDataNoNewEnemy(ctx, t, schemaData, slowNodeId, crdb, vulnerableSpiceDb, 100)
protected, attempts = checkSchemaNoNewEnemy(ctx, t, schemaData, slowNodeId, crdb, vulnerableSpiceDb, 100)
}
require.False(t, protected)
t.Logf("determined spicedb vulnerable in %d attempts", attempts+1)
return attempts
}
protectedFn := func(t *testing.T, count int) {
t.Logf("check spicedb is protected after %d attempts", count)
protected, _ := checkDataNoNewEnemy(ctx, t, schemaData, slowNodeId, crdb, protectedSpiceDb, count)
t.Logf("check spicedb is protected after %d attempts", count+1)
protected, attempts := checkSchemaNoNewEnemy(ctx, t, schemaData, slowNodeId, crdb, protectedSpiceDb, count)
require.Equal(t, attempts, count)
require.True(t, protected, "protection is enabled, but newenemy detected")
t.Logf("spicedb is protected after %d attempts", count)
}
statTest(t, 5, vulnerableFn, protectedFn)
})

t.Run("protected from schema newenemy", func(t *testing.T) {
t.Run("protected from data newenemy", func(t *testing.T) {
vulnerableFn := func(t *testing.T) int {
protected := true
var attempts int
for protected {
// cap attempts at 1000, retry with a different prefix
protected, attempts = checkSchemaNoNewEnemy(ctx, t, schemaData, slowNodeId, crdb, vulnerableSpiceDb, 1000)
// cap attempts at 100, retry with a different prefix
protected, attempts = checkDataNoNewEnemy(ctx, t, schemaData, slowNodeId, crdb, vulnerableSpiceDb, 100)
}
require.False(t, protected)
t.Logf("determined spicedb vulnerable in %d attempts", attempts+1)
return attempts
}
protectedFn := func(t *testing.T, count int) {
t.Logf("check spicedb is protected after %d attempts", count)
protected, _ := checkSchemaNoNewEnemy(ctx, t, schemaData, slowNodeId, crdb, protectedSpiceDb, count)
t.Logf("check spicedb is protected after %d attempts", count+1)
protected, attempts := checkDataNoNewEnemy(ctx, t, schemaData, slowNodeId, crdb, protectedSpiceDb, count)
require.Equal(t, attempts, count)
require.True(t, protected, "protection is enabled, but newenemy detected")
t.Logf("spicedb is protected after %d attempts", count)
}
statTest(t, 5, vulnerableFn, protectedFn)
})
Expand Down Expand Up @@ -259,6 +254,8 @@ func iterationsForHighConfidence(samples []int) (iterations int) {
// ensures overlapping transactions are linearizable.
func checkDataNoNewEnemy(ctx context.Context, t testing.TB, schemaData []SchemaData, slowNodeId int, crdb cockroach.Cluster, spicedb spice.Cluster, maxAttempts int) (bool, int) {
prefix := prefixForNode(ctx, crdb[1].Conn(), schemaData, slowNodeId)
t.Log("filling with data to span multiple ranges for prefix ", prefix)
fill(t, spicedb[0].Client().V0().ACL(), prefix, 4000, 1000)

for attempts := 0; attempts < maxAttempts; attempts++ {
direct, exclude := generateTuple(prefix, objIdGenerator)
Expand Down Expand Up @@ -305,11 +302,8 @@ func checkDataNoNewEnemy(ctx context.Context, t testing.TB, schemaData []SchemaD

if ns1Leader != slowNodeId || ns2Leader != slowNodeId {
t.Log("namespace leader changed, pick new prefix and fill")
prefix = prefixForNode(ctx, crdb[1].Conn(), schemaData, slowNodeId)
// need to fill new prefix
t.Log("filling with data to span multiple ranges for prefix ", prefix)
fill(t, spicedb[0].Client().V0().ACL(), prefix, 4000, 1000)
continue
// returning true will re-run with a new prefix
return true, attempts
}

if z1.Sub(z2).IsPositive() {
Expand Down Expand Up @@ -382,6 +376,11 @@ func checkSchemaNoNewEnemy(ctx context.Context, t testing.TB, schemaData []Schem
require.NoError(t, err)

time.Sleep(100 * time.Millisecond)
// write the "excludes" tuple
r2, err := spicedb[0].Client().V0().ACL().Write(testCtx, &v0.WriteRequest{
Updates: []*v0.RelationTupleUpdate{exclude},
})
require.NoError(t, err)

// write the "exclude" schema
b.Reset()
Expand All @@ -394,13 +393,7 @@ func checkSchemaNoNewEnemy(ctx context.Context, t testing.TB, schemaData []Schem
continue
}

// write the "excludes" tuple
r2, err := spicedb[0].Client().V0().ACL().Write(testCtx, &v0.WriteRequest{
Updates: []*v0.RelationTupleUpdate{exclude},
})
require.NoError(t, err)

canHas, err := spicedb[1].Client().V0().ACL().Check(context.Background(), &v0.CheckRequest{
canHas, err := spicedb[0].Client().V0().ACL().Check(context.Background(), &v0.CheckRequest{
TestUserset: &v0.ObjectAndRelation{
Namespace: direct.Tuple.ObjectAndRelation.Namespace,
ObjectId: direct.Tuple.ObjectAndRelation.ObjectId,
Expand Down
9 changes: 1 addition & 8 deletions internal/datastore/crdb/namespace.go
Expand Up @@ -44,8 +44,6 @@ var (
func (cds *crdbDatastore) WriteNamespace(ctx context.Context, newConfig *v0.NamespaceDefinition) (datastore.Revision, error) {
var hlcNow decimal.Decimal
if err := cds.execute(ctx, cds.conn, pgx.TxOptions{}, func(tx pgx.Tx) error {
keySet := newKeySet()
cds.AddOverlapKey(keySet, newConfig.Name)
serialized, err := proto.Marshal(newConfig)
if err != nil {
return err
Expand All @@ -56,12 +54,7 @@ func (cds *crdbDatastore) WriteNamespace(ctx context.Context, newConfig *v0.Name
if err != nil {
return err
}
for k := range keySet {
if _, err := tx.Exec(ctx, queryTouchTransaction, k); err != nil {
return err
}
}
return tx.QueryRow(
return cds.conn.QueryRow(
datastore.SeparateContextWithTracing(ctx), writeSQL, writeArgs...,
).Scan(&hlcNow)
}); err != nil {
Expand Down

0 comments on commit 4f69871

Please sign in to comment.