diff --git a/.github/workflows/build.yaml b/.github/workflows/build.yaml
index 4939dac5b9..bc84dfe014 100644
--- a/.github/workflows/build.yaml
+++ b/.github/workflows/build.yaml
@@ -83,8 +83,8 @@ jobs:
           key: "cockroach-v21.1.7-chaosd-v1.0.2-2"
       - name: "Install cockroachdb and chaosd"
         if: "steps.cache-binaries.outputs.cache-hit != 'true'"
+        working-directory: "e2e/newenemy"
         run: |
-          pushd e2e/newenemy
           curl https://binaries.cockroachdb.com/cockroach-v21.1.7.linux-amd64.tgz | tar -xz && mv cockroach-v21.1.7.linux-amd64/cockroach ./cockroach
           curl -fsSL https://mirrors.chaos-mesh.org/chaosd-v1.0.2-linux-amd64.tar.gz | tar -xz && mv chaosd-v1.0.2-linux-amd64/chaosd ./chaosd
@@ -95,7 +95,6 @@ jobs:
           CGO_ENABLED=1 go build ./cmd/watchmaker/
           popd
           mv ./chaos-mesh/watchmaker ./watchmaker
-          popd
       - uses: "actions/cache@v2"
        with:
          path: |
@@ -105,10 +104,9 @@ jobs:
          restore-keys: |
            ${{ runner.os }}-go-
      - name: "Build SpiceDB"
-        working-directory: "e2e/newenemy"
        run: |
-          go get -d github.com/authzed/spicedb/cmd/spicedb/...
-          go build github.com/authzed/spicedb/cmd/spicedb/...
+          go get -d ./...
+          go build -o ./e2e/newenemy/spicedb ./cmd/spicedb/...
      - name: "Run e2e"
        working-directory: "e2e/newenemy"
        run: |
diff --git a/e2e/newenemy/newenemy_test.go b/e2e/newenemy/newenemy_test.go
index 958072f227..a20a9c5540 100644
--- a/e2e/newenemy/newenemy_test.go
+++ b/e2e/newenemy/newenemy_test.go
@@ -15,8 +15,8 @@ import (
     "time"
 
     v0 "github.com/authzed/authzed-go/proto/authzed/api/v0"
+    v1 "github.com/authzed/authzed-go/proto/authzed/api/v1"
     "github.com/authzed/authzed-go/proto/authzed/api/v1alpha1"
-    "github.com/authzed/spicedb/pkg/zookie"
     "github.com/jackc/pgtype"
     "github.com/jackc/pgx/v4"
     "github.com/stretchr/testify/require"
@@ -25,6 +25,8 @@ import (
     "github.com/authzed/spicedb/e2e/cockroach"
     "github.com/authzed/spicedb/e2e/generator"
     "github.com/authzed/spicedb/e2e/spice"
+    "github.com/authzed/spicedb/pkg/zedtoken"
+    "github.com/authzed/spicedb/pkg/zookie"
 )
 
 type SchemaData struct {
@@ -42,6 +44,17 @@ definition {{.}}/resource {
 {{ end }}
 `
 
+const schemaAllowAllText = `
+{{ range .Prefixes }}
+definition {{.}}/user {}
+definition {{.}}/resource {
+    relation direct: {{.}}/user
+    relation excluded: {{.}}/user
+    permission allowed = direct
+}
+{{ end }}
+`
+
 const (
     objIDRegex           = "[a-zA-Z0-9_][a-zA-Z0-9/_-]{0,127}"
     namespacePrefixRegex = "[a-z][a-z0-9_]{2,62}[a-z0-9]"
@@ -50,8 +63,12 @@ const (
 
 var (
     maxIterations = flag.Int("max-iterations", 1000, "iteration cap for statistic-based tests (0 for no limit)")
-    schemaTpl = template.Must(template.New("schema").Parse(schemaText))
-    testCtx   context.Context
+    schemaTpl       = template.Must(template.New("schema").Parse(schemaText))
+    schemaAllowTpl  = template.Must(template.New("schema_allow").Parse(schemaAllowAllText))
+    objIdGenerator  = generator.NewUniqueGenerator(objIDRegex)
+    prefixGenerator = generator.NewUniqueGenerator(namespacePrefixRegex)
+
+    testCtx context.Context
 )
 
 func TestMain(m *testing.M) {
@@ -96,7 +113,6 @@ func initializeTestCRDBCluster(ctx context.Context, t testing.TB) cockroach.Clus
 }
 
 func TestNoNewEnemy(t *testing.T) {
-    require := require.New(t)
     rand.Seed(time.Now().UnixNano())
     ctx, cancel := context.WithCancel(testCtx)
     defer cancel()
@@ -106,9 +122,9 @@ func TestNoNewEnemy(t *testing.T) {
 
     t.Log("starting vulnerable spicedb...")
     vulnerableSpiceDb := spice.NewClusterFromCockroachCluster(crdb, spice.WithDbName(dbName))
-    require.NoError(vulnerableSpiceDb.Start(ctx, tlog, "vulnerable",
+    require.NoError(t, vulnerableSpiceDb.Start(ctx, tlog, "vulnerable",
"--datastore-tx-overlap-strategy=insecure")) - require.NoError(vulnerableSpiceDb.Connect(ctx, tlog)) + require.NoError(t, vulnerableSpiceDb.Connect(ctx, tlog)) t.Log("start protected spicedb cluster") protectedSpiceDb := spice.NewClusterFromCockroachCluster(crdb, @@ -118,11 +134,11 @@ func TestNoNewEnemy(t *testing.T) { spice.WithMetricsPort(9100), spice.WithDashboardPort(8100), spice.WithDbName(dbName)) - require.NoError(protectedSpiceDb.Start(ctx, tlog, "protected")) - require.NoError(protectedSpiceDb.Connect(ctx, tlog)) + require.NoError(t, protectedSpiceDb.Start(ctx, tlog, "protected")) + require.NoError(t, protectedSpiceDb.Connect(ctx, tlog)) t.Log("configure small ranges, single replicas, short ttl") - require.NoError(crdb.SQL(ctx, tlog, tlog, + require.NoError(t, crdb.SQL(ctx, tlog, tlog, fmt.Sprintf(setSmallRanges, dbName), )) @@ -130,35 +146,116 @@ func TestNoNewEnemy(t *testing.T) { // 4000 is larger than we need to span all three nodes, but a higher number // seems to make the test converge faster schemaData := generateSchemaData(4000, 500) - require.NoError(fillSchema(t, schemaData, vulnerableSpiceDb[1].Client().V1Alpha1().Schema())) - - t.Log("determining a prefix with a leader on the slow node") + fillSchema(t, schemaTpl, schemaData, vulnerableSpiceDb[1].Client().V1Alpha1().Schema()) slowNodeId, err := crdb[1].NodeID(testCtx) - require.NoError(err) - slowPrefix := prefixForNode(ctx, crdb[1].Conn(), schemaData, slowNodeId) + require.NoError(t, err) + + t.Log("modifying time") + require.NoError(t, crdb.TimeDelay(ctx, e2e.MustFile(ctx, t, "timeattack-1.log"), 1, -150*time.Millisecond)) + + tests := []struct { + name string + vulnerableProbe probeFn + protectedProbe probeFn + vulnerableMax int + sampleSize int + }{ + { + name: "protected from schema newenemy", + vulnerableProbe: func(count int) (bool, int) { + return checkSchemaNoNewEnemy(ctx, t, schemaData, slowNodeId, crdb, vulnerableSpiceDb, count) + }, + protectedProbe: func(count int) (bool, int) { + return checkSchemaNoNewEnemy(ctx, t, schemaData, slowNodeId, crdb, protectedSpiceDb, count) + }, + vulnerableMax: 100, + sampleSize: 5, + }, + { + name: "protected from data newenemy", + vulnerableProbe: func(count int) (bool, int) { + return checkDataNoNewEnemy(ctx, t, schemaData, slowNodeId, crdb, vulnerableSpiceDb, count) + }, + protectedProbe: func(count int) (bool, int) { + return checkDataNoNewEnemy(ctx, t, schemaData, slowNodeId, crdb, protectedSpiceDb, count) + }, + vulnerableMax: 100, + sampleSize: 5, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + vulnerableFn, protectedFn := attemptFnsForProbeFns(100, tt.vulnerableProbe, tt.protectedProbe) + statTest(t, 5, vulnerableFn, protectedFn) + }) + } +} - t.Logf("using prefix %s for slow node %d", slowPrefix, slowNodeId) +// probeFn tests a condition a maximum of n times, returning whether the +// condition holds and how many times it was attempted. It is used as a building +// block for attemptFn and protectedAttemptFn. 
+type probeFn func(n int) (success bool, count int)
+
+// attemptFnsForProbeFns wraps probeFns in the attempt functions used by
+// statTest.
+func attemptFnsForProbeFns(vulnerableMax int, vulnerableProbe, protectedProbe probeFn) (vulnerableFn attemptFn, protectedFn protectedAttemptFn) {
+    vulnerableFn = func(t *testing.T) int {
+        protected := true
+        var attempts int
+        for protected {
+            // attempt vulnerableMax times before resetting
+            // this helps if the test gets stuck in a state with bad initial
+            // conditions, like a prefix that never lands on the right nodes
+            protected, attempts = vulnerableProbe(vulnerableMax)
+        }
+        require.False(t, protected)
+        t.Logf("determined spicedb vulnerable in %d attempts", attempts)
+        return attempts
+    }
+    protectedFn = func(t *testing.T, count int) {
+        t.Logf("check spicedb is protected after %d attempts", count)
+        protected := true
+        var attempts int
+        for protected {
+            protected, attempts = protectedProbe(count)
+            // if the number of attempts doesn't match the count, the probe
+            // has requested a reset for some reason; try again
+            if attempts < count {
+                continue
+            }
+            require.True(t, protected, "protection is enabled, but newenemy detected")
+            require.Equal(t, count, attempts)
+            t.Logf("spicedb is protected after %d attempts", count)
+            return
+        }
+    }
+    return
+}
 
-    t.Log("filling with data to span multiple ranges")
-    fill(t, vulnerableSpiceDb[0].Client().V0().ACL(), slowPrefix, 4000, 1000)
+// attemptFn runs a check and returns how many iterations it took to fail
+type attemptFn func(t *testing.T) int
 
-    t.Log("modifying time")
-    require.NoError(crdb.TimeDelay(ctx, e2e.MustFile(ctx, t, "timeattack.log"), 1, -200*time.Millisecond))
+// protectedAttemptFn runs a check and fails the current test if it fails within
+// `count` iterations
+type protectedAttemptFn func(t *testing.T, count int)
 
-    const sampleSize = 5
+// statTest takes an attemptFn that is expected to demonstrate the vulnerability,
+// returning a sample, and a protectedAttemptFn that is expected to succeed even
+// after a given number of runs.
+func statTest(t *testing.T, sampleSize int, vulnerableFn attemptFn, protectedFn protectedAttemptFn) {
     samples := make([]int, sampleSize)
-
     for i := 0; i < sampleSize; i++ {
-        t.Log(i, "check vulnerability with mitigations disabled")
-        checkCtx, checkCancel := context.WithTimeout(ctx, 30*time.Minute)
-        protected, attempts := checkNoNewEnemy(checkCtx, t, schemaData, slowNodeId, crdb, vulnerableSpiceDb, -1)
-        require.NotNil(protected, "unable to determine if spicedb displays newenemy when mitigations are disabled within the time limit")
-        require.False(*protected)
-        checkCancel()
-        t.Logf("%d - determined spicedb vulnerable in %d attempts", i, attempts)
-        samples[i] = attempts
+        t.Logf("collecting sample %d", i)
+        samples[i] = vulnerableFn(t)
     }
+    protectedFn(t, iterationsForHighConfidence(samples))
+}
 
+// iterationsForHighConfidence returns how many iterations we need to get
+// > 3sigma from the mean of the samples.
+// Caps at maxIterations to control test runtime. Set maxIterations to 0 for no
+// cap.
+func iterationsForHighConfidence(samples []int) (iterations int) {
     // from https://cs.opensource.google/go/x/perf/+/40a54f11:internal/stats/sample.go;l=196
     // calculates mean and stddev at the same time
     mean, M2 := 0.0, 0.0
@@ -167,42 +264,51 @@ func TestNoNewEnemy(t *testing.T) {
         mean += delta / float64(n+1)
         M2 += delta * (float64(x) - mean)
     }
-    variance := M2 / float64(sampleSize-1)
+    variance := M2 / float64(len(samples)-1)
     stddev := math.Sqrt(variance)
-    samplestddev := stddev / math.Sqrt(float64(sampleSize))
+    samplestddev := stddev / math.Sqrt(float64(len(samples)))
 
     // how many iterations do we need to get > 3sigma from the mean?
     // cap maxIterations to control test runtime.
-    iterations := int(math.Ceil(3*stddev*samplestddev + mean))
+    iterations = int(math.Ceil(3*stddev*samplestddev + mean))
     if *maxIterations != 0 && *maxIterations < iterations {
         iterations = *maxIterations
     }
-
-    t.Logf("check spicedb is protected after %d attempts", iterations)
-    protected, _ := checkNoNewEnemy(ctx, t, schemaData, slowNodeId, crdb, protectedSpiceDb, iterations)
-    require.NotNil(protected, "unable to determine if spicedb is protected within the time limit")
-    require.True(*protected, "protection is enabled, but newenemy detected")
+    return
 }
 
-// checkNoNewEnemy returns true if the service is protected, false if it is vulnerable, and nil if we couldn't determine
-func checkNoNewEnemy(ctx context.Context, t testing.TB, schemaData []SchemaData, slowNodeId int, crdb cockroach.Cluster, spicedb spice.Cluster, candidateCount int) (*bool, int) {
-    var attempts int
-
+// checkDataNoNewEnemy returns true if the service is protected and false if it
+// is vulnerable.
+//
+// This subtest ensures protection from a "data-based" newenemy problem:
+//
+// 1. Use this schema:
+//      definition user {}
+//      definition resource {
+//        relation direct: user
+//        relation excluded: user
+//        permission allowed = direct - excluded
+//      }
+// 2. Write resource:1#excluded@user:A
+// 3. Write resource:1#direct@user:A
+// 4. If the timestamp from (3) is before the timestamp for (2), then:
+//      check(resource:1#allowed@user:A)
+//    may succeed, when it should fail. The timestamps can be reversed
+//    if the tx overlap protections are disabled, because cockroach only
+//    ensures overlapping transactions are linearizable.
+func checkDataNoNewEnemy(ctx context.Context, t testing.TB, schemaData []SchemaData, slowNodeId int, crdb cockroach.Cluster, spicedb spice.Cluster, maxAttempts int) (bool, int) {
     prefix := prefixForNode(ctx, crdb[1].Conn(), schemaData, slowNodeId)
-    objIdGenerator := generator.NewUniqueGenerator(objIDRegex)
+    t.Log("filling with data to span multiple ranges for prefix", prefix)
+    fill(t, spicedb[0].Client().V0().ACL(), prefix, 4000, 1000)
 
-    for {
-        attempts++
-        directs, excludes := generateTuples(prefix, 1, objIdGenerator)
+    for attempts := 1; attempts <= maxAttempts; attempts++ {
+        direct, exclude := generateTuple(prefix, objIdGenerator)
 
         // write to node 1
         r1, err := spicedb[0].Client().V0().ACL().Write(testCtx, &v0.WriteRequest{
-            Updates: []*v0.RelationTupleUpdate{excludes[0]},
+            Updates: []*v0.RelationTupleUpdate{exclude},
         })
-        if err != nil {
-            t.Log(err)
-            continue
-        }
+        require.NoError(t, err)
 
         // the first write has to read the namespaces from the second node,
         // which will resync the timestamps. sleeping allows the clocks to get
@@ -211,83 +317,166 @@ func checkNoNewEnemy(ctx context.Context, t testing.TB, schemaData []SchemaData,
         // write to node 2 (clock is behind)
         r2, err := spicedb[1].Client().V0().ACL().Write(testCtx, &v0.WriteRequest{
-            Updates: []*v0.RelationTupleUpdate{directs[0]},
+            Updates: []*v0.RelationTupleUpdate{direct},
         })
-        if err != nil {
-            t.Log(err)
-            continue
-        }
+        require.NoError(t, err)
 
         canHas, err := spicedb[1].Client().V0().ACL().Check(context.Background(), &v0.CheckRequest{
             TestUserset: &v0.ObjectAndRelation{
-                Namespace: directs[0].Tuple.ObjectAndRelation.Namespace,
-                ObjectId:  directs[0].Tuple.ObjectAndRelation.ObjectId,
+                Namespace: direct.Tuple.ObjectAndRelation.Namespace,
+                ObjectId:  direct.Tuple.ObjectAndRelation.ObjectId,
                 Relation:  "allowed",
             },
-            User:       directs[0].Tuple.GetUser(),
+            User:       direct.Tuple.GetUser(),
             AtRevision: r2.GetRevision(),
         })
-        if err != nil {
-            t.Log(err)
-            continue
-        }
-        if canHas.IsMember {
-            t.Log("service is subject to the new enemy problem")
-        }
+        require.NoError(t, err)
 
-        r1leader, r2leader := getLeaderNode(testCtx, crdb[1].Conn(), excludes[0].Tuple), getLeaderNode(testCtx, crdb[1].Conn(), directs[0].Tuple)
-        ns1Leader := getLeaderNodeForNamespace(testCtx, crdb[1].Conn(), excludes[0].Tuple.ObjectAndRelation.Namespace)
-        ns2Leader := getLeaderNodeForNamespace(testCtx, crdb[1].Conn(), excludes[0].Tuple.User.GetUserset().Namespace)
+        r1leader, r2leader := getLeaderNode(testCtx, crdb[1].Conn(), exclude.Tuple), getLeaderNode(testCtx, crdb[1].Conn(), direct.Tuple)
+        ns1Leader := getLeaderNodeForNamespace(testCtx, crdb[1].Conn(), exclude.Tuple.ObjectAndRelation.Namespace)
+        ns2Leader := getLeaderNodeForNamespace(testCtx, crdb[1].Conn(), exclude.Tuple.User.GetUserset().Namespace)
         z1, _ := zookie.DecodeRevision(r1.GetRevision())
         z2, _ := zookie.DecodeRevision(r2.GetRevision())
         t.Log(z1, z2, z1.Sub(z2).String(), r1leader, r2leader, ns1Leader, ns2Leader)
 
+        if canHas.IsMember {
+            t.Log("service is subject to the new enemy problem")
+            return false, attempts
+        }
+
         if ns1Leader != slowNodeId || ns2Leader != slowNodeId {
             t.Log("namespace leader changed, pick new prefix and fill")
-            prefix = prefixForNode(ctx, crdb[1].Conn(), schemaData, slowNodeId)
-            // need to fill new prefix
-            t.Log("filling with data to span multiple ranges for prefix ", prefix)
-            fill(t, spicedb[0].Client().V0().ACL(), prefix, 4000, 1000)
-            continue
+            // returning true will re-run with a new prefix
+            return true, attempts
         }
 
-        if canHas.IsMember {
-            t.Log("service is subject to the new enemy problem")
-            protected := false
-            return &protected, attempts
+        if z1.Sub(z2).IsPositive() {
+            t.Log("error in test, object id has been re-used.")
+            continue
         }
 
-        if !canHas.IsMember && z1.Sub(z2).IsPositive() {
-            t.Log("error in test, object id has been re-used.")
+        select {
+        case <-ctx.Done():
+            return false, attempts
+        default:
+            // this sleep helps the clocks get back out of sync after an attempt
+            time.Sleep(100 * time.Millisecond)
             continue
         }
+    }
+    return true, maxAttempts
+}
+
+// checkSchemaNoNewEnemy returns true if the service is protected, false if it
+// is vulnerable.
+//
+// This test ensures protection from a "schema and data" newenemy, i.e. the
+// new enemy conditions require linearizable changes to schema and data:
+// 1. Start with this schema:
+//      definition user {}
+//      definition resource {
+//        relation direct: user
+//        relation excluded: user
+//        permission allowed = direct
+//      }
+// 2. Write resource:1#direct@user:A
+// 3. Write resource:1#excluded@user:A
+// 4. Update to this schema:
+//      definition user {}
+//      definition resource {
+//        relation direct: user
+//        relation excluded: user
+//        permission allowed = direct - excluded
+//      }
+// 5. If the revision from (4) is before the timestamp for (3), then:
+//      check(revision from 3, resource:1#allowed@user:A)
+//    may fail, when it should succeed. The timestamps can be reversed
+//    if the tx overlap protections are disabled, because cockroach only
+//    ensures overlapping transactions are linearizable.
+// In this case, we don't get an explicit revision back from the
+// WriteSchema call, but the Schema write and the resource write are
+// fully consistent.
+func checkSchemaNoNewEnemy(ctx context.Context, t testing.TB, schemaData []SchemaData, slowNodeId int, crdb cockroach.Cluster, spicedb spice.Cluster, maxAttempts int) (bool, int) {
+    prefix := prefixForNode(ctx, crdb[1].Conn(), schemaData, slowNodeId)
+    var b strings.Builder
+    require.NoError(t, schemaAllowTpl.Execute(&b, SchemaData{Prefixes: []string{prefix}}))
+    allowSchema := b.String()
+    b.Reset()
+    require.NoError(t, schemaTpl.Execute(&b, SchemaData{Prefixes: []string{prefix}}))
+    excludeSchema := b.String()
+
+    for attempts := 1; attempts <= maxAttempts; attempts++ {
+        direct, exclude := generateTuple(prefix, objIdGenerator)
+
+        // write the "allow" schema
+        require.NoError(t, getErr(spicedb[0].Client().V1Alpha1().Schema().WriteSchema(context.Background(), &v1alpha1.WriteSchemaRequest{
+            Schema: allowSchema,
+        })))
+
+        // write the "direct" tuple
+        require.NoError(t, getErr(spicedb[0].Client().V0().ACL().Write(testCtx, &v0.WriteRequest{
+            Updates: []*v0.RelationTupleUpdate{direct},
+        })))
+
+        // write the "excludes" tuple
+        // writing to 1 primes the namespace cache on node 1 with the "allow" namespace
+        r2, err := spicedb[1].Client().V0().ACL().Write(testCtx, &v0.WriteRequest{
+            Updates: []*v0.RelationTupleUpdate{exclude},
+        })
+        require.NoError(t, err)
 
-        // if we find causal reversals, but no newenemy, assume we're protected
-        if candidateCount > 0 && attempts >= candidateCount {
-            t.Log(candidateCount, "(would be) causal reversals with no new enemy detected")
-            protected := true
-            return &protected, attempts
+        // write the "exclude" schema. If this write hits the slow crdb node, it
+        // can get a revision in between the direct and exclude tuple writes
+        // which will cause check to fail, when it should succeed
+        require.NoError(t, getErr(spicedb[1].Client().V1Alpha1().Schema().WriteSchema(testCtx, &v1alpha1.WriteSchemaRequest{
+            Schema: excludeSchema,
+        })))
+
+        rev, err := zookie.DecodeRevision(r2.GetRevision())
+        require.NoError(t, err)
+
+        var canHas *v1.CheckPermissionResponse
+        checkAccess := func() bool {
+            var err error
+
+            canHas, err = spicedb[0].Client().V1().Permissions().CheckPermission(testCtx, &v1.CheckPermissionRequest{
+                Consistency: &v1.Consistency{
+                    Requirement: &v1.Consistency_AtExactSnapshot{AtExactSnapshot: zedtoken.NewFromRevision(rev)},
+                },
+                Resource: &v1.ObjectReference{
+                    ObjectType: direct.Tuple.ObjectAndRelation.Namespace,
+                    ObjectId:   direct.Tuple.ObjectAndRelation.ObjectId,
+                },
+                Permission: "allowed",
+                Subject: &v1.SubjectReference{
+                    Object: &v1.ObjectReference{
+                        ObjectType: direct.Tuple.User.GetUserset().Namespace,
+                        ObjectId:   direct.Tuple.User.GetUserset().ObjectId,
+                    },
+                },
+            })
+            if err != nil {
+                t.Log(err)
+            }
+            return err == nil
         }
+        require.Eventually(t, checkAccess, 10*time.Second, 10*time.Millisecond)
 
-        if attempts > 1000 {
-            t.Log("trying with a new prefix")
-            attempts = 0
-            prefix = prefixForNode(ctx, crdb[1].Conn(), schemaData, slowNodeId)
-            t.Log("filling with data to span multiple ranges for prefix ", prefix)
-            fill(t, spicedb[0].Client().V0().ACL(), prefix, 4000, 1000)
-            continue
+        if canHas.Permissionship == v1.CheckPermissionResponse_PERMISSIONSHIP_NO_PERMISSION {
+            t.Log("service is subject to the new enemy problem")
+            return false, attempts
         }
 
         select {
         case <-ctx.Done():
-            return nil, attempts
+            return false, attempts
         default:
+            // this is not strictly needed, but helps avoid write contention
+            time.Sleep(100 * time.Millisecond)
             continue
         }
-
-        // allow clocks to desync
-        time.Sleep(100 * time.Millisecond)
     }
+    return true, maxAttempts
 }
 
 func BenchmarkBatchWrites(b *testing.B) {
@@ -299,7 +488,7 @@ func BenchmarkBatchWrites(b *testing.B) {
     require.NoError(b, spicedb.Start(ctx, tlog, ""))
     require.NoError(b, spicedb.Connect(ctx, tlog))
 
-    directs, excludes := generateTuples("", b.N*20, generator.NewUniqueGenerator(objIDRegex))
+    directs, excludes := generateTuples("", b.N*20, objIdGenerator)
     b.ResetTimer()
     _, err := spicedb[0].Client().V0().ACL().Write(testCtx, &v0.WriteRequest{
         Updates: excludes,
@@ -329,13 +518,13 @@ func BenchmarkConflictingTupleWrites(b *testing.B) {
 
     b.ResetTimer()
 
-    checkNoNewEnemy(ctx, b, generateSchemaData(1, 1), 1, crdb, spicedb, b.N)
+    checkDataNoNewEnemy(ctx, b, generateSchemaData(1, 1), 1, crdb, spicedb, b.N)
 }
 
 func fill(t testing.TB, client v0.ACLServiceClient, prefix string, fillerCount, batchSize int) {
     t.Log("filling prefix", prefix)
     require := require.New(t)
-    directs, excludes := generateTuples(prefix, fillerCount, generator.NewUniqueGenerator(objIDRegex))
+    directs, excludes := generateTuples(prefix, fillerCount, objIdGenerator)
     for i := 0; i < fillerCount/batchSize; i++ {
         t.Log("filling", i*batchSize, "to", (i+1)*batchSize)
         _, err := client.Write(testCtx, &v0.WriteRequest{
@@ -349,24 +538,19 @@ func fill(t testing.TB, client v0.ACLServiceClient, prefix string, fillerCount,
     }
 }
 
-func fillSchema(t testing.TB, data []SchemaData, schemaClient v1alpha1.SchemaServiceClient) error {
+// fillSchema generates the schema text for given SchemaData and applies it
+func fillSchema(t testing.TB, template *template.Template, data []SchemaData, schemaClient v1alpha1.SchemaServiceClient) {
     var b strings.Builder
     batchSize := len(data[0].Prefixes)
     for i, d := range data {
         t.Logf("filling %d to %d", i*batchSize, (i+1)*batchSize)
         b.Reset()
-        err := schemaTpl.Execute(&b, d)
-        if err != nil {
-            return err
-        }
-        _, err = schemaClient.WriteSchema(context.Background(), &v1alpha1.WriteSchemaRequest{
+        require.NoError(t, template.Execute(&b, d))
+        _, err := schemaClient.WriteSchema(context.Background(), &v1alpha1.WriteSchemaRequest{
             Schema: b.String(),
         })
-        if err != nil {
-            return err
-        }
+        require.NoError(t, err)
     }
-    return nil
 }
 
 // prefixForNode finds a prefix with namespace leaders on the node id
@@ -393,7 +577,6 @@ func prefixForNode(ctx context.Context, conn *pgx.Conn, data []SchemaData, node
 
 func generateSchemaData(n int, batchSize int) (data []SchemaData) {
     data = make([]SchemaData, 0, n/batchSize)
-    prefixGenerator := generator.NewUniqueGenerator(namespacePrefixRegex)
     for i := 0; i < n/batchSize; i++ {
         schema := SchemaData{Prefixes: make([]string, 0, batchSize)}
         for j := i * batchSize; j < (i+1)*batchSize; j++ {
@@ -404,6 +587,11 @@ func generateSchemaData(n int, batchSize int) (data []SchemaData) {
     return
 }
 
+func generateTuple(prefix string, objIdGenerator *generator.UniqueGenerator) (direct *v0.RelationTupleUpdate, exclude *v0.RelationTupleUpdate) {
+    directs, excludes := generateTuples(prefix, 1, objIdGenerator)
+    return directs[0], excludes[0]
+}
+
 func generateTuples(prefix string, n int, objIdGenerator *generator.UniqueGenerator) (directs []*v0.RelationTupleUpdate, excludes []*v0.RelationTupleUpdate) {
     directs = make([]*v0.RelationTupleUpdate, 0, n)
     excludes = make([]*v0.RelationTupleUpdate, 0, n)
@@ -475,6 +663,8 @@ func getLeaderNodeForNamespace(ctx context.Context, conn *pgx.Conn, namespace st
     return leaderFromRangeRow(rows)
 }
 
+// leaderFromRangeRow parses the rows from a `SHOW RANGE` query and returns the
+// leader node id for the range
 func leaderFromRangeRow(rows pgx.Rows) int {
     var (
         startKey sql.NullString
@@ -494,3 +684,17 @@ func leaderFromRangeRow(rows pgx.Rows) int {
     }
     return leaseHolder
 }
+
+func getErr(vals ...interface{}) error {
+    if len(vals) == 0 {
+        return nil
+    }
+    err := vals[len(vals)-1]
+    if err == nil {
+        return nil
+    }
+    if err, ok := err.(error); ok {
+        return err
+    }
+    return nil
+}
diff --git a/internal/datastore/crdb/namespace.go b/internal/datastore/crdb/namespace.go
index 07a543295c..5edab10af2 100644
--- a/internal/datastore/crdb/namespace.go
+++ b/internal/datastore/crdb/namespace.go
@@ -44,6 +44,8 @@ var (
 func (cds *crdbDatastore) WriteNamespace(ctx context.Context, newConfig *v0.NamespaceDefinition) (datastore.Revision, error) {
     var hlcNow decimal.Decimal
     if err := cds.execute(ctx, cds.conn, pgx.TxOptions{}, func(tx pgx.Tx) error {
+        keySet := newKeySet()
+        cds.AddOverlapKey(keySet, newConfig.Name)
         serialized, err := proto.Marshal(newConfig)
         if err != nil {
             return err
         }
@@ -54,7 +56,12 @@ func (cds *crdbDatastore) WriteNamespace(ctx context.Context, newConfig *v0.Name
         if err != nil {
             return err
         }
-        return cds.conn.QueryRow(
+        for k := range keySet {
+            if _, err := tx.Exec(ctx, queryTouchTransaction, k); err != nil {
+                return err
+            }
+        }
+        return tx.QueryRow(
             datastore.SeparateContextWithTracing(ctx), writeSQL, writeArgs..,
         ).Scan(&hlcNow)
     }); err != nil {
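
Reviewer sketch (not part of the patch above): the relationship between probeFn, attemptFnsForProbeFns, and statTest introduced in this diff can be hard to see from the hunks alone. The following is a minimal, hypothetical example of how the pieces compose if it were dropped into the same package as the harness. TestHarnessSketch and the two fake probes are illustrative stand-ins, not code from this PR; the real probes are checkDataNoNewEnemy and checkSchemaNoNewEnemy.

package newenemy

import (
    "math/rand"
    "testing"
)

func TestHarnessSketch(t *testing.T) {
    // A "vulnerable" probe: the condition is observed violated (protected ==
    // false) on some attempt within n tries; the attempt counts become the
    // sample that statTest collects.
    vulnerableProbe := func(n int) (success bool, count int) {
        return false, rand.Intn(n) + 1
    }
    // A "protected" probe: the condition holds for all n attempts.
    protectedProbe := func(n int) (success bool, count int) {
        return true, n
    }

    // attemptFnsForProbeFns wraps the probes into the attempt functions that
    // statTest expects. statTest gathers 5 vulnerable samples, derives a
    // >3-sigma iteration count via iterationsForHighConfidence (capped by
    // -max-iterations), and requires the protected probe to hold for that
    // many attempts.
    vulnerableFn, protectedFn := attemptFnsForProbeFns(100, vulnerableProbe, protectedProbe)
    statTest(t, 5, vulnerableFn, protectedFn)
}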