Skip to content

Commit

Permalink
Merge #70287
Browse files Browse the repository at this point in the history
70287: spanconfig: introduce spanconfig.StoreWriter (and its impl) r=irfansharif a=irfansharif

In #69172 we introduced a spanconfig.StoreReader interface to abstract
away the gossiped system config span. We motivated that PR by teasing
a future implementation of the same interface, an in-memory data
structure to maintain a mapping between between spans and configs
(powered through a view over system.span_configurations introduced in
[#69047](69047)). This PR introduces just that.

Intended (future) usages:
- [#69614](69614) introduces the KVWatcher interface, listening in on
  system.span_configurations. The updates generated by it will be used
  to populate per-store instantiations of this data structure, with an
  eye towards providing a "drop-in" replacement of the gossiped system
  config span (conveniently implementing the sibling
  spanconfig.StoreReader interface).
- [#69661](69661) introduces the SQLWatcher interface, listening in on changes to
  system.{descriptor,zones} and generating denormalized span config
  updates for every descriptor/zone config change. These updates will
  need to be diffed against a spanconfig.StoreWriter populated with the
  existing contents of KVAccessor to generate the "targeted" diffs
  KVAccessor expects.

Release note: None

Co-authored-by: irfan sharif <irfanmahmoudsharif@gmail.com>
  • Loading branch information
craig[bot] and irfansharif committed Oct 2, 2021
2 parents 3426e8d + 62f1237 commit 109e07b
Show file tree
Hide file tree
Showing 15 changed files with 1,075 additions and 19 deletions.
1 change: 1 addition & 0 deletions pkg/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,7 @@ ALL_TESTS = [
"//pkg/settings:settings_test",
"//pkg/spanconfig/spanconfigkvaccessor:spanconfigkvaccessor_test",
"//pkg/spanconfig/spanconfigmanager:spanconfigmanager_test",
"//pkg/spanconfig/spanconfigstore:spanconfigstore_test",
"//pkg/sql/catalog/catalogkeys:catalogkeys_test",
"//pkg/sql/catalog/catformat:catformat_test",
"//pkg/sql/catalog/catprivilege:catprivilege_test",
Expand Down
5 changes: 5 additions & 0 deletions pkg/config/zonepb/zone_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1429,3 +1429,8 @@ func TestZoneConfigToSpanConfigConversion(t *testing.T) {
require.Equal(t, tc.expectSpanConfig, spanConfig)
}
}

func TestDefaultZoneAndSpanConfigs(t *testing.T) {
converted := DefaultZoneConfigRef().AsSpanConfig()
require.True(t, converted.Equal(roachpb.TestingDefaultSpanConfig()))
}
2 changes: 1 addition & 1 deletion pkg/kv/kvclient/kvcoord/split_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ func TestRangeSplitsWithWritePressure(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
// Override default span config.
cfg := kvserver.TestingDefaultSpanConfig()
cfg := roachpb.TestingDefaultSpanConfig()
cfg.RangeMaxBytes = 1 << 18

// Manually create the local test cluster so that the split queue
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/queue_concurrency_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func TestBaseQueueConcurrent(t *testing.T) {
cfg: StoreConfig{
Clock: hlc.NewClock(hlc.UnixNano, time.Second),
AmbientCtx: log.AmbientContext{Tracer: tracing.NewTracer()},
DefaultSpanConfig: TestingDefaultSpanConfig(),
DefaultSpanConfig: roachpb.TestingDefaultSpanConfig(),
},
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/split_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func TestSplitQueueShouldQueue(t *testing.T) {
repl.mu.Lock()
repl.mu.state.Stats = &enginepb.MVCCStats{KeyBytes: test.bytes}
repl.mu.Unlock()
conf := TestingDefaultSpanConfig()
conf := roachpb.TestingDefaultSpanConfig()
conf.RangeMaxBytes = test.maxBytes
repl.SetSpanConfig(conf)

Expand Down
5 changes: 0 additions & 5 deletions pkg/kv/kvserver/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -2956,8 +2956,3 @@ func min(a, b int) int {
}
return b
}

// TestingDefaultSpanConfig exposes the default span config for testing purposes.
func TestingDefaultSpanConfig() roachpb.SpanConfig {
return zonepb.DefaultZoneConfigRef().AsSpanConfig()
}
20 changes: 20 additions & 0 deletions pkg/roachpb/span_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,3 +98,23 @@ func (c ConstraintsConjunction) String() string {
}
return sb.String()
}

// TestingDefaultSpanConfig exports the default span config for testing purposes.
func TestingDefaultSpanConfig() SpanConfig {
return SpanConfig{
RangeMinBytes: 128 << 20, // 128 MB
RangeMaxBytes: 512 << 20, // 512 MB
// Use 25 hours instead of the previous 24 to make users successful by
// default. Users desiring to take incremental backups every 24h may
// incorrectly assume that the previous default 24h was sufficient to do
// that. But the equation for incremental backups is:
// GC TTLSeconds >= (desired backup interval) (time to perform incremental backup)
// We think most new users' incremental backups will complete within an
// hour, and larger clusters will have more experienced operators and will
// understand how to change these settings if needed.
GCPolicy: GCPolicy{
TTLSeconds: 25 * 60 * 60,
},
NumReplicas: 3,
}
}
108 changes: 97 additions & 11 deletions pkg/spanconfig/spanconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,24 +47,110 @@ type ReconciliationDependencies interface {
// through the KVAccessor.
}

// Store is a data structure used to store span configs.
// Store is a data structure used to store spans and their corresponding
// configs.
type Store interface {
StoreWriter
StoreReader

// TODO(irfansharif): We'll want to add a StoreWriter interface here once we
// implement a data structure to store span configs. We expect this data
// structure to be used in KV to eventually replace the use of the
// gossip-backed system config span.
}

// Silence the unused linter.
var _ Store = nil
// StoreWriter is the write-only portion of the Store interface.
type StoreWriter interface {
// Apply applies the given update[1]. It also returns the existing spans that
// were deleted and entries that were newly added to make room for the
// update. The deleted list can double as a list of overlapping spans in the
// Store, provided the update is not a no-op[2].
//
// Span configs are stored in non-overlapping fashion. When an update
// overlaps with existing configs, the existing configs are deleted. If the
// overlap is only partial, the non-overlapping components of the existing
// configs are re-added. If the update itself is adding an entry, that too
// is added. This is best illustrated with the following example:
//
// [--- X --) is a span with config X
//
// Store | [--- A ----)[------------- B -----------)[---------- C -----)
// Update | [------------------ D -------------)
// |
// Deleted | [------------- B -----------)[---------- C -----)
// Added | [------------------ D -------------)[--- C -----)
// Store* | [--- A ----)[------------------ D -------------)[--- C -----)
//
// TODO(irfansharif): We'll make use of the dryrun option in a future PR
// when wiring up the reconciliation job to use the KVAccessor. Since the
// KVAccessor is a "targeted" API (the spans being deleted/upserted
// have to already be present with the exact same bounds), we'll dryrun an
// update against a StoreWriter (pre-populated with the entries present in
// KV) to generate the targeted deletes and upserts we'd need to issue.
// After successfully installing them in KV, we can keep our StoreWriter
// up-to-date by actually applying the update.
//
// There's also the question of a "full reconciliation pass". We'll be
// generating updates reactively listening in on changes to
// system.{descriptor,zones} (see SQLWatcher). It's possible then for a
// suspended tenant's table history to be GC-ed away and for its SQLWatcher
// to never detect that a certain table/index/partition has been deleted.
// Left as is, this results in us never issuing a corresponding span config
// deletion request. We'd be leaving a bunch of delete-able span configs
// lying around, and a bunch of empty ranges as a result of those. A "full
// reconciliation pass" is our attempt to find all these extraneous entries
// in KV and to delete them.
//
// We can use a StoreWriter here too (one that's pre-populated with the
// contents of KVAccessor, as before). We'd iterate through all descriptors,
// find all overlapping spans, issue KVAccessor deletes for them, and upsert
// the descriptor's span config[3]. As for the StoreWriter itself, we'd
// simply delete the overlapping entries. After iterating through all the
// descriptors, we'd finally issue KVAccessor deletes for all span configs
// still remaining in the Store.
//
// TODO(irfansharif): The descriptions above presume holding the entire set
// of span configs in memory, but we could break away from that by adding
// pagination + retrieval limit to the GetSpanConfigEntriesFor API. We'd
// then paginate through chunks of the keyspace at a time, do a "full
// reconciliation pass" over just that chunk, and continue.
//
// [1]: Unless dryrun is true. We'll still generate the same {deleted,added}
// lists.
// [2]: We could alternatively expose a GetAllOverlapping() API to make
// things clearer.
// [3]: We could skip the delete + upsert dance if the descriptor's exact
// span config entry already exists in KV. Using Apply (dryrun=true)
// against a StoreWriter (populated using KVAccessor contents) using
// the descriptor's span config entry would return empty lists,
// indicating a no-op.
Apply(ctx context.Context, update Update, dryrun bool) (
deleted []roachpb.Span, added []roachpb.SpanConfigEntry,
)
}

// StoreReader is the read-only portion of the Store interface. It's an adaptor
// interface implemented by config.SystemConfig to let us later swap out the
// source with one backed by a view of `system.span_configurations`.
// StoreReader is the read-only portion of the Store interface. It doubles as an
// adaptor interface for config.SystemConfig.
type StoreReader interface {
NeedsSplit(ctx context.Context, start, end roachpb.RKey) bool
ComputeSplitKey(ctx context.Context, start, end roachpb.RKey) roachpb.RKey
GetSpanConfigForKey(ctx context.Context, key roachpb.RKey) (roachpb.SpanConfig, error)
}

// Update captures what span has seen a config change. It will be the unit of
// what a {SQL,KV}Watcher emits, and what can be applied to a StoreWriter.
type Update struct {
// Span captures the key span being updated.
Span roachpb.Span

// Config captures the span config the key span was updated to. An empty
// config indicates the span config being deleted.
Config roachpb.SpanConfig
}

// Deletion returns true if the update corresponds to a span config being
// deleted.
func (u Update) Deletion() bool {
return u.Config.IsEmpty()
}

// Addition returns true if the update corresponds to a span config being
// added.
func (u Update) Addition() bool {
return !u.Deletion()
}
35 changes: 35 additions & 0 deletions pkg/spanconfig/spanconfigstore/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "spanconfigstore",
srcs = [
"shadow.go",
"store.go",
],
importpath = "github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigstore",
visibility = ["//visibility:public"],
deps = [
"//pkg/keys",
"//pkg/roachpb:with-mocks",
"//pkg/spanconfig",
"//pkg/util/interval",
"//pkg/util/log",
"//pkg/util/syncutil",
"@com_github_cockroachdb_errors//:errors",
],
)

go_test(
name = "spanconfigstore_test",
srcs = ["store_test.go"],
data = glob(["testdata/**"]),
embed = [":spanconfigstore"],
deps = [
"//pkg/roachpb:with-mocks",
"//pkg/spanconfig",
"//pkg/util/leaktest",
"//pkg/util/randutil",
"@com_github_cockroachdb_datadriven//:datadriven",
"@com_github_stretchr_testify//require",
],
)
91 changes: 91 additions & 0 deletions pkg/spanconfig/spanconfigstore/shadow.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
// Copyright 2021 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package spanconfigstore

import (
"context"

"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/spanconfig"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/errors"
)

// ShadowReader wraps around two spanconfig.StoreReaders and logs warnings (if
// expensive logging is enabled) when there are divergent results from the two.
type ShadowReader struct {
new, old spanconfig.StoreReader
}

// NewShadowReader instantiates a new shadow reader.
func NewShadowReader(new, old spanconfig.StoreReader) *ShadowReader {
return &ShadowReader{
new: new,
old: old,
}
}

var _ = NewShadowReader // defeat the unused linter.

var _ spanconfig.StoreReader = &ShadowReader{}

// NeedsSplit is part of the spanconfig.StoreReader interface.
func (s *ShadowReader) NeedsSplit(ctx context.Context, start, end roachpb.RKey) bool {
newResult := s.new.NeedsSplit(ctx, start, end)
if log.ExpensiveLogEnabled(ctx, 1) {
oldResult := s.old.NeedsSplit(ctx, start, end)
if newResult != oldResult {
log.Warningf(ctx, "needs split: mismatched responses between old result (%t) and new (%t) for start=%s end=%s",
oldResult, newResult, start.String(), end.String())
}
}

return newResult
}

// ComputeSplitKey is part of the spanconfig.StoreReader interface.
func (s *ShadowReader) ComputeSplitKey(ctx context.Context, start, end roachpb.RKey) roachpb.RKey {
newResult := s.new.ComputeSplitKey(ctx, start, end)
if log.ExpensiveLogEnabled(ctx, 1) {
oldResult := s.old.ComputeSplitKey(ctx, start, end)
if !newResult.Equal(oldResult) {
str := func(k roachpb.RKey) string {
if len(k) == 0 {
return ""
}
return k.String()
}

log.Warningf(ctx, "compute split key: mismatched responses between old result (%s) and new (%s) for start=%s end=%s",
str(oldResult), str(newResult), str(start), str(end))
}
}
return newResult
}

// GetSpanConfigForKey is part of the spanconfig.StoreReader interface.
func (s *ShadowReader) GetSpanConfigForKey(
ctx context.Context, key roachpb.RKey,
) (roachpb.SpanConfig, error) {
newResult, errNew := s.new.GetSpanConfigForKey(ctx, key)
if log.ExpensiveLogEnabled(ctx, 1) {
oldResult, errOld := s.old.GetSpanConfigForKey(ctx, key)
if !newResult.Equal(oldResult) {
log.Warningf(ctx, "get span config for key: mismatched responses between old result (%s) and new(%s) for key=%s",
oldResult.String(), newResult.String(), key.String())
}
if !errors.Is(errNew, errOld) {
log.Warningf(ctx, "get span config for key: mismatched errors between old result (%s) and new (%s) for key=%s",
errOld, errNew, key.String())
}
}
return newResult, errNew
}

0 comments on commit 109e07b

Please sign in to comment.