Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Switch spanner datastore to use the built-in stats table for estimating rel count #1892

Merged
merged 1 commit into from
May 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
35 changes: 8 additions & 27 deletions internal/datastore/spanner/readwrite.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,7 @@ import (

type spannerReadWriteTXN struct {
spannerReader
spannerRWT *spanner.ReadWriteTransaction
disableStats bool
spannerRWT *spanner.ReadWriteTransaction
}

const inLimit = 10_000 // https://cloud.google.com/spanner/quotas#query-limits
Expand All @@ -40,12 +39,6 @@ func (rwt spannerReadWriteTXN) WriteRelationships(ctx context.Context, mutations
}
}

if !rwt.disableStats {
vroldanbet marked this conversation as resolved.
Show resolved Hide resolved
if err := updateCounter(ctx, rwt.spannerRWT, rowCountChange); err != nil {
return fmt.Errorf(errUnableToWriteRelationships, err)
}
}

return nil
}

Expand Down Expand Up @@ -74,15 +67,15 @@ func spannerMutation(
}

func (rwt spannerReadWriteTXN) DeleteRelationships(ctx context.Context, filter *v1.RelationshipFilter, opts ...options.DeleteOptionsOption) (bool, error) {
limitReached, err := deleteWithFilter(ctx, rwt.spannerRWT, filter, rwt.disableStats, opts...)
limitReached, err := deleteWithFilter(ctx, rwt.spannerRWT, filter, opts...)
if err != nil {
return false, fmt.Errorf(errUnableToDeleteRelationships, err)
}

return limitReached, nil
}

