Switch spanner datastore to use the built-in stats table for estimating rel count

While this estimate will be far less accurate, it removes the need to write to a stats table on every write and delete, which should help performance.
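
The stats-computation change itself is in a file not included in this diff excerpt, but the new spannerDatastore fields (cachedEstimatedBytesPerRelationship, tableSizesStatsTable) and the FakeStatsTest below suggest the shape of the approach. The following is a minimal Go sketch of that idea, not the code in this commit; the statsEstimator type, the estimateRelationshipCount name, and the relationshipTable value are illustrative assumptions:

package spannerstats // illustrative package, not part of the commit

import (
	"context"
	"fmt"

	"cloud.google.com/go/spanner"
)

const (
	tableSizesStatsTable = "spanner_sys.table_sizes_stats_1hour"
	relationshipTable    = "relation_tuple" // assumed relationship table name
)

type statsEstimator struct {
	client            *spanner.Client
	cachedBytesPerRel uint64 // lazily computed average row size; guarded by a lock in the real datastore
}

// estimateRelationshipCount reads the latest used_bytes for the relationship
// table from Spanner's built-in hourly stats table and divides it by a cached
// average bytes-per-relationship. Only the first call pays for an exact
// COUNT(*) to calibrate that average; writes and deletes never touch a counter row.
func (e *statsEstimator) estimateRelationshipCount(ctx context.Context) (uint64, error) {
	var usedBytes int64
	stmt := spanner.Statement{
		SQL: `SELECT used_bytes FROM ` + tableSizesStatsTable + `
		      WHERE table_name = @tableName
		      ORDER BY interval_end DESC
		      LIMIT 1`,
		Params: map[string]interface{}{"tableName": relationshipTable},
	}
	if err := e.client.Single().Query(ctx, stmt).Do(func(r *spanner.Row) error {
		return r.Columns(&usedBytes)
	}); err != nil {
		return 0, fmt.Errorf("unable to read table size stats: %w", err)
	}
	if usedBytes == 0 {
		// No stats row yet (fresh database, or the emulator, which lacks spanner_sys).
		return 0, nil
	}

	if e.cachedBytesPerRel == 0 {
		// One-time calibration: derive the average row size from an exact count.
		var count int64
		countStmt := spanner.Statement{SQL: `SELECT COUNT(*) FROM ` + relationshipTable}
		if err := e.client.Single().Query(ctx, countStmt).Do(func(r *spanner.Row) error {
			return r.Columns(&count)
		}); err != nil {
			return 0, err
		}
		if count == 0 {
			return 0, nil
		}
		bytesPerRel := uint64(usedBytes) / uint64(count)
		if bytesPerRel == 0 {
			bytesPerRel = 1 // avoid division by zero for very small rows
		}
		e.cachedBytesPerRel = bytesPerRel
	}

	return uint64(usedBytes) / e.cachedBytesPerRel, nil
}

This matches the behavior exercised in FakeStatsTest: with no stats row the estimate is zero, the first estimate after a stats row appears calibrates and caches the per-row size, and later estimates reuse the cached value even as relationships are added.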
josephschorr committed May 10, 2024
1 parent 6952b47 commit df9b26c
Showing 11 changed files with 267 additions and 108 deletions.
35 changes: 8 additions & 27 deletions internal/datastore/spanner/readwrite.go
@@ -20,8 +20,7 @@ import (

type spannerReadWriteTXN struct {
spannerReader
spannerRWT *spanner.ReadWriteTransaction
disableStats bool
spannerRWT *spanner.ReadWriteTransaction
}

const inLimit = 10_000 // https://cloud.google.com/spanner/quotas#query-limits
@@ -40,12 +39,6 @@ func (rwt spannerReadWriteTXN) WriteRelationships(ctx context.Context, mutations
}
}

if !rwt.disableStats {
if err := updateCounter(ctx, rwt.spannerRWT, rowCountChange); err != nil {
return fmt.Errorf(errUnableToWriteRelationships, err)
}
}

return nil
}

@@ -74,15 +67,15 @@ func spannerMutation(
}

func (rwt spannerReadWriteTXN) DeleteRelationships(ctx context.Context, filter *v1.RelationshipFilter, opts ...options.DeleteOptionsOption) (bool, error) {
limitReached, err := deleteWithFilter(ctx, rwt.spannerRWT, filter, rwt.disableStats, opts...)
limitReached, err := deleteWithFilter(ctx, rwt.spannerRWT, filter, opts...)
if err != nil {
return false, fmt.Errorf(errUnableToDeleteRelationships, err)
}

return limitReached, nil
}

func deleteWithFilter(ctx context.Context, rwt *spanner.ReadWriteTransaction, filter *v1.RelationshipFilter, disableStats bool, opts ...options.DeleteOptionsOption) (bool, error) {
func deleteWithFilter(ctx context.Context, rwt *spanner.ReadWriteTransaction, filter *v1.RelationshipFilter, opts ...options.DeleteOptionsOption) (bool, error) {
delOpts := options.NewDeleteOptionsWithOptionsAndDefaults(opts...)
var delLimit uint64
if delOpts.DeleteLimit != nil && *delOpts.DeleteLimit > 0 {
@@ -94,34 +87,28 @@ func deleteWithFilter(ctx context.Context, rwt *spanner.ReadWriteTransaction, fi

var numDeleted int64
if delLimit > 0 {
nu, err := deleteWithFilterAndLimit(ctx, rwt, filter, disableStats, delLimit)
nu, err := deleteWithFilterAndLimit(ctx, rwt, filter, delLimit)
if err != nil {
return false, err
}
numDeleted = nu
} else {
nu, err := deleteWithFilterAndNoLimit(ctx, rwt, filter, disableStats)
nu, err := deleteWithFilterAndNoLimit(ctx, rwt, filter)
if err != nil {
return false, err
}

numDeleted = nu
}

if !disableStats {
if err := updateCounter(ctx, rwt, -1*numDeleted); err != nil {
return false, err
}
}

if delLimit > 0 && uint64(numDeleted) == delLimit {
return true, nil
}

return false, nil
}

func deleteWithFilterAndLimit(ctx context.Context, rwt *spanner.ReadWriteTransaction, filter *v1.RelationshipFilter, disableStats bool, delLimit uint64) (int64, error) {
func deleteWithFilterAndLimit(ctx context.Context, rwt *spanner.ReadWriteTransaction, filter *v1.RelationshipFilter, delLimit uint64) (int64, error) {
query := queryTuplesForDelete
filteredQuery, err := applyFilterToQuery(query, filter)
if err != nil {
@@ -172,7 +159,7 @@ func deleteWithFilterAndLimit(ctx context.Context, rwt *spanner.ReadWriteTransac
return int64(len(mutations)), nil
}

func deleteWithFilterAndNoLimit(ctx context.Context, rwt *spanner.ReadWriteTransaction, filter *v1.RelationshipFilter, disableStats bool) (int64, error) {
func deleteWithFilterAndNoLimit(ctx context.Context, rwt *spanner.ReadWriteTransaction, filter *v1.RelationshipFilter) (int64, error) {
query := sql.Delete(tableRelationship)
filteredQuery, err := applyFilterToQuery(query, filter)
if err != nil {
@@ -278,7 +265,7 @@ func (rwt spannerReadWriteTXN) WriteNamespaces(_ context.Context, newConfigs ...
func (rwt spannerReadWriteTXN) DeleteNamespaces(ctx context.Context, nsNames ...string) error {
for _, nsName := range nsNames {
relFilter := &v1.RelationshipFilter{ResourceType: nsName}
if _, err := deleteWithFilter(ctx, rwt.spannerRWT, relFilter, rwt.disableStats); err != nil {
if _, err := deleteWithFilter(ctx, rwt.spannerRWT, relFilter); err != nil {
return fmt.Errorf(errUnableToDeleteConfig, err)
}

@@ -313,12 +300,6 @@ func (rwt spannerReadWriteTXN) BulkLoad(ctx context.Context, iter datastore.Bulk
return 0, fmt.Errorf(errUnableToBulkLoadRelationships, err)
}

if !rwt.disableStats {
if err := updateCounter(ctx, rwt.spannerRWT, int64(numLoaded)); err != nil {
return 0, fmt.Errorf(errUnableToBulkLoadRelationships, err)
}
}

return numLoaded, nil
}

6 changes: 3 additions & 3 deletions internal/datastore/spanner/revisions.go
@@ -13,7 +13,7 @@ import (

var ParseRevisionString = revisions.RevisionParser(revisions.Timestamp)

func (sd spannerDatastore) headRevisionInternal(ctx context.Context) (datastore.Revision, error) {
func (sd *spannerDatastore) headRevisionInternal(ctx context.Context) (datastore.Revision, error) {
now, err := sd.now(ctx)
if err != nil {
return datastore.NoRevision, fmt.Errorf(errRevision, err)
@@ -22,11 +22,11 @@ func (sd spannerDatastore) headRevisionInternal(ctx context.Context) (datastore.
return revisions.NewForTime(now), nil
}

func (sd spannerDatastore) HeadRevision(ctx context.Context) (datastore.Revision, error) {
func (sd *spannerDatastore) HeadRevision(ctx context.Context) (datastore.Revision, error) {
return sd.headRevisionInternal(ctx)
}

func (sd spannerDatastore) now(ctx context.Context) (time.Time, error) {
func (sd *spannerDatastore) now(ctx context.Context) (time.Time, error) {
var timestamp time.Time
if err := sd.client.Single().Query(ctx, spanner.NewStatement("SELECT CURRENT_TIMESTAMP()")).Do(func(r *spanner.Row) error {
return r.Columns(&timestamp)
4 changes: 0 additions & 4 deletions internal/datastore/spanner/schema.go
@@ -24,10 +24,6 @@ const (

tableMetadata = "metadata"
colUniqueID = "unique_id"

tableCounters = "relationship_estimate_counters"
colID = "id"
colCount = "count"
)

var allRelationshipCols = []string{
34 changes: 22 additions & 12 deletions internal/datastore/spanner/spanner.go
@@ -6,6 +6,7 @@ import (
"os"
"regexp"
"strconv"
"sync"
"time"

"cloud.google.com/go/spanner"
@@ -61,6 +62,8 @@ const (
defaultChangeStreamRetention = 24 * time.Hour
)

const tableSizesStatsTable = "spanner_sys.table_sizes_stats_1hour"

var (
sql = sq.StatementBuilder.PlaceholderFormat(sq.AtP)
tracer = otel.Tracer("spicedb/internal/datastore/spanner")
@@ -78,6 +81,11 @@ type spannerDatastore struct {
client *spanner.Client
config spannerOptions
database string

cachedEstimatedBytesPerRelationshipLock sync.RWMutex
cachedEstimatedBytesPerRelationship uint64

tableSizesStatsTable string
}

// NewSpannerDatastore returns a datastore backed by cloud spanner
@@ -143,7 +151,7 @@ func NewSpannerDatastore(ctx context.Context, database string, opts ...Option) (
maxRevisionStaleness := time.Duration(float64(config.revisionQuantization.Nanoseconds())*
config.maxRevisionStalenessPercent) * time.Nanosecond

ds := spannerDatastore{
ds := &spannerDatastore{
RemoteClockRevisions: revisions.NewRemoteClockRevisions(
defaultChangeStreamRetention,
maxRevisionStaleness,
@@ -153,11 +161,14 @@ func NewSpannerDatastore(ctx context.Context, database string, opts ...Option) (
CommonDecoder: revisions.CommonDecoder{
Kind: revisions.Timestamp,
},
client: client,
config: config,
database: database,
watchBufferWriteTimeout: config.watchBufferWriteTimeout,
watchBufferLength: config.watchBufferLength,
client: client,
config: config,
database: database,
watchBufferWriteTimeout: config.watchBufferWriteTimeout,
watchBufferLength: config.watchBufferLength,
cachedEstimatedBytesPerRelationship: 0,
cachedEstimatedBytesPerRelationshipLock: sync.RWMutex{},
tableSizesStatsTable: tableSizesStatsTable,
}
ds.RemoteClockRevisions.SetNowFunc(ds.headRevisionInternal)

@@ -195,7 +206,7 @@ func (t *traceableRTX) Query(ctx context.Context, statement spanner.Statement) *
return t.delegate.Query(ctx, statement)
}

func (sd spannerDatastore) SnapshotReader(revisionRaw datastore.Revision) datastore.Reader {
func (sd *spannerDatastore) SnapshotReader(revisionRaw datastore.Revision) datastore.Reader {
r := revisionRaw.(revisions.TimestampRevision)

txSource := func() readTX {
@@ -205,7 +216,7 @@ func (sd spannerDatastore) SnapshotReader(revisionRaw datastore.Revision) datast
return spannerReader{executor, txSource}
}

func (sd spannerDatastore) ReadWriteTx(ctx context.Context, fn datastore.TxUserFunc, opts ...options.RWTOptionsOption) (datastore.Revision, error) {
func (sd *spannerDatastore) ReadWriteTx(ctx context.Context, fn datastore.TxUserFunc, opts ...options.RWTOptionsOption) (datastore.Revision, error) {
config := options.NewRWTOptionsWithOptions(opts...)

ctx, span := tracer.Start(ctx, "ReadWriteTx")
@@ -221,7 +232,6 @@ func (sd spannerDatastore) ReadWriteTx(ctx context.Context, fn datastore.TxUserF
rwt := spannerReadWriteTXN{
spannerReader{executor, txSource},
spannerRWT,
sd.config.disableStats,
}
err := func() error {
innerCtx, innerSpan := tracer.Start(ctx, "TxUserFunc")
@@ -248,7 +258,7 @@ func (sd spannerDatastore) ReadWriteTx(ctx context.Context, fn datastore.TxUserF
return revisions.NewForTime(ts), nil
}

func (sd spannerDatastore) ReadyState(ctx context.Context) (datastore.ReadyState, error) {
func (sd *spannerDatastore) ReadyState(ctx context.Context) (datastore.ReadyState, error) {
headMigration, err := migrations.SpannerMigrations.HeadRevision()
if err != nil {
return datastore.ReadyState{}, fmt.Errorf("invalid head migration found for spanner: %w", err)
@@ -275,11 +285,11 @@ func (sd spannerDatastore) ReadyState(ctx context.Context) (datastore.ReadyState
}, nil
}

func (sd spannerDatastore) Features(_ context.Context) (*datastore.Features, error) {
func (sd *spannerDatastore) Features(_ context.Context) (*datastore.Features, error) {
return &datastore.Features{Watch: datastore.Feature{Enabled: true}}, nil
}

func (sd spannerDatastore) Close() error {
func (sd *spannerDatastore) Close() error {
sd.client.Close()
return nil
}
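The spanner.go hunks above add a sync.RWMutex-guarded cache for the per-relationship byte estimate, but the code that reads it is not part of this excerpt. A minimal sketch of the read-through pattern those fields imply, with compute standing in for the real calculation (a hypothetical hook, not a function in this commit):

// Read-through cache in the spirit of cachedEstimatedBytesPerRelationship and
// its RWMutex: many concurrent readers, the value computed at most once.
package spannerstats // illustrative package, not part of the commit

import (
	"context"
	"sync"
)

type relSizeCache struct {
	mu    sync.RWMutex
	value uint64
}

// get returns the cached bytes-per-relationship, computing it once via compute.
func (c *relSizeCache) get(ctx context.Context, compute func(context.Context) (uint64, error)) (uint64, error) {
	// Fast path: readers only take the shared lock.
	c.mu.RLock()
	v := c.value
	c.mu.RUnlock()
	if v > 0 {
		return v, nil
	}

	// Slow path: compute, then publish under the exclusive lock. Two goroutines
	// may race and both compute; the last write wins, which is acceptable for an estimate.
	computed, err := compute(ctx)
	if err != nil {
		return 0, err
	}
	c.mu.Lock()
	c.value = computed
	c.mu.Unlock()
	return computed, nil
}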
109 changes: 107 additions & 2 deletions internal/datastore/spanner/spanner_test.go
@@ -8,17 +8,22 @@ import (
"testing"
"time"

"cloud.google.com/go/spanner"
admin "cloud.google.com/go/spanner/admin/database/apiv1"
"cloud.google.com/go/spanner/admin/database/apiv1/databasepb"
"github.com/stretchr/testify/require"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

testdatastore "github.com/authzed/spicedb/internal/testserver/datastore"
"github.com/authzed/spicedb/pkg/datastore"
"github.com/authzed/spicedb/pkg/datastore/test"
corev1 "github.com/authzed/spicedb/pkg/proto/core/v1"
"github.com/authzed/spicedb/pkg/tuple"
)

// Implement TestableDatastore interface
func (sd spannerDatastore) ExampleRetryableError() error {
func (sd *spannerDatastore) ExampleRetryableError() error {
return status.New(codes.Aborted, "retryable").Err()
}

@@ -38,5 +43,105 @@ func TestSpannerDatastore(t *testing.T) {
return ds
})
return ds, nil
}), test.WithCategories(test.GCCategory, test.WatchCategory))
}), test.WithCategories(test.GCCategory, test.WatchCategory, test.StatsCategory))

t.Run("TestFakeStats", createDatastoreTest(
b,
FakeStatsTest,
))
}

type datastoreTestFunc func(t *testing.T, ds datastore.Datastore)

func createDatastoreTest(b testdatastore.RunningEngineForTest, tf datastoreTestFunc, options ...Option) func(*testing.T) {
return func(t *testing.T) {
ctx := context.Background()
ds := b.NewDatastore(t, func(engine, uri string) datastore.Datastore {
ds, err := NewSpannerDatastore(ctx, uri, options...)
require.NoError(t, err)
return ds
})
defer ds.Close()

tf(t, ds)
}
}

const createFakeStatsTable = `
CREATE TABLE fake_stats_table (
interval_end TIMESTAMP,
table_name STRING(MAX),
used_bytes INT64,
) PRIMARY KEY (table_name, interval_end)
`

func FakeStatsTest(t *testing.T, ds datastore.Datastore) {
spannerDS := ds.(*spannerDatastore)
spannerDS.tableSizesStatsTable = "fake_stats_table"

spannerClient := spannerDS.client
ctx := context.Background()

adminClient, err := admin.NewDatabaseAdminClient(ctx)
require.NoError(t, err)

// Manually add the stats table to simulate the table that the emulator doesn't create.
updateOp, err := adminClient.UpdateDatabaseDdl(ctx, &databasepb.UpdateDatabaseDdlRequest{
Database: spannerClient.DatabaseName(),
Statements: []string{
createFakeStatsTable,
},
})
require.NoError(t, err)

err = updateOp.Wait(ctx)
require.NoError(t, err)

// Call stats with no stats rows and no relationship rows.
stats, err := ds.Statistics(ctx)
require.NoError(t, err)
require.Equal(t, uint64(0), stats.EstimatedRelationshipCount)

// Add some relationships.
_, err = ds.ReadWriteTx(ctx, func(ctx context.Context, tx datastore.ReadWriteTransaction) error {
return tx.WriteRelationships(ctx, []*corev1.RelationTupleUpdate{
tuple.Create(tuple.MustParse("document:foo#viewer@user:tom")),
tuple.Create(tuple.MustParse("document:foo#viewer@user:sarah")),
tuple.Create(tuple.MustParse("document:foo#viewer@user:fred")),
})
})
require.NoError(t, err)

// Call stats with no stats rows and some relationship rows.
stats, err = ds.Statistics(ctx)
require.NoError(t, err)
require.Equal(t, uint64(0), stats.EstimatedRelationshipCount)

// Add a stats row with a byte count.
_, err = spannerClient.Apply(ctx, []*spanner.Mutation{
spanner.Insert("fake_stats_table", []string{"interval_end", "table_name", "used_bytes"}, []interface{}{
time.Now().UTC().Add(-100 * time.Second), tableRelationship, 100,
}),
})
require.NoError(t, err)

// Call stats with a stats row and some relationship rows and ensure we get an estimate.
stats, err = ds.Statistics(ctx)
require.NoError(t, err)
require.Equal(t, uint64(3), stats.EstimatedRelationshipCount)

// Add some more relationships.
_, err = ds.ReadWriteTx(ctx, func(ctx context.Context, tx datastore.ReadWriteTransaction) error {
return tx.WriteRelationships(ctx, []*corev1.RelationTupleUpdate{
tuple.Create(tuple.MustParse("document:foo#viewer@user:tommy1236512365123651236512365123612365123655")),
tuple.Create(tuple.MustParse("document:foo#viewer@user:sara1236512365123651236512365123651236512365")),
tuple.Create(tuple.MustParse("document:foo#viewer@user:freddy1236512365123651236512365123651236512365")),
})
})
require.NoError(t, err)

// Call stats again and ensure it uses the cached relationship size value, even though we've added more relationships.
stats, err = ds.Statistics(ctx)
require.NoError(t, err)
require.Equal(t, uint64(3), stats.EstimatedRelationshipCount)
}
