Skip to content

Commit

Permalink
Merge pull request #1770 from josephschorr/crdb-perf-improvement
Browse files Browse the repository at this point in the history
Change CRDB driver to use new method for getting transaction timestamp
  • Loading branch information
josephschorr committed Feb 29, 2024
2 parents 66a871c + 8e6ade9 commit 57206a1
Show file tree
Hide file tree
Showing 10 changed files with 212 additions and 82 deletions.
50 changes: 33 additions & 17 deletions internal/datastore/crdb/crdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,10 @@ const (
errUnableToInstantiate = "unable to instantiate datastore"
errRevision = "unable to find revision: %w"

querySelectNow = "SELECT cluster_logical_timestamp()"
queryShowZoneConfig = "SHOW ZONE CONFIGURATION FOR RANGE default;"
querySelectNow = "SELECT cluster_logical_timestamp()"
queryTransactionNowPreV23 = querySelectNow
queryTransactionNow = "SHOW COMMIT TIMESTAMP"
queryShowZoneConfig = "SHOW ZONE CONFIGURATION FOR RANGE default;"

livingTupleConstraint = "pk_relation_tuple"
)
Expand Down Expand Up @@ -122,6 +124,12 @@ func newCRDBDatastore(ctx context.Context, url string, options ...Option) (datas
changefeedQuery = queryChangefeedPreV22
}

transactionNowQuery := queryTransactionNow
if version.Major < 23 {
log.Info().Object("version", version).Msg("using transaction now query for CRDB version < 23")
transactionNowQuery = queryTransactionNowPreV23
}

