Skip to content

Commit

Permalink
Switch spanner datastore to use the built-in stats table for estimating rel count
Browse files Browse the repository at this point in the history

While this estimate will be far less accurate, it removes the need to write to a stats table on every write and delete, which should improve performance.
  • Loading branch information
josephschorr committed May 8, 2024
1 parent 07a5551 commit 19718f3
Show file tree
Hide file tree
Showing 8 changed files with 126 additions and 149 deletions.
62 changes: 38 additions & 24 deletions internal/datastore/spanner/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,20 @@ import (
)

type spannerOptions struct {
watchBufferLength uint16
watchBufferWriteTimeout time.Duration
revisionQuantization time.Duration
followerReadDelay time.Duration
maxRevisionStalenessPercent float64
credentialsFilePath string
emulatorHost string
disableStats bool
readMaxOpen int
writeMaxOpen int
minSessions uint64
maxSessions uint64
migrationPhase string
watchBufferLength uint16
watchBufferWriteTimeout time.Duration
revisionQuantization time.Duration
followerReadDelay time.Duration
maxRevisionStalenessPercent float64
credentialsFilePath string
emulatorHost string
disableStats bool
readMaxOpen int
writeMaxOpen int
minSessions uint64
maxSessions uint64
migrationPhase string
estimatedBytesPerRelationship uint64
}

type migrationPhase uint8
Expand All @@ -45,6 +46,12 @@ const (
maxRevisionQuantization = 24 * time.Hour
)

// DefaultEstimatedBytesPerRelationship is the default *estimated* number of bytes per relationship. This
// is based on various sampling and may be inaccurate for your specific use case, but that's okay, since
// it's used as part of a heuristic to estimate the number of relationships in the system solely for
// telemetry purposes.
//
// The configured value is used as the divisor of the relationship table's reported byte size when
// statistics are computed, so it must never be zero.
const DefaultEstimatedBytesPerRelationship = 20