func deleteWithFilter(ctx context.Context, rwt *spanner.ReadWriteTransaction, filter *v1.RelationshipFilter, disableStats bool, opts ...options.DeleteOptionsOption) (bool, error) {
func deleteWithFilter(ctx context.Context, rwt *spanner.ReadWriteTransaction, filter *v1.RelationshipFilter, opts ...options.DeleteOptionsOption) (bool, error) {
delOpts := options.NewDeleteOptionsWithOptionsAndDefaults(opts...)
var delLimit uint64
if delOpts.DeleteLimit != nil && *delOpts.DeleteLimit > 0 {
Expand All @@ -94,34 +87,28 @@ func deleteWithFilter(ctx context.Context, rwt *spanner.ReadWriteTransaction, fi

var numDeleted int64
if delLimit > 0 {
nu, err := deleteWithFilterAndLimit(ctx, rwt, filter, disableStats, delLimit)
nu, err := deleteWithFilterAndLimit(ctx, rwt, filter, delLimit)
if err != nil {
return false, err
}
numDeleted = nu
} else {
nu, err := deleteWithFilterAndNoLimit(ctx, rwt, filter, disableStats)
nu, err := deleteWithFilterAndNoLimit(ctx, rwt, filter)
if err != nil {
return false, err
}

numDeleted = nu
}

if !disableStats {
if err := updateCounter(ctx, rwt, -1*numDeleted); err != nil {
return false, err
}
}

if delLimit > 0 && uint64(numDeleted) == delLimit {
return true, nil
}

return false, nil
}

func deleteWithFilterAndLimit(ctx context.Context, rwt *spanner.ReadWriteTransaction, filter *v1.RelationshipFilter, disableStats bool, delLimit uint64) (int64, error) {
func deleteWithFilterAndLimit(ctx context.Context, rwt *spanner.ReadWriteTransaction, filter *v1.RelationshipFilter, delLimit uint64) (int64, error) {
query := queryTuplesForDelete
filteredQuery, err := applyFilterToQuery(query, filter)
if err != nil {
Expand Down Expand Up @@ -172,7 +159,7 @@ func deleteWithFilterAndLimit(ctx context.Context, rwt *spanner.ReadWriteTransac
return int64(len(mutations)), nil
}

func deleteWithFilterAndNoLimit(ctx context.Context, rwt *spanner.ReadWriteTransaction, filter *v1.RelationshipFilter, disableStats bool) (int64, error) {
func deleteWithFilterAndNoLimit(ctx context.Context, rwt *spanner.ReadWriteTransaction, filter *v1.RelationshipFilter) (int64, error) {
query := sql.Delete(tableRelationship)
filteredQuery, err := applyFilterToQuery(query, filter)
if err != nil {
Expand Down Expand Up @@ -278,7 +265,7 @@ func (rwt spannerReadWriteTXN) WriteNamespaces(_ context.Context, newConfigs ...
func (rwt spannerReadWriteTXN) DeleteNamespaces(ctx context.Context, nsNames ...string) error {
for _, nsName := range nsNames {
relFilter := &v1.RelationshipFilter{ResourceType: nsName}
if _, err := deleteWithFilter(ctx, rwt.spannerRWT, relFilter, rwt.disableStats); err != nil {
if _, err := deleteWithFilter(ctx, rwt.spannerRWT, relFilter); err != nil {
return fmt.Errorf(errUnableToDeleteConfig, err)
}

Expand Down Expand Up @@ -313,12 +300,6 @@ func (rwt spannerReadWriteTXN) BulkLoad(ctx context.Context, iter datastore.Bulk
return 0, fmt.Errorf(errUnableToBulkLoadRelationships, err)
}

if !rwt.disableStats {
if err := updateCounter(ctx, rwt.spannerRWT, int64(numLoaded)); err != nil {
return 0, fmt.Errorf(errUnableToBulkLoadRelationships, err)
}
}

return numLoaded, nil
}

Expand Down
6 changes: 3 additions & 3 deletions internal/datastore/spanner/revisions.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (

var ParseRevisionString = revisions.RevisionParser(revisions.Timestamp)

func (sd spannerDatastore) headRevisionInternal(ctx context.Context) (datastore.Revision, error) {
func (sd *spannerDatastore) headRevisionInternal(ctx context.Context) (datastore.Revision, error) {
now, err := sd.now(ctx)
if err != nil {
return datastore.NoRevision, fmt.Errorf(errRevision, err)
Expand All @@ -22,11 +22,11 @@ func (sd spannerDatastore) headRevisionInternal(ctx context.Context) (datastore.
return revisions.NewForTime(now), nil
}

func (sd spannerDatastore) HeadRevision(ctx context.Context) (datastore.Revision, error) {
func (sd *spannerDatastore) HeadRevision(ctx context.Context) (datastore.Revision, error) {
return sd.headRevisionInternal(ctx)
}

func (sd spannerDatastore) now(ctx context.Context) (time.Time, error) {
func (sd *spannerDatastore) now(ctx context.Context) (time.Time, error) {
var timestamp time.Time
if err := sd.client.Single().Query(ctx, spanner.NewStatement("SELECT CURRENT_TIMESTAMP()")).Do(func(r *spanner.Row) error {
return r.Columns(&timestamp)
Expand Down
4 changes: 0 additions & 4 deletions internal/datastore/spanner/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,6 @@ const (

tableMetadata = "metadata"
colUniqueID = "unique_id"

tableCounters = "relationship_estimate_counters"
colID = "id"
colCount = "count"
)

var allRelationshipCols = []string{
Expand Down
34 changes: 22 additions & 12 deletions internal/datastore/spanner/spanner.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"os"
"regexp"
"strconv"
"sync"
"time"

"cloud.google.com/go/spanner"
Expand Down Expand Up @@ -61,6 +62,8 @@ const (
defaultChangeStreamRetention = 24 * time.Hour
)

const tableSizesStatsTable = "spanner_sys.table_sizes_stats_1hour"

var (
sql = sq.StatementBuilder.PlaceholderFormat(sq.AtP)
tracer = otel.Tracer("spicedb/internal/datastore/spanner")
Expand All @@ -78,6 +81,11 @@ type spannerDatastore struct {
client *spanner.Client
config spannerOptions
database string

cachedEstimatedBytesPerRelationshipLock sync.RWMutex
cachedEstimatedBytesPerRelationship uint64

tableSizesStatsTable string
}

// NewSpannerDatastore returns a datastore backed by cloud spanner
Expand Down Expand Up @@ -143,7 +151,7 @@ func NewSpannerDatastore(ctx context.Context, database string, opts ...Option) (
maxRevisionStaleness := time.Duration(float64(config.revisionQuantization.Nanoseconds())*
config.maxRevisionStalenessPercent) * time.Nanosecond

ds := spannerDatastore{
ds := &spannerDatastore{
RemoteClockRevisions: revisions.NewRemoteClockRevisions(
defaultChangeStreamRetention,
maxRevisionStaleness,
Expand All @@ -153,11 +161,14 @@ func NewSpannerDatastore(ctx context.Context, database string, opts ...Option) (
CommonDecoder: revisions.CommonDecoder{
Kind: revisions.Timestamp,
},
client: client,
config: config,
database: database,
watchBufferWriteTimeout: config.watchBufferWriteTimeout,
watchBufferLength: config.watchBufferLength,
client: client,
config: config,
database: database,
watchBufferWriteTimeout: config.watchBufferWriteTimeout,
watchBufferLength: config.watchBufferLength,
cachedEstimatedBytesPerRelationship: 0,
cachedEstimatedBytesPerRelationshipLock: sync.RWMutex{},
tableSizesStatsTable: tableSizesStatsTable,
}
ds.RemoteClockRevisions.SetNowFunc(ds.headRevisionInternal)

Expand Down Expand Up @@ -195,7 +206,7 @@ func (t *traceableRTX) Query(ctx context.Context, statement spanner.Statement) *
return t.delegate.Query(ctx, statement)
}

func (sd spannerDatastore) SnapshotReader(revisionRaw datastore.Revision) datastore.Reader {
func (sd *spannerDatastore) SnapshotReader(revisionRaw datastore.Revision) datastore.Reader {
r := revisionRaw.(revisions.TimestampRevision)

txSource := func() readTX {
Expand All @@ -205,7 +216,7 @@ func (sd spannerDatastore) SnapshotReader(revisionRaw datastore.Revision) datast
return spannerReader{executor, txSource}
}

func (sd spannerDatastore) ReadWriteTx(ctx context.Context, fn datastore.TxUserFunc, opts ...options.RWTOptionsOption) (datastore.Revision, error) {
func (sd *spannerDatastore) ReadWriteTx(ctx context.Context, fn datastore.TxUserFunc, opts ...options.RWTOptionsOption) (datastore.Revision, error) {
vroldanbet marked this conversation as resolved.
Show resolved Hide resolved
config := options.NewRWTOptionsWithOptions(opts...)

ctx, span := tracer.Start(ctx, "ReadWriteTx")
Expand All @@ -221,7 +232,6 @@ func (sd spannerDatastore) ReadWriteTx(ctx context.Context, fn datastore.TxUserF
rwt := spannerReadWriteTXN{
spannerReader{executor, txSource},
spannerRWT,
sd.config.disableStats,
}
err := func() error {
innerCtx, innerSpan := tracer.Start(ctx, "TxUserFunc")
Expand All @@ -248,7 +258,7 @@ func (sd spannerDatastore) ReadWriteTx(ctx context.Context, fn datastore.TxUserF
return revisions.NewForTime(ts), nil
}

func (sd spannerDatastore) ReadyState(ctx context.Context) (datastore.ReadyState, error) {
func (sd *spannerDatastore) ReadyState(ctx context.Context) (datastore.ReadyState, error) {
headMigration, err := migrations.SpannerMigrations.HeadRevision()
if err != nil {
return datastore.ReadyState{}, fmt.Errorf("invalid head migration found for spanner: %w", err)
Expand All @@ -275,11 +285,11 @@ func (sd spannerDatastore) ReadyState(ctx context.Context) (datastore.ReadyState
}, nil
}

func (sd spannerDatastore) Features(_ context.Context) (*datastore.Features, error) {
func (sd *spannerDatastore) Features(_ context.Context) (*datastore.Features, error) {
return &datastore.Features{Watch: datastore.Feature{Enabled: true}}, nil
}

func (sd spannerDatastore) Close() error {
func (sd *spannerDatastore) Close() error {
sd.client.Close()
return nil
}
Expand Down
109 changes: 107 additions & 2 deletions internal/datastore/spanner/spanner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,22 @@ import (
"testing"
"time"

"cloud.google.com/go/spanner"
admin "cloud.google.com/go/spanner/admin/database/apiv1"
"cloud.google.com/go/spanner/admin/database/apiv1/databasepb"
"github.com/stretchr/testify/require"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

testdatastore "github.com/authzed/spicedb/internal/testserver/datastore"
"github.com/authzed/spicedb/pkg/datastore"
"github.com/authzed/spicedb/pkg/datastore/test"
corev1 "github.com/authzed/spicedb/pkg/proto/core/v1"
"github.com/authzed/spicedb/pkg/tuple"
)

// Implement TestableDatastore interface
func (sd spannerDatastore) ExampleRetryableError() error {
func (sd *spannerDatastore) ExampleRetryableError() error {
return status.New(codes.Aborted, "retryable").Err()
}

Expand All @@ -38,5 +43,105 @@ func TestSpannerDatastore(t *testing.T) {
return ds
})
return ds, nil
}), test.WithCategories(test.GCCategory, test.WatchCategory))
}), test.WithCategories(test.GCCategory, test.WatchCategory, test.StatsCategory))

t.Run("TestFakeStats", createDatastoreTest(
b,
FakeStatsTest,
))
}

type datastoreTestFunc func(t *testing.T, ds datastore.Datastore)

func createDatastoreTest(b testdatastore.RunningEngineForTest, tf datastoreTestFunc, options ...Option) func(*testing.T) {
return func(t *testing.T) {
ctx := context.Background()
ds := b.NewDatastore(t, func(engine, uri string) datastore.Datastore {
ds, err := NewSpannerDatastore(ctx, uri, options...)
require.NoError(t, err)
return ds
})
defer ds.Close()

tf(t, ds)
}
}

const createFakeStatsTable = `
CREATE TABLE fake_stats_table (
interval_end TIMESTAMP,
table_name STRING(MAX),
used_bytes INT64,
) PRIMARY KEY (table_name, interval_end)
`

func FakeStatsTest(t *testing.T, ds datastore.Datastore) {
spannerDS := ds.(*spannerDatastore)
spannerDS.tableSizesStatsTable = "fake_stats_table"
vroldanbet marked this conversation as resolved.
Show resolved Hide resolved

spannerClient := spannerDS.client
ctx := context.Background()

adminClient, err := admin.NewDatabaseAdminClient(ctx)
require.NoError(t, err)

// Manually add the stats table to simulate the table that the emulator doesn't create.
updateOp, err := adminClient.UpdateDatabaseDdl(ctx, &databasepb.UpdateDatabaseDdlRequest{
Database: spannerClient.DatabaseName(),
Statements: []string{
createFakeStatsTable,
},
})
require.NoError(t, err)

err = updateOp.Wait(ctx)
require.NoError(t, err)

// Call stats with no stats rows and no relationship rows.
stats, err := ds.Statistics(ctx)
require.NoError(t, err)
require.Equal(t, uint64(0), stats.EstimatedRelationshipCount)

// Add some relationships.
_, err = ds.ReadWriteTx(ctx, func(ctx context.Context, tx datastore.ReadWriteTransaction) error {
return tx.WriteRelationships(ctx, []*corev1.RelationTupleUpdate{
tuple.Create(tuple.MustParse("document:foo#viewer@user:tom")),
tuple.Create(tuple.MustParse("document:foo#viewer@user:sarah")),
tuple.Create(tuple.MustParse("document:foo#viewer@user:fred")),
})
})
require.NoError(t, err)

// Call stats with no stats rows and some relationship rows.
stats, err = ds.Statistics(ctx)
require.NoError(t, err)
require.Equal(t, uint64(0), stats.EstimatedRelationshipCount)

// Add some stats row with a byte count.
_, err = spannerClient.Apply(ctx, []*spanner.Mutation{
spanner.Insert("fake_stats_table", []string{"interval_end", "table_name", "used_bytes"}, []interface{}{
time.Now().UTC().Add(-100 * time.Second), tableRelationship, 100,
}),
})
require.NoError(t, err)

// Call stats with a stats row and some relationship rows and ensure we get an estimate.
stats, err = ds.Statistics(ctx)
require.NoError(t, err)
require.Equal(t, uint64(3), stats.EstimatedRelationshipCount)

// Add some more relationships.
_, err = ds.ReadWriteTx(ctx, func(ctx context.Context, tx datastore.ReadWriteTransaction) error {
return tx.WriteRelationships(ctx, []*corev1.RelationTupleUpdate{
tuple.Create(tuple.MustParse("document:foo#viewer@user:tommy1236512365123651236512365123612365123655")),
tuple.Create(tuple.MustParse("document:foo#viewer@user:sara1236512365123651236512365123651236512365")),
tuple.Create(tuple.MustParse("document:foo#viewer@user:freddy1236512365123651236512365123651236512365")),
})
})
require.NoError(t, err)

// Call stats again and ensure it uses the cached relationship size value, even if we'd addded more relationships.
stats, err = ds.Statistics(ctx)
require.NoError(t, err)
require.Equal(t, uint64(3), stats.EstimatedRelationshipCount)
}