clusterTTLNanos, err := readClusterTTLNanos(initCtx, initPool)
if err != nil {
return nil, fmt.Errorf("unable to read cluster gc window: %w", err)
Expand Down Expand Up @@ -172,8 +180,9 @@ func newCRDBDatastore(ctx context.Context, url string, options ...Option) (datas
watchBufferWriteTimeout: config.watchBufferWriteTimeout,
writeOverlapKeyer: keyer,
overlapKeyInit: keySetInit,
disableStats: config.disableStats,
beginChangefeedQuery: changefeedQuery,
transactionNowQuery: transactionNowQuery,
analyzeBeforeStatistics: config.analyzeBeforeStatistics,
}
ds.RemoteClockRevisions.SetNowFunc(ds.headRevisionInternal)

Expand Down Expand Up @@ -254,9 +263,10 @@ type crdbDatastore struct {
watchBufferWriteTimeout time.Duration
writeOverlapKeyer overlapKeyer
overlapKeyInit func(ctx context.Context) keySet
disableStats bool
analyzeBeforeStatistics bool

beginChangefeedQuery string
transactionNowQuery string

featureGroup singleflight.Group[string, *datastore.Features]

Expand Down Expand Up @@ -321,21 +331,11 @@ func (cds *crdbDatastore) ReadWriteTx(
}
}

if cds.disableStats {
var err error
commitTimestamp, err = readCRDBNow(ctx, querier)
if err != nil {
return fmt.Errorf("error getting commit timestamp: %w", err)
}
return nil
}

var err error
commitTimestamp, err = updateCounter(ctx, tx, rwt.relCountChange)
commitTimestamp, err = cds.readTransactionCommitRev(ctx, querier)
if err != nil {
return fmt.Errorf("error updating relationship counter: %w", err)
return fmt.Errorf("error getting commit timestamp: %w", err)
}

return nil
})
if err != nil {
Expand Down Expand Up @@ -371,7 +371,9 @@ func (cds *crdbDatastore) ReadyState(ctx context.Context) (datastore.ReadyState,
return datastore.ReadyState{}, err
}

if version != headMigration {
// TODO(jschorr): Remove the check for the older migration once we are confident
// that all users have migrated past it.
if version != headMigration && version != "add-caveats" {
return datastore.ReadyState{
Message: fmt.Sprintf(
"datastore is not migrated: currently at revision `%s`, but requires `%s`. Please run `spicedb migrate`.",
Expand Down Expand Up @@ -467,6 +469,20 @@ func (cds *crdbDatastore) features(ctx context.Context) (*datastore.Features, er
return &features, nil
}

func (cds *crdbDatastore) readTransactionCommitRev(ctx context.Context, reader pgxcommon.DBFuncQuerier) (datastore.Revision, error) {
ctx, span := tracer.Start(ctx, "readTransactionCommitRev")
defer span.End()

var hlcNow decimal.Decimal
if err := reader.QueryRowFunc(ctx, func(ctx context.Context, row pgx.Row) error {
return row.Scan(&hlcNow)
}, cds.transactionNowQuery); err != nil {
return datastore.NoRevision, fmt.Errorf("unable to read timestamp: %w", err)
}

return revisions.NewForHLC(hlcNow)
}

func readCRDBNow(ctx context.Context, reader pgxcommon.DBFuncQuerier) (datastore.Revision, error) {
ctx, span := tracer.Start(ctx, "readCRDBNow")
defer span.End()
Expand Down
8 changes: 7 additions & 1 deletion internal/datastore/crdb/crdb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ func TestCRDBDatastore(t *testing.T) {
RevisionQuantization(revisionQuantization),
WatchBufferLength(watchBufferLength),
OverlapStrategy(overlapStrategyPrefix),
DebugAnalyzeBeforeStatistics(),
)
require.NoError(t, err)
return ds
Expand Down Expand Up @@ -84,6 +85,7 @@ func TestCRDBDatastoreWithFollowerReads(t *testing.T) {
GCWindow(gcWindow),
RevisionQuantization(quantization),
FollowerReadDelay(followerReadDelay),
DebugAnalyzeBeforeStatistics(),
)
require.NoError(err)
return ds
Expand Down Expand Up @@ -134,15 +136,19 @@ func TestWatchFeatureDetection(t *testing.T) {
require.NoError(t, err)
},
expectEnabled: false,
expectMessage: "Range feeds must be enabled in CockroachDB and the user must have permission to create them in order to enable the Watch API: ERROR: user unprivileged does not have CHANGEFEED privilege on relation relation_tuple (SQLSTATE 42501)",
expectMessage: "(SQLSTATE 42501)",
},
{
name: "rangefeeds enabled, user has permission",
postInit: func(ctx context.Context, adminConn *pgx.Conn) {
_, err = adminConn.Exec(ctx, `SET CLUSTER SETTING kv.rangefeed.enabled = true;`)
require.NoError(t, err)

_, err = adminConn.Exec(ctx, fmt.Sprintf(`GRANT CHANGEFEED ON TABLE testspicedb.%s TO unprivileged;`, tableTuple))
require.NoError(t, err)

_, err = adminConn.Exec(ctx, fmt.Sprintf(`GRANT SELECT ON TABLE testspicedb.%s TO unprivileged;`, tableTuple))
require.NoError(t, err)
},
expectEnabled: true,
},
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package migrations

import (
"context"

"github.com/jackc/pgx/v5"
)

const (
dropStatsTable = `DROP TABLE relationship_estimate_counters;`
)

func init() {
err := CRDBMigrations.Register("remove-stats-table", "add-caveats", removeStatsTable, noAtomicMigration)
if err != nil {
panic("failed to register migration: " + err.Error())
}
}

func removeStatsTable(ctx context.Context, conn *pgx.Conn) error {
if _, err := conn.Exec(ctx, dropStatsTable); err != nil {
return err
}
return nil
}
17 changes: 10 additions & 7 deletions internal/datastore/crdb/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ type crdbOptions struct {
maxRetries uint8
overlapStrategy string
overlapKey string
disableStats bool
enableConnectionBalancing bool
analyzeBeforeStatistics bool

enablePrometheusStats bool
}
Expand Down Expand Up @@ -65,7 +65,6 @@ func generateConfig(options []Option) (crdbOptions, error) {
maxRetries: defaultMaxRetries,
overlapKey: defaultOverlapKey,
overlapStrategy: defaultOverlapStrategy,
disableStats: false,
enablePrometheusStats: defaultEnablePrometheusStats,
enableConnectionBalancing: defaultEnableConnectionBalancing,
connectRate: defaultConnectRate,
Expand Down Expand Up @@ -283,11 +282,6 @@ func OverlapKey(key string) Option {
return func(po *crdbOptions) { po.overlapKey = key }
}

// DisableStats disables recording counts to the stats table
func DisableStats(disable bool) Option {
return func(po *crdbOptions) { po.disableStats = disable }
}

// WithEnablePrometheusStats marks whether Prometheus metrics provided by the Postgres
// clients being used by the datastore are enabled.
//
Expand All @@ -303,3 +297,12 @@ func WithEnablePrometheusStats(enablePrometheusStats bool) Option {
func WithEnableConnectionBalancing(connectionBalancing bool) Option {
return func(po *crdbOptions) { po.enableConnectionBalancing = connectionBalancing }
}

// DebugAnalyzeBeforeStatistics signals to the Statistics method that it should
// run Analyze on the database before returning statistics. This should only be
// used for debug and testing.
//
// Disabled by default.
func DebugAnalyzeBeforeStatistics() Option {
return func(po *crdbOptions) { po.analyzeBeforeStatistics = true }
}
133 changes: 79 additions & 54 deletions internal/datastore/crdb/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,39 +3,24 @@ package crdb
import (
"context"
"fmt"
"math/rand"
"time"
"slices"

"github.com/Masterminds/squirrel"
"github.com/jackc/pgx/v5"
"github.com/shopspring/decimal"
"github.com/rs/zerolog/log"

pgxcommon "github.com/authzed/spicedb/internal/datastore/postgres/common"
"github.com/authzed/spicedb/internal/datastore/revisions"
"github.com/authzed/spicedb/pkg/datastore"
)

const (
tableMetadata = "metadata"
colUniqueID = "unique_id"

tableCounters = "relationship_estimate_counters"
colID = "id"
colCount = "count"
)

var (
queryReadUniqueID = psql.Select(colUniqueID).From(tableMetadata)
queryRelationshipEstimate = fmt.Sprintf("SELECT COALESCE(SUM(%s), 0) FROM %s AS OF SYSTEM TIME follower_read_timestamp()", colCount, tableCounters)

upsertCounterQuery = psql.Insert(tableCounters).Columns(
colID,
colCount,
).Suffix(fmt.Sprintf("ON CONFLICT (%[1]s) DO UPDATE SET %[2]s = %[3]s.%[2]s + EXCLUDED.%[2]s RETURNING cluster_logical_timestamp()", colID, colCount, tableCounters))

rng = rand.NewSource(time.Now().UnixNano())

uniqueID string
queryReadUniqueID = psql.Select(colUniqueID).From(tableMetadata)
uniqueID string
)

func (cds *crdbDatastore) Statistics(ctx context.Context) (datastore.Stats, error) {
Expand All @@ -52,14 +37,6 @@ func (cds *crdbDatastore) Statistics(ctx context.Context) (datastore.Stats, erro
}

var nsDefs []datastore.RevisionedNamespace
var relCount int64

if err := cds.readPool.QueryRowFunc(ctx, func(ctx context.Context, row pgx.Row) error {
return row.Scan(&relCount)
}, queryRelationshipEstimate); err != nil {
return datastore.Stats{}, fmt.Errorf("unable to read relationship count: %w", err)
}

if err := cds.readPool.BeginTxFunc(ctx, pgx.TxOptions{AccessMode: pgx.ReadOnly}, func(tx pgx.Tx) error {
_, err := tx.Exec(ctx, "SET TRANSACTION AS OF SYSTEM TIME follower_read_timestamp()")
if err != nil {
Expand All @@ -76,37 +53,85 @@ func (cds *crdbDatastore) Statistics(ctx context.Context) (datastore.Stats, erro
return datastore.Stats{}, err
}

// NOTE: this is a stop-gap solution to prevent panics in telemetry collection
if relCount < 0 {
relCount = 0
}

return datastore.Stats{
UniqueID: uniqueID,
EstimatedRelationshipCount: uint64(relCount),
ObjectTypeStatistics: datastore.ComputeObjectTypeStats(nsDefs),
}, nil
}
if cds.analyzeBeforeStatistics {
if err := cds.readPool.BeginTxFunc(ctx, pgx.TxOptions{AccessMode: pgx.ReadOnly}, func(tx pgx.Tx) error {
if _, err := tx.Exec(ctx, "ANALYZE "+tableTuple); err != nil {
return fmt.Errorf("unable to analyze tuple table: %w", err)
}

func updateCounter(ctx context.Context, tx pgx.Tx, change int64) (datastore.Revision, error) {
counterID := make([]byte, 2)
// nolint:gosec
// G404 use of non cryptographically secure random number generator is not concern here,
// as this is only used to randomly distributed the counters across multiple rows and reduce write row contention
_, err := rand.New(rng).Read(counterID)
if err != nil {
return datastore.NoRevision, fmt.Errorf("unable to select random counter: %w", err)
return nil
}); err != nil {
return datastore.Stats{}, err
}
}

sql, args, err := upsertCounterQuery.Values(counterID, change).ToSql()
if err != nil {
return datastore.NoRevision, fmt.Errorf("unable to prepare upsert counter sql: %w", err)
}
var estimatedRelCount uint64
if err := cds.readPool.QueryFunc(ctx, func(ctx context.Context, rows pgx.Rows) error {
hasRows := false

for rows.Next() {
hasRows = true
values, err := rows.Values()
if err != nil {
log.Warn().Err(err).Msg("unable to read statistics")
return nil
}

// Find the row whose column_names contains the expected columns for the
// full relationship.
isFullRelationshipRow := false
for index, fd := range rows.FieldDescriptions() {
if fd.Name != "column_names" {
continue
}

columnNames, ok := values[index].([]any)
if !ok {
log.Warn().Msg("unable to read column names")
return nil
}

if slices.Contains(columnNames, "namespace") &&
slices.Contains(columnNames, "object_id") &&
slices.Contains(columnNames, "relation") &&
slices.Contains(columnNames, "userset_namespace") &&
slices.Contains(columnNames, "userset_object_id") &&
slices.Contains(columnNames, "userset_relation") {
isFullRelationshipRow = true
break
}
}

if !isFullRelationshipRow {
continue
}

// Read the estimated relationship count.
for index, fd := range rows.FieldDescriptions() {
if fd.Name != "row_count" {
continue
}

rowCount, ok := values[index].(int64)
if !ok {
log.Warn().Msg("unable to read row count")
return nil
}

estimatedRelCount = uint64(rowCount)
return nil
}
}

var timestamp decimal.Decimal
if err := tx.QueryRow(ctx, sql, args...).Scan(&timestamp); err != nil {
return datastore.NoRevision, fmt.Errorf("unable to executed upsert counter query: %w", err)
log.Warn().Bool("has-rows", hasRows).Msg("unable to find row count in statistics query result")
return nil
}, "SHOW STATISTICS FOR TABLE relation_tuple;"); err != nil {
return datastore.Stats{}, fmt.Errorf("unable to query unique estimated row count: %w", err)
}

return revisions.NewForHLC(timestamp)
return datastore.Stats{
UniqueID: uniqueID,
EstimatedRelationshipCount: estimatedRelCount,
ObjectTypeStatistics: datastore.ComputeObjectTypeStats(nsDefs),
}, nil
}
2 changes: 1 addition & 1 deletion internal/testserver/datastore/crdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (
)

const (
CRDBTestVersionTag = "v22.2.0"
CRDBTestVersionTag = "v23.1.16"

enableRangefeeds = `SET CLUSTER SETTING kv.rangefeed.enabled = true;`
)
Expand Down
1 change: 0 additions & 1 deletion pkg/cmd/datastore/datastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -382,7 +382,6 @@ func newCRDBDatastore(ctx context.Context, opts Config) (datastore.Datastore, er
crdb.OverlapStrategy(opts.OverlapStrategy),
crdb.WatchBufferLength(opts.WatchBufferLength),
crdb.WatchBufferWriteTimeout(opts.WatchBufferWriteTimeout),
crdb.DisableStats(opts.DisableStats),
crdb.WithEnablePrometheusStats(opts.EnableDatastoreMetrics),
crdb.WithEnableConnectionBalancing(opts.EnableConnectionBalancing),
crdb.ConnectRate(opts.ConnectRate),
Expand Down

0 comments on commit 57206a1

Please sign in to comment.