Merge pull request #1892 from josephschorr/spanner-stats
Switch spanner datastore to use the built-in stats table for estimating rel count
josephschorr committed May 13, 2024
2 parents 8339620 + d05a0f9 commit 2ccd129
Showing 11 changed files with 264 additions and 109 deletions.
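For context on the change as a whole: instead of maintaining a hand-rolled relationship_estimate_counters table that every write, delete, and bulk load had to update, the datastore now derives its relationship-count estimate from Spanner's built-in table-size statistics (spanner_sys.table_sizes_stats_1hour). The sketch below is illustrative only and is not code from this PR; the real logic lives in the stats code among the 11 changed files, which is not shown in this excerpt, and the helper name and query text here are assumptions.

package spannerstats // illustrative package name, not part of the PR

import (
	"context"

	"cloud.google.com/go/spanner"
)

// estimateRelationshipCount sketches the approach: read the most recent
// used_bytes value recorded for the relationship table in Spanner's built-in
// table-size statistics, then divide by an estimated bytes-per-relationship.
func estimateRelationshipCount(ctx context.Context, client *spanner.Client, statsTable, relTable string, bytesPerRel uint64) (uint64, error) {
	if bytesPerRel == 0 {
		// No per-row size estimate yet (for example, an empty table): report zero
		// rather than divide by zero.
		return 0, nil
	}

	stmt := spanner.Statement{
		SQL: `SELECT used_bytes FROM ` + statsTable + `
			WHERE table_name = @tableName
			ORDER BY interval_end DESC
			LIMIT 1`,
		Params: map[string]interface{}{"tableName": relTable},
	}

	var usedBytes int64
	if err := client.Single().Query(ctx, stmt).Do(func(r *spanner.Row) error {
		return r.Columns(&usedBytes)
	}); err != nil {
		return 0, err
	}

	return uint64(usedBytes) / bytesPerRel, nil
}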
35 changes: 8 additions & 27 deletions internal/datastore/spanner/readwrite.go
@@ -20,8 +20,7 @@ import (

type spannerReadWriteTXN struct {
spannerReader
spannerRWT *spanner.ReadWriteTransaction
disableStats bool
spannerRWT *spanner.ReadWriteTransaction
}

const inLimit = 10_000 // https://cloud.google.com/spanner/quotas#query-limits
@@ -40,12 +39,6 @@ func (rwt spannerReadWriteTXN) WriteRelationships(ctx context.Context, mutations
}
}

if !rwt.disableStats {
if err := updateCounter(ctx, rwt.spannerRWT, rowCountChange); err != nil {
return fmt.Errorf(errUnableToWriteRelationships, err)
}
}

return nil
}

@@ -74,15 +67,15 @@ func spannerMutation(
}

func (rwt spannerReadWriteTXN) DeleteRelationships(ctx context.Context, filter *v1.RelationshipFilter, opts ...options.DeleteOptionsOption) (bool, error) {
limitReached, err := deleteWithFilter(ctx, rwt.spannerRWT, filter, rwt.disableStats, opts...)
limitReached, err := deleteWithFilter(ctx, rwt.spannerRWT, filter, opts...)
if err != nil {
return false, fmt.Errorf(errUnableToDeleteRelationships, err)
}

return limitReached, nil
}

func deleteWithFilter(ctx context.Context, rwt *spanner.ReadWriteTransaction, filter *v1.RelationshipFilter, disableStats bool, opts ...options.DeleteOptionsOption) (bool, error) {
func deleteWithFilter(ctx context.Context, rwt *spanner.ReadWriteTransaction, filter *v1.RelationshipFilter, opts ...options.DeleteOptionsOption) (bool, error) {
delOpts := options.NewDeleteOptionsWithOptionsAndDefaults(opts...)
var delLimit uint64
if delOpts.DeleteLimit != nil && *delOpts.DeleteLimit > 0 {
@@ -94,34 +87,28 @@ func deleteWithFilter(ctx context.Context, rwt *spanner.ReadWriteTransaction, fi

var numDeleted int64
if delLimit > 0 {
nu, err := deleteWithFilterAndLimit(ctx, rwt, filter, disableStats, delLimit)
nu, err := deleteWithFilterAndLimit(ctx, rwt, filter, delLimit)
if err != nil {
return false, err
}
numDeleted = nu
} else {
nu, err := deleteWithFilterAndNoLimit(ctx, rwt, filter, disableStats)
nu, err := deleteWithFilterAndNoLimit(ctx, rwt, filter)
if err != nil {
return false, err
}

numDeleted = nu
}

if !disableStats {
if err := updateCounter(ctx, rwt, -1*numDeleted); err != nil {
return false, err
}
}

if delLimit > 0 && uint64(numDeleted) == delLimit {
return true, nil
}

return false, nil
}

func deleteWithFilterAndLimit(ctx context.Context, rwt *spanner.ReadWriteTransaction, filter *v1.RelationshipFilter, disableStats bool, delLimit uint64) (int64, error) {
func deleteWithFilterAndLimit(ctx context.Context, rwt *spanner.ReadWriteTransaction, filter *v1.RelationshipFilter, delLimit uint64) (int64, error) {
query := queryTuplesForDelete
filteredQuery, err := applyFilterToQuery(query, filter)
if err != nil {
@@ -172,7 +159,7 @@ func deleteWithFilterAndLimit(ctx context.Context, rwt *spanner.ReadWriteTransac
return int64(len(mutations)), nil
}

func deleteWithFilterAndNoLimit(ctx context.Context, rwt *spanner.ReadWriteTransaction, filter *v1.RelationshipFilter, disableStats bool) (int64, error) {
func deleteWithFilterAndNoLimit(ctx context.Context, rwt *spanner.ReadWriteTransaction, filter *v1.RelationshipFilter) (int64, error) {
query := sql.Delete(tableRelationship)
filteredQuery, err := applyFilterToQuery(query, filter)
if err != nil {
@@ -278,7 +265,7 @@ func (rwt spannerReadWriteTXN) WriteNamespaces(_ context.Context, newConfigs ...
func (rwt spannerReadWriteTXN) DeleteNamespaces(ctx context.Context, nsNames ...string) error {
for _, nsName := range nsNames {
relFilter := &v1.RelationshipFilter{ResourceType: nsName}
if _, err := deleteWithFilter(ctx, rwt.spannerRWT, relFilter, rwt.disableStats); err != nil {
if _, err := deleteWithFilter(ctx, rwt.spannerRWT, relFilter); err != nil {
return fmt.Errorf(errUnableToDeleteConfig, err)
}

@@ -313,12 +300,6 @@ func (rwt spannerReadWriteTXN) BulkLoad(ctx context.Context, iter datastore.Bulk
return 0, fmt.Errorf(errUnableToBulkLoadRelationships, err)
}

if !rwt.disableStats {
if err := updateCounter(ctx, rwt.spannerRWT, int64(numLoaded)); err != nil {
return 0, fmt.Errorf(errUnableToBulkLoadRelationships, err)
}
}

return numLoaded, nil
}

6 changes: 3 additions & 3 deletions internal/datastore/spanner/revisions.go
@@ -13,7 +13,7 @@ import (

var ParseRevisionString = revisions.RevisionParser(revisions.Timestamp)

func (sd spannerDatastore) headRevisionInternal(ctx context.Context) (datastore.Revision, error) {
func (sd *spannerDatastore) headRevisionInternal(ctx context.Context) (datastore.Revision, error) {
now, err := sd.now(ctx)
if err != nil {
return datastore.NoRevision, fmt.Errorf(errRevision, err)
@@ -22,11 +22,11 @@ func (sd spannerDatastore) headRevisionInternal(ctx context.Context) (datastore.
return revisions.NewForTime(now), nil
}

func (sd spannerDatastore) HeadRevision(ctx context.Context) (datastore.Revision, error) {
func (sd *spannerDatastore) HeadRevision(ctx context.Context) (datastore.Revision, error) {
return sd.headRevisionInternal(ctx)
}

func (sd spannerDatastore) now(ctx context.Context) (time.Time, error) {
func (sd *spannerDatastore) now(ctx context.Context) (time.Time, error) {
var timestamp time.Time
if err := sd.client.Single().Query(ctx, spanner.NewStatement("SELECT CURRENT_TIMESTAMP()")).Do(func(r *spanner.Row) error {
return r.Columns(&timestamp)
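One note on the receiver change in this file (and in spanner.go below), from func (sd spannerDatastore) to func (sd *spannerDatastore): it is most likely required because the datastore struct now carries a sync.RWMutex for the cached estimate, and value receivers would copy that lock on every call, which go vet's copylocks check flags. A small standalone illustration, not taken from this PR:

package example

import "sync"

// cache embeds a lock, so its methods need pointer receivers; a value
// receiver would copy the RWMutex on each call and trip go vet's copylocks check.
type cache struct {
	mu    sync.RWMutex
	value uint64
}

func (c *cache) get() uint64 {
	c.mu.RLock()
	defer c.mu.RUnlock()
	return c.value
}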
4 changes: 0 additions & 4 deletions internal/datastore/spanner/schema.go
@@ -24,10 +24,6 @@ const (

tableMetadata = "metadata"
colUniqueID = "unique_id"

tableCounters = "relationship_estimate_counters"
colID = "id"
colCount = "count"
)

var allRelationshipCols = []string{
34 changes: 22 additions & 12 deletions internal/datastore/spanner/spanner.go
@@ -6,6 +6,7 @@ import (
"os"
"regexp"
"strconv"
"sync"
"time"

"cloud.google.com/go/spanner"
@@ -61,6 +62,8 @@ const (
defaultChangeStreamRetention = 24 * time.Hour
)

const tableSizesStatsTable = "spanner_sys.table_sizes_stats_1hour"

var (
sql = sq.StatementBuilder.PlaceholderFormat(sq.AtP)
tracer = otel.Tracer("spicedb/internal/datastore/spanner")
@@ -78,6 +81,11 @@ type spannerDatastore struct {
client *spanner.Client
config spannerOptions
database string

cachedEstimatedBytesPerRelationshipLock sync.RWMutex
cachedEstimatedBytesPerRelationship uint64

tableSizesStatsTable string
}

// NewSpannerDatastore returns a datastore backed by cloud spanner
@@ -143,7 +151,7 @@ func NewSpannerDatastore(ctx context.Context, database string, opts ...Option) (
maxRevisionStaleness := time.Duration(float64(config.revisionQuantization.Nanoseconds())*
config.maxRevisionStalenessPercent) * time.Nanosecond

ds := spannerDatastore{
ds := &spannerDatastore{
RemoteClockRevisions: revisions.NewRemoteClockRevisions(
defaultChangeStreamRetention,
maxRevisionStaleness,
@@ -153,11 +161,14 @@ func NewSpannerDatastore(ctx context.Context, database string, opts ...Option) (
CommonDecoder: revisions.CommonDecoder{
Kind: revisions.Timestamp,
},
client: client,
config: config,
database: database,
watchBufferWriteTimeout: config.watchBufferWriteTimeout,
watchBufferLength: config.watchBufferLength,
client: client,
config: config,
database: database,
watchBufferWriteTimeout: config.watchBufferWriteTimeout,
watchBufferLength: config.watchBufferLength,
cachedEstimatedBytesPerRelationship: 0,
cachedEstimatedBytesPerRelationshipLock: sync.RWMutex{},
tableSizesStatsTable: tableSizesStatsTable,
}
ds.RemoteClockRevisions.SetNowFunc(ds.headRevisionInternal)

@@ -195,7 +206,7 @@ func (t *traceableRTX) Query(ctx context.Context, statement spanner.Statement) *
return t.delegate.Query(ctx, statement)
}

func (sd spannerDatastore) SnapshotReader(revisionRaw datastore.Revision) datastore.Reader {
func (sd *spannerDatastore) SnapshotReader(revisionRaw datastore.Revision) datastore.Reader {
r := revisionRaw.(revisions.TimestampRevision)

txSource := func() readTX {
@@ -205,7 +216,7 @@ func (sd spannerDatastore) SnapshotReader(revisionRaw datastore.Revision) datast
return spannerReader{executor, txSource}
}

func (sd spannerDatastore) ReadWriteTx(ctx context.Context, fn datastore.TxUserFunc, opts ...options.RWTOptionsOption) (datastore.Revision, error) {
func (sd *spannerDatastore) ReadWriteTx(ctx context.Context, fn datastore.TxUserFunc, opts ...options.RWTOptionsOption) (datastore.Revision, error) {
config := options.NewRWTOptionsWithOptions(opts...)

ctx, span := tracer.Start(ctx, "ReadWriteTx")
@@ -221,7 +232,6 @@ func (sd spannerDatastore) ReadWriteTx(ctx context.Context, fn datastore.TxUserF
rwt := spannerReadWriteTXN{
spannerReader{executor, txSource},
spannerRWT,
sd.config.disableStats,
}
err := func() error {
innerCtx, innerSpan := tracer.Start(ctx, "TxUserFunc")
@@ -248,7 +258,7 @@ func (sd spannerDatastore) ReadWriteTx(ctx context.Context, fn datastore.TxUserF
return revisions.NewForTime(ts), nil
}

func (sd spannerDatastore) ReadyState(ctx context.Context) (datastore.ReadyState, error) {
func (sd *spannerDatastore) ReadyState(ctx context.Context) (datastore.ReadyState, error) {
headMigration, err := migrations.SpannerMigrations.HeadRevision()
if err != nil {
return datastore.ReadyState{}, fmt.Errorf("invalid head migration found for spanner: %w", err)
@@ -275,11 +285,11 @@ func (sd spannerDatastore) ReadyState(ctx context.Context) (datastore.ReadyState
}, nil
}

func (sd spannerDatastore) Features(_ context.Context) (*datastore.Features, error) {
func (sd *spannerDatastore) Features(_ context.Context) (*datastore.Features, error) {
return &datastore.Features{Watch: datastore.Feature{Enabled: true}}, nil
}

func (sd spannerDatastore) Close() error {
func (sd *spannerDatastore) Close() error {
sd.client.Close()
return nil
}
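The two new fields above, cachedEstimatedBytesPerRelationship and its RWMutex, suggest a read-mostly cache: compute the bytes-per-relationship figure once (for example, used_bytes divided by an exact row count) and reuse it for later estimates, which is also what the cached-value assertion in the test below exercises. A sketch of that pattern follows, under the assumption that this is roughly how the stats code (not shown in this excerpt) uses the fields; computeBytesPerRelationship is a hypothetical helper standing in for the real query logic.

// estimatedBytesPerRelationship is a sketch, not code from this PR.
func (sd *spannerDatastore) estimatedBytesPerRelationship(ctx context.Context) (uint64, error) {
	// Fast path: reuse a previously computed estimate under the read lock.
	sd.cachedEstimatedBytesPerRelationshipLock.RLock()
	cached := sd.cachedEstimatedBytesPerRelationship
	sd.cachedEstimatedBytesPerRelationshipLock.RUnlock()
	if cached > 0 {
		return cached, nil
	}

	// Slow path: derive the figure and cache it for subsequent Statistics calls.
	computed, err := sd.computeBytesPerRelationship(ctx) // hypothetical helper
	if err != nil {
		return 0, err
	}

	sd.cachedEstimatedBytesPerRelationshipLock.Lock()
	sd.cachedEstimatedBytesPerRelationship = computed
	sd.cachedEstimatedBytesPerRelationshipLock.Unlock()
	return computed, nil
}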
109 changes: 107 additions & 2 deletions internal/datastore/spanner/spanner_test.go
@@ -8,17 +8,22 @@ import (
"testing"
"time"

"cloud.google.com/go/spanner"
admin "cloud.google.com/go/spanner/admin/database/apiv1"
"cloud.google.com/go/spanner/admin/database/apiv1/databasepb"
"github.com/stretchr/testify/require"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

testdatastore "github.com/authzed/spicedb/internal/testserver/datastore"
"github.com/authzed/spicedb/pkg/datastore"
"github.com/authzed/spicedb/pkg/datastore/test"
corev1 "github.com/authzed/spicedb/pkg/proto/core/v1"
"github.com/authzed/spicedb/pkg/tuple"
)

// Implement TestableDatastore interface
func (sd spannerDatastore) ExampleRetryableError() error {
func (sd *spannerDatastore) ExampleRetryableError() error {
return status.New(codes.Aborted, "retryable").Err()
}

@@ -38,5 +43,105 @@ func TestSpannerDatastore(t *testing.T) {
return ds
})
return ds, nil
}), test.WithCategories(test.GCCategory, test.WatchCategory))
}), test.WithCategories(test.GCCategory, test.WatchCategory, test.StatsCategory))

t.Run("TestFakeStats", createDatastoreTest(
b,
FakeStatsTest,
))
}

type datastoreTestFunc func(t *testing.T, ds datastore.Datastore)

func createDatastoreTest(b testdatastore.RunningEngineForTest, tf datastoreTestFunc, options ...Option) func(*testing.T) {
return func(t *testing.T) {
ctx := context.Background()
ds := b.NewDatastore(t, func(engine, uri string) datastore.Datastore {
ds, err := NewSpannerDatastore(ctx, uri, options...)
require.NoError(t, err)
return ds
})
defer ds.Close()

tf(t, ds)
}
}

const createFakeStatsTable = `
CREATE TABLE fake_stats_table (
interval_end TIMESTAMP,
table_name STRING(MAX),
used_bytes INT64,
) PRIMARY KEY (table_name, interval_end)
`

func FakeStatsTest(t *testing.T, ds datastore.Datastore) {
spannerDS := ds.(*spannerDatastore)
spannerDS.tableSizesStatsTable = "fake_stats_table"

spannerClient := spannerDS.client
ctx := context.Background()

adminClient, err := admin.NewDatabaseAdminClient(ctx)
require.NoError(t, err)

// Manually add the stats table to simulate the table that the emulator doesn't create.
updateOp, err := adminClient.UpdateDatabaseDdl(ctx, &databasepb.UpdateDatabaseDdlRequest{
Database: spannerClient.DatabaseName(),
Statements: []string{
createFakeStatsTable,
},
})
require.NoError(t, err)

err = updateOp.Wait(ctx)
require.NoError(t, err)

// Call stats with no stats rows and no relationship rows.
stats, err := ds.Statistics(ctx)
require.NoError(t, err)
require.Equal(t, uint64(0), stats.EstimatedRelationshipCount)

// Add some relationships.
_, err = ds.ReadWriteTx(ctx, func(ctx context.Context, tx datastore.ReadWriteTransaction) error {
return tx.WriteRelationships(ctx, []*corev1.RelationTupleUpdate{
tuple.Create(tuple.MustParse("document:foo#viewer@user:tom")),
tuple.Create(tuple.MustParse("document:foo#viewer@user:sarah")),
tuple.Create(tuple.MustParse("document:foo#viewer@user:fred")),
})
})
require.NoError(t, err)

// Call stats with no stats rows and some relationship rows.
stats, err = ds.Statistics(ctx)
require.NoError(t, err)
require.Equal(t, uint64(0), stats.EstimatedRelationshipCount)

// Add a stats row with a byte count.
_, err = spannerClient.Apply(ctx, []*spanner.Mutation{
spanner.Insert("fake_stats_table", []string{"interval_end", "table_name", "used_bytes"}, []interface{}{
time.Now().UTC().Add(-100 * time.Second), tableRelationship, 100,
}),
})
require.NoError(t, err)

// Call stats with a stats row and some relationship rows and ensure we get an estimate.
stats, err = ds.Statistics(ctx)
require.NoError(t, err)
require.Equal(t, uint64(3), stats.EstimatedRelationshipCount)

// Add some more relationships.
_, err = ds.ReadWriteTx(ctx, func(ctx context.Context, tx datastore.ReadWriteTransaction) error {
return tx.WriteRelationships(ctx, []*corev1.RelationTupleUpdate{
tuple.Create(tuple.MustParse("document:foo#viewer@user:tommy1236512365123651236512365123612365123655")),
tuple.Create(tuple.MustParse("document:foo#viewer@user:sara1236512365123651236512365123651236512365")),
tuple.Create(tuple.MustParse("document:foo#viewer@user:freddy1236512365123651236512365123651236512365")),
})
})
require.NoError(t, err)

// Call stats again and ensure it uses the cached relationship size value, even though we'd added more relationships.
stats, err = ds.Statistics(ctx)
require.NoError(t, err)
require.Equal(t, uint64(3), stats.EstimatedRelationshipCount)
}