// Option provides the facility to configure how clients within the Spanner
// datastore interact with the running Spanner database.
type Option func(*spannerOptions)
Expand All @@ -54,17 +61,18 @@ func generateConfig(options []Option) (spannerOptions, error) {
// This determines if there are more CPU cores to increase the default number of connections
defaultNumberConnections := max(1, math.Round(float64(runtime.GOMAXPROCS(0))))
computed := spannerOptions{
watchBufferLength: defaultWatchBufferLength,
watchBufferWriteTimeout: defaultWatchBufferWriteTimeout,
revisionQuantization: defaultRevisionQuantization,
followerReadDelay: defaultFollowerReadDelay,
maxRevisionStalenessPercent: defaultMaxRevisionStalenessPercent,
disableStats: defaultDisableStats,
readMaxOpen: int(defaultNumberConnections),
writeMaxOpen: int(defaultNumberConnections),
minSessions: 100,
maxSessions: 400,
migrationPhase: "", // no migration
watchBufferLength: defaultWatchBufferLength,
watchBufferWriteTimeout: defaultWatchBufferWriteTimeout,
revisionQuantization: defaultRevisionQuantization,
followerReadDelay: defaultFollowerReadDelay,
maxRevisionStalenessPercent: defaultMaxRevisionStalenessPercent,
disableStats: defaultDisableStats,
readMaxOpen: int(defaultNumberConnections),
writeMaxOpen: int(defaultNumberConnections),
minSessions: 100,
maxSessions: 400,
estimatedBytesPerRelationship: DefaultEstimatedBytesPerRelationship,
migrationPhase: "", // no migration
}

for _, option := range options {
Expand Down Expand Up @@ -193,3 +201,9 @@ func MaxSessionCount(maxSessions uint64) Option {
func MigrationPhase(phase string) Option {
return func(po *spannerOptions) { po.migrationPhase = phase }
}

// EstimatedBytesPerRelationship is the estimated number of bytes per relationship
// in the relationship table to use for stats.
//
// The value is used as the divisor when the relationship table's byte size is
// converted into an estimated relationship count. A value of zero is therefore
// ignored, leaving the previously configured estimate in place, rather than being
// stored and causing an integer divide-by-zero panic when statistics are computed.
func EstimatedBytesPerRelationship(estimatedBytesPerRelationship uint64) Option {
	return func(po *spannerOptions) {
		// Zero can never be a valid divisor; keep whatever estimate is already set.
		if estimatedBytesPerRelationship == 0 {
			return
		}
		po.estimatedBytesPerRelationship = estimatedBytesPerRelationship
	}
}
35 changes: 8 additions & 27 deletions internal/datastore/spanner/readwrite.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,7 @@ import (

type spannerReadWriteTXN struct {
spannerReader
spannerRWT *spanner.ReadWriteTransaction
disableStats bool
spannerRWT *spanner.ReadWriteTransaction
}

const inLimit = 10_000 // https://cloud.google.com/spanner/quotas#query-limits
Expand All @@ -40,12 +39,6 @@ func (rwt spannerReadWriteTXN) WriteRelationships(ctx context.Context, mutations
}
}

if !rwt.disableStats {
if err := updateCounter(ctx, rwt.spannerRWT, rowCountChange); err != nil {
return fmt.Errorf(errUnableToWriteRelationships, err)
}
}

return nil
}

Expand Down Expand Up @@ -74,15 +67,15 @@ func spannerMutation(
}

func (rwt spannerReadWriteTXN) DeleteRelationships(ctx context.Context, filter *v1.RelationshipFilter, opts ...options.DeleteOptionsOption) (bool, error) {
limitReached, err := deleteWithFilter(ctx, rwt.spannerRWT, filter, rwt.disableStats, opts...)
limitReached, err := deleteWithFilter(ctx, rwt.spannerRWT, filter, opts...)
if err != nil {
return false, fmt.Errorf(errUnableToDeleteRelationships, err)
}

return limitReached, nil
}

func deleteWithFilter(ctx context.Context, rwt *spanner.ReadWriteTransaction, filter *v1.RelationshipFilter, disableStats bool, opts ...options.DeleteOptionsOption) (bool, error) {
func deleteWithFilter(ctx context.Context, rwt *spanner.ReadWriteTransaction, filter *v1.RelationshipFilter, opts ...options.DeleteOptionsOption) (bool, error) {
delOpts := options.NewDeleteOptionsWithOptionsAndDefaults(opts...)
var delLimit uint64
if delOpts.DeleteLimit != nil && *delOpts.DeleteLimit > 0 {
Expand All @@ -94,34 +87,28 @@ func deleteWithFilter(ctx context.Context, rwt *spanner.ReadWriteTransaction, fi

var numDeleted int64
if delLimit > 0 {
nu, err := deleteWithFilterAndLimit(ctx, rwt, filter, disableStats, delLimit)
nu, err := deleteWithFilterAndLimit(ctx, rwt, filter, delLimit)
if err != nil {
return false, err
}
numDeleted = nu
} else {
nu, err := deleteWithFilterAndNoLimit(ctx, rwt, filter, disableStats)
nu, err := deleteWithFilterAndNoLimit(ctx, rwt, filter)
if err != nil {
return false, err
}

numDeleted = nu
}

if !disableStats {
if err := updateCounter(ctx, rwt, -1*numDeleted); err != nil {
return false, err
}
}

if delLimit > 0 && uint64(numDeleted) == delLimit {
return true, nil
}

return false, nil
}

func deleteWithFilterAndLimit(ctx context.Context, rwt *spanner.ReadWriteTransaction, filter *v1.RelationshipFilter, disableStats bool, delLimit uint64) (int64, error) {
func deleteWithFilterAndLimit(ctx context.Context, rwt *spanner.ReadWriteTransaction, filter *v1.RelationshipFilter, delLimit uint64) (int64, error) {
query := queryTuplesForDelete
filteredQuery, err := applyFilterToQuery(query, filter)
if err != nil {
Expand Down Expand Up @@ -172,7 +159,7 @@ func deleteWithFilterAndLimit(ctx context.Context, rwt *spanner.ReadWriteTransac
return int64(len(mutations)), nil
}

func deleteWithFilterAndNoLimit(ctx context.Context, rwt *spanner.ReadWriteTransaction, filter *v1.RelationshipFilter, disableStats bool) (int64, error) {
func deleteWithFilterAndNoLimit(ctx context.Context, rwt *spanner.ReadWriteTransaction, filter *v1.RelationshipFilter) (int64, error) {
query := sql.Delete(tableRelationship)
filteredQuery, err := applyFilterToQuery(query, filter)
if err != nil {
Expand Down Expand Up @@ -278,7 +265,7 @@ func (rwt spannerReadWriteTXN) WriteNamespaces(_ context.Context, newConfigs ...
func (rwt spannerReadWriteTXN) DeleteNamespaces(ctx context.Context, nsNames ...string) error {
for _, nsName := range nsNames {
relFilter := &v1.RelationshipFilter{ResourceType: nsName}
if _, err := deleteWithFilter(ctx, rwt.spannerRWT, relFilter, rwt.disableStats); err != nil {
if _, err := deleteWithFilter(ctx, rwt.spannerRWT, relFilter); err != nil {
return fmt.Errorf(errUnableToDeleteConfig, err)
}

Expand Down Expand Up @@ -313,12 +300,6 @@ func (rwt spannerReadWriteTXN) BulkLoad(ctx context.Context, iter datastore.Bulk
return 0, fmt.Errorf(errUnableToBulkLoadRelationships, err)
}

if !rwt.disableStats {
if err := updateCounter(ctx, rwt.spannerRWT, int64(numLoaded)); err != nil {
return 0, fmt.Errorf(errUnableToBulkLoadRelationships, err)
}
}

return numLoaded, nil
}

Expand Down
14 changes: 8 additions & 6 deletions internal/datastore/spanner/spanner.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ type spannerDatastore struct {
client *spanner.Client
config spannerOptions
database string

estimatedBytesPerRelationship uint64
}

// NewSpannerDatastore returns a datastore backed by cloud spanner
Expand Down Expand Up @@ -153,11 +155,12 @@ func NewSpannerDatastore(ctx context.Context, database string, opts ...Option) (
CommonDecoder: revisions.CommonDecoder{
Kind: revisions.Timestamp,
},
client: client,
config: config,
database: database,
watchBufferWriteTimeout: config.watchBufferWriteTimeout,
watchBufferLength: config.watchBufferLength,
client: client,
config: config,
database: database,
watchBufferWriteTimeout: config.watchBufferWriteTimeout,
watchBufferLength: config.watchBufferLength,
estimatedBytesPerRelationship: config.estimatedBytesPerRelationship,
}
ds.RemoteClockRevisions.SetNowFunc(ds.headRevisionInternal)

Expand Down Expand Up @@ -221,7 +224,6 @@ func (sd spannerDatastore) ReadWriteTx(ctx context.Context, fn datastore.TxUserF
rwt := spannerReadWriteTXN{
spannerReader{executor, txSource},
spannerRWT,
sd.config.disableStats,
}
err := func() error {
innerCtx, innerSpan := tracer.Start(ctx, "TxUserFunc")
Expand Down
2 changes: 1 addition & 1 deletion internal/datastore/spanner/spanner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,5 +38,5 @@ func TestSpannerDatastore(t *testing.T) {
return ds
})
return ds, nil
}), test.WithCategories(test.GCCategory, test.WatchCategory))
}), test.WithCategories(test.GCCategory, test.WatchCategory, test.StatsCategory))
}
65 changes: 12 additions & 53 deletions internal/datastore/spanner/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,21 @@ package spanner
import (
"context"
"fmt"
"math/rand"
"time"

"cloud.google.com/go/spanner"
"go.opentelemetry.io/otel/trace"
"google.golang.org/grpc/codes"

log "github.com/authzed/spicedb/internal/logging"
"github.com/authzed/spicedb/pkg/datastore"
)

// Reference: https://cloud.google.com/spanner/docs/introspection/table-sizes-statistics
var (
queryRelationshipEstimate = fmt.Sprintf("SELECT SUM(%s) FROM %s", colCount, tableCounters)

rng = rand.NewSource(time.Now().UnixNano())
queryRelationshipByteEstimate = fmt.Sprintf(`SELECT used_bytes FROM spanner_sys.table_sizes_stats_1hour WHERE
interval_end = (
SELECT MAX(interval_end)
FROM spanner_sys.table_sizes_stats_1hour
)
AND table_name = '%s'`, tableRelationship)
)

func (sd spannerDatastore) Statistics(ctx context.Context) (datastore.Stats, error) {
Expand Down Expand Up @@ -46,57 +46,16 @@ func (sd spannerDatastore) Statistics(ctx context.Context) (datastore.Stats, err
return datastore.Stats{}, fmt.Errorf("unable to read namespaces: %w", err)
}

var estimate spanner.NullInt64
if err := sd.client.Single().Query(ctx, spanner.Statement{SQL: queryRelationshipEstimate}).Do(func(r *spanner.Row) error {
return r.Columns(&estimate)
var byteEstimate spanner.NullInt64
if err := sd.client.Single().Query(ctx, spanner.Statement{SQL: queryRelationshipByteEstimate}).Do(func(r *spanner.Row) error {
return r.Columns(&byteEstimate)
}); err != nil {
return datastore.Stats{}, fmt.Errorf("unable to read row counts: %w", err)
return datastore.Stats{}, fmt.Errorf("unable to read tuples byte count: %w", err)
}

return datastore.Stats{
UniqueID: uniqueID,
ObjectTypeStatistics: datastore.ComputeObjectTypeStats(allNamespaces),
EstimatedRelationshipCount: uint64(estimate.Int64),
EstimatedRelationshipCount: uint64(byteEstimate.Int64) / sd.estimatedBytesPerRelationship,
}, nil
}

func updateCounter(ctx context.Context, rwt *spanner.ReadWriteTransaction, change int64) error {
newValue := change

counterID := make([]byte, 2)
// nolint:gosec
// G404 use of non cryptographically secure random number generator is not concern here,
// as this is only used to randomly distributed the counters across multiple rows and reduce write contention
_, err := rand.New(rng).Read(counterID)
if err != nil {
return fmt.Errorf("unable to select random counter: %w", err)
}

counterRow, err := rwt.ReadRow(ctx, tableCounters, spanner.Key{counterID}, []string{colCount})
if err != nil {
if spanner.ErrCode(err) != codes.NotFound {
return fmt.Errorf("unable to read counter value: %w", err)
}
// In this branch we leave newValue alone because the counter doesn't exist
} else {
var currentValue int64
if err := counterRow.Columns(&currentValue); err != nil {
return fmt.Errorf("unable to decode counter value: %w", err)
}
newValue += currentValue
}

log.Ctx(ctx).Trace().
Bytes("counterID", counterID).
Int64("newValue", newValue).
Int64("change", change).
Msg("updating counter")

if err := rwt.BufferWrite([]*spanner.Mutation{
spanner.InsertOrUpdate(tableCounters, []string{colID, colCount}, []any{counterID, newValue}),
}); err != nil {
return fmt.Errorf("unable to buffer update to counter: %w", err)
}

return nil
}

0 comments on commit 19718f3

Please sign in to comment.