Skip to content

Commit

Permalink
disable readonly mode for namespace read
Browse files Browse the repository at this point in the history
crdb can (seemingly incorrectly) throw an error when querying at a time
that we've gotten from a previous write if we read with readonly mode
  • Loading branch information
ecordell committed Jan 3, 2022
1 parent b2736dd commit 76e036f
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 28 deletions.
47 changes: 20 additions & 27 deletions e2e/newenemy/newenemy_test.go
Expand Up @@ -145,56 +145,51 @@ func TestNoNewEnemy(t *testing.T) {
// seems to make the test converge faster
schemaData := generateSchemaData(4000, 500)
fillSchema(t, schemaTpl, schemaData, vulnerableSpiceDb[1].Client().V1Alpha1().Schema())

t.Log("determining a prefix with a leader on the slow node")
slowNodeId, err := crdb[1].NodeID(testCtx)
require.NoError(t, err)
slowPrefix := prefixForNode(ctx, crdb[1].Conn(), schemaData, slowNodeId)
t.Logf("using prefix %s for slow node %d", slowPrefix, slowNodeId)

t.Log("filling with data to span multiple ranges")
fill(t, vulnerableSpiceDb[0].Client().V0().ACL(), slowPrefix, 4000, 1000)

t.Log("modifying time")
require.NoError(t, crdb.TimeDelay(ctx, e2e.MustFile(ctx, t, "timeattack-1.log"), 1, -150*time.Millisecond))

t.Run("protected from data newenemy", func(t *testing.T) {
t.Run("protected from schema newenemy", func(t *testing.T) {
vulnerableFn := func(t *testing.T) int {
protected := true
var attempts int
for protected {
// cap attempts at 100, retry with a different prefix
protected, attempts = checkDataNoNewEnemy(ctx, t, schemaData, slowNodeId, crdb, vulnerableSpiceDb, 100)
protected, attempts = checkSchemaNoNewEnemy(ctx, t, schemaData, slowNodeId, crdb, vulnerableSpiceDb, 100)
}
require.False(t, protected)
t.Logf("determined spicedb vulnerable in %d attempts", attempts+1)
return attempts
}
protectedFn := func(t *testing.T, count int) {
t.Logf("check spicedb is protected after %d attempts", count)
protected, _ := checkDataNoNewEnemy(ctx, t, schemaData, slowNodeId, crdb, protectedSpiceDb, count)
protected, attempts := checkSchemaNoNewEnemy(ctx, t, schemaData, slowNodeId, crdb, protectedSpiceDb, count)
require.Equal(t, attempts, count)
require.True(t, protected, "protection is enabled, but newenemy detected")
t.Logf("spicedb is protected after %d attempts", count)
}
statTest(t, 5, vulnerableFn, protectedFn)
})

t.Run("protected from schema newenemy", func(t *testing.T) {
t.Run("protected from data newenemy", func(t *testing.T) {
vulnerableFn := func(t *testing.T) int {
protected := true
var attempts int
for protected {
// cap attempts at 1000, retry with a different prefix
protected, attempts = checkSchemaNoNewEnemy(ctx, t, schemaData, slowNodeId, crdb, vulnerableSpiceDb, 1000)
// cap attempts at 100, retry with a different prefix
protected, attempts = checkDataNoNewEnemy(ctx, t, schemaData, slowNodeId, crdb, vulnerableSpiceDb, 100)
}
require.False(t, protected)
t.Logf("determined spicedb vulnerable in %d attempts", attempts+1)
return attempts
}
protectedFn := func(t *testing.T, count int) {
t.Logf("check spicedb is protected after %d attempts", count)
protected, _ := checkSchemaNoNewEnemy(ctx, t, schemaData, slowNodeId, crdb, protectedSpiceDb, count)
protected, attempts := checkDataNoNewEnemy(ctx, t, schemaData, slowNodeId, crdb, protectedSpiceDb, count)
require.Equal(t, attempts, count)
require.True(t, protected, "protection is enabled, but newenemy detected")
t.Logf("spicedb is protected after %d attempts", count)
}
statTest(t, 5, vulnerableFn, protectedFn)
})
Expand Down Expand Up @@ -259,6 +254,8 @@ func iterationsForHighConfidence(samples []int) (iterations int) {
// ensures overlapping transactions are linearizable.
func checkDataNoNewEnemy(ctx context.Context, t testing.TB, schemaData []SchemaData, slowNodeId int, crdb cockroach.Cluster, spicedb spice.Cluster, maxAttempts int) (bool, int) {
prefix := prefixForNode(ctx, crdb[1].Conn(), schemaData, slowNodeId)
t.Log("filling with data to span multiple ranges for prefix ", prefix)
fill(t, spicedb[0].Client().V0().ACL(), prefix, 4000, 1000)

for attempts := 0; attempts < maxAttempts; attempts++ {
direct, exclude := generateTuple(prefix, objIdGenerator)
Expand Down Expand Up @@ -305,11 +302,8 @@ func checkDataNoNewEnemy(ctx context.Context, t testing.TB, schemaData []SchemaD

if ns1Leader != slowNodeId || ns2Leader != slowNodeId {
t.Log("namespace leader changed, pick new prefix and fill")
prefix = prefixForNode(ctx, crdb[1].Conn(), schemaData, slowNodeId)
// need to fill new prefix
t.Log("filling with data to span multiple ranges for prefix ", prefix)
fill(t, spicedb[0].Client().V0().ACL(), prefix, 4000, 1000)
continue
// returning true will re-run with a new prefix
return true, attempts
}

if z1.Sub(z2).IsPositive() {
Expand Down Expand Up @@ -382,6 +376,11 @@ func checkSchemaNoNewEnemy(ctx context.Context, t testing.TB, schemaData []Schem
require.NoError(t, err)

time.Sleep(100 * time.Millisecond)
// write the "excludes" tuple
r2, err := spicedb[0].Client().V0().ACL().Write(testCtx, &v0.WriteRequest{
Updates: []*v0.RelationTupleUpdate{exclude},
})
require.NoError(t, err)

// write the "exclude" schema
b.Reset()
Expand All @@ -394,13 +393,7 @@ func checkSchemaNoNewEnemy(ctx context.Context, t testing.TB, schemaData []Schem
continue
}

// write the "excludes" tuple
r2, err := spicedb[0].Client().V0().ACL().Write(testCtx, &v0.WriteRequest{
Updates: []*v0.RelationTupleUpdate{exclude},
})
require.NoError(t, err)

canHas, err := spicedb[1].Client().V0().ACL().Check(context.Background(), &v0.CheckRequest{
canHas, err := spicedb[0].Client().V0().ACL().Check(context.Background(), &v0.CheckRequest{
TestUserset: &v0.ObjectAndRelation{
Namespace: direct.Tuple.ObjectAndRelation.Namespace,
ObjectId: direct.Tuple.ObjectAndRelation.ObjectId,
Expand Down
2 changes: 1 addition & 1 deletion internal/datastore/crdb/namespace.go
Expand Up @@ -78,7 +78,7 @@ func (cds *crdbDatastore) ReadNamespace(
) (*v0.NamespaceDefinition, datastore.Revision, error) {
ctx = datastore.SeparateContextWithTracing(ctx)

tx, err := cds.conn.BeginTx(ctx, pgx.TxOptions{AccessMode: pgx.ReadOnly})
tx, err := cds.conn.BeginTx(ctx, pgx.TxOptions{})
if err != nil {
return nil, datastore.NoRevision, fmt.Errorf(errUnableToReadConfig, err)
}
Expand Down

0 comments on commit 76e036f

Please sign in to comment.