From e1c93b1b2c7bc120ffc15810637dbd8102aa3ef1 Mon Sep 17 00:00:00 2001 From: Nathan VanBenschoten Date: Wed, 18 Nov 2020 12:13:47 -0500 Subject: [PATCH 1/2] kv: cap COCKROACH_SCHEDULER_CONCURRENCY at 96 Relates to #56851. In investigations like #56851, we've seen the mutex in the Raft scheduler collapse due to too much concurrency. To address this, we needed to drop the scheduler's goroutine pool size to bound the amount of contention on the mutex to ensure that the scheduler was able to schedule any goroutines. This commit caps this concurrency to 96, instead of letting it grow unbounded as a function of the number of cores on the system. Release note (performance improvement): The Raft processing goroutine pool's size is now capped at 96. This was observed to prevent instability on large machines (32+ vCPU) in clusters with many ranges (50k+ per node). --- pkg/kv/kvserver/store.go | 20 +++++++++++++++++--- 1 file changed, 17 insertions(+), 3 deletions(-) diff --git a/pkg/kv/kvserver/store.go b/pkg/kv/kvserver/store.go index 6a247beb02c3..55e128037e1c 100644 --- a/pkg/kv/kvserver/store.go +++ b/pkg/kv/kvserver/store.go @@ -99,11 +99,18 @@ const ( ) var storeSchedulerConcurrency = envutil.EnvOrDefaultInt( - "COCKROACH_SCHEDULER_CONCURRENCY", 8*runtime.NumCPU()) + // For small machines, we scale the scheduler concurrency by the number of + // CPUs. 8*runtime.NumCPU() was determined in 9a68241 (April 2017) as the + // optimal concurrency level on 8 CPU machines. For larger machines, we've + // seen (#56851) that this scaling curve can be too aggressive and lead to + // too much contention in the Raft scheduler, so we cap the concurrency + // level at 96. + // + // As of November 2020, this default value could be re-tuned. + "COCKROACH_SCHEDULER_CONCURRENCY", min(8*runtime.NumCPU(), 96)) var logSSTInfoTicks = envutil.EnvOrDefaultInt( - "COCKROACH_LOG_SST_INFO_TICKS_INTERVAL", 60, -) + "COCKROACH_LOG_SST_INFO_TICKS_INTERVAL", 60) // bulkIOWriteLimit is defined here because it is used by BulkIOWriteLimiter. var bulkIOWriteLimit = settings.RegisterPublicByteSizeSetting( @@ -2641,3 +2648,10 @@ func ReadClusterVersion( func init() { tracing.RegisterTagRemapping("s", "store") } + +func min(a, b int) int { + if a < b { + return a + } + return b +} From 145b01167847d55e42dc88e0217df3dd6f3fd8f0 Mon Sep 17 00:00:00 2001 From: Nathan VanBenschoten Date: Wed, 18 Nov 2020 12:41:31 -0500 Subject: [PATCH 2/2] kv: prioritize NodeLiveness Range in Raft scheduler Relates to #56851. In #56851 and in many other investigations, we've seen cases where the NodeLiveness Range has a hard time performing writes when a system is under heavy load. We already split RPC traffic into two classes, ensuring that NodeLiveness traffic does not get stuck behind traffic on user ranges. However, to this point, it was still possible for the NodeLiveness range to get stuck behind other Ranges in the Raft scheduler, leading to high scheduling latency for Raft operations. This commit addresses this by prioritizing the NodeLiveness range above all others in the Raft scheduler. This prioritization mechanism is naive, but should be effective. It should also not run into any issues with fairness or starvation of other ranges, as such starvation is not possible as long as the scheduler concurrency (8*num_cpus) is above the number of high priority ranges (1). Release note (performance improvement): The Raft scheduler now prioritizes the node liveness Range. This was observed to prevent instability on large machines (32+ vCPU) in clusters with many ranges (50k+ per node). --- pkg/kv/kvserver/client_replica_test.go | 20 ++++++++++ pkg/kv/kvserver/helpers_test.go | 5 +++ pkg/kv/kvserver/replica_init.go | 8 ++++ pkg/kv/kvserver/scheduler.go | 54 +++++++++++++++++++++++--- pkg/kv/kvserver/scheduler_test.go | 39 ++++++++++++++++++- 5 files changed, 119 insertions(+), 7 deletions(-) diff --git a/pkg/kv/kvserver/client_replica_test.go b/pkg/kv/kvserver/client_replica_test.go index 0e7ee066ce3b..4a59f89174c3 100644 --- a/pkg/kv/kvserver/client_replica_test.go +++ b/pkg/kv/kvserver/client_replica_test.go @@ -3286,3 +3286,23 @@ func makeReplicationTargets(ids ...int) (targets []roachpb.ReplicationTarget) { } return targets } + +func TestRaftSchedulerPrioritizesNodeLiveness(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + s, _, _ := serverutils.StartServer(t, base.TestServerArgs{}) + defer s.Stopper().Stop(ctx) + + store, err := s.GetStores().(*kvserver.Stores).GetStore(s.GetFirstStoreID()) + require.NoError(t, err) + + // Determine the node liveness range ID. + livenessRepl := store.LookupReplica(roachpb.RKey(keys.NodeLivenessPrefix)) + livenessRangeID := livenessRepl.RangeID + + // Assert that the node liveness range is prioritized. + priorityID := store.RaftSchedulerPriorityID() + require.Equal(t, livenessRangeID, priorityID) +} diff --git a/pkg/kv/kvserver/helpers_test.go b/pkg/kv/kvserver/helpers_test.go index 1996e97c329d..a23658609136 100644 --- a/pkg/kv/kvserver/helpers_test.go +++ b/pkg/kv/kvserver/helpers_test.go @@ -189,6 +189,11 @@ func (s *Store) ReservationCount() int { return len(s.snapshotApplySem) } +// RaftSchedulerPriorityID returns the Raft scheduler's prioritized range. +func (s *Store) RaftSchedulerPriorityID() roachpb.RangeID { + return s.scheduler.PriorityID() +} + // ClearClosedTimestampStorage clears the closed timestamp storage of all // knowledge about closed timestamps. func (s *Store) ClearClosedTimestampStorage() { diff --git a/pkg/kv/kvserver/replica_init.go b/pkg/kv/kvserver/replica_init.go index dbb25634bf07..725f15bee844 100644 --- a/pkg/kv/kvserver/replica_init.go +++ b/pkg/kv/kvserver/replica_init.go @@ -11,10 +11,12 @@ package kvserver import ( + "bytes" "context" "math/rand" "time" + "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/abortspan" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/split" @@ -312,4 +314,10 @@ func (r *Replica) setDescLockedRaftMuLocked(ctx context.Context, desc *roachpb.R r.connectionClass.set(rpc.ConnectionClassForKey(desc.StartKey)) r.concMgr.OnRangeDescUpdated(desc) r.mu.state.Desc = desc + + // Prioritize the NodeLiveness Range in the Raft scheduler above all other + // Ranges to ensure that liveness never sees high Raft scheduler latency. + if bytes.HasPrefix(desc.StartKey, keys.NodeLivenessPrefix) { + r.store.scheduler.SetPriorityID(desc.RangeID) + } } diff --git a/pkg/kv/kvserver/scheduler.go b/pkg/kv/kvserver/scheduler.go index ef444f8aaa12..f7462c16c20d 100644 --- a/pkg/kv/kvserver/scheduler.go +++ b/pkg/kv/kvserver/scheduler.go @@ -60,16 +60,31 @@ func (c *rangeIDChunk) Len() int { // amortizing the allocation/GC cost. Using a chunk queue avoids any copying // that would occur if a slice were used (the copying would occur on slice // reallocation). +// +// The queue has a naive understanding of priority and fairness. For the most +// part, it implements a FIFO queueing policy with no prioritization of some +// ranges over others. However, the queue can be configured with up to one +// high-priority range, which will always be placed at the front when added. type rangeIDQueue struct { + len int + + // Default priority. chunks list.List - len int + + // High priority. + priorityID roachpb.RangeID + priorityQueued bool } -func (q *rangeIDQueue) PushBack(id roachpb.RangeID) { +func (q *rangeIDQueue) Push(id roachpb.RangeID) { + q.len++ + if q.priorityID == id { + q.priorityQueued = true + return + } if q.chunks.Len() == 0 || q.back().WriteCap() == 0 { q.chunks.PushBack(&rangeIDChunk{}) } - q.len++ if !q.back().PushBack(id) { panic(fmt.Sprintf( "unable to push rangeID to chunk: len=%d, cap=%d", @@ -81,13 +96,17 @@ func (q *rangeIDQueue) PopFront() (roachpb.RangeID, bool) { if q.len == 0 { return 0, false } + q.len-- + if q.priorityQueued { + q.priorityQueued = false + return q.priorityID, true + } frontElem := q.chunks.Front() front := frontElem.Value.(*rangeIDChunk) id, ok := front.PopFront() if !ok { panic("encountered empty chunk") } - q.len-- if front.Len() == 0 && front.WriteCap() == 0 { q.chunks.Remove(frontElem) } @@ -98,6 +117,15 @@ func (q *rangeIDQueue) Len() int { return q.len } +func (q *rangeIDQueue) SetPriorityID(id roachpb.RangeID) { + if q.priorityID != 0 && q.priorityID != id { + panic(fmt.Sprintf( + "priority range ID already set: old=%d, new=%d", + q.priorityID, id)) + } + q.priorityID = id +} + func (q *rangeIDQueue) back() *rangeIDChunk { return q.chunks.Back().Value.(*rangeIDChunk) } @@ -172,6 +200,20 @@ func (s *raftScheduler) Wait(context.Context) { s.done.Wait() } +// SetPriorityID configures the single range that the scheduler will prioritize +// above others. Once set, callers are not permitted to change this value. +func (s *raftScheduler) SetPriorityID(id roachpb.RangeID) { + s.mu.Lock() + s.mu.queue.SetPriorityID(id) + s.mu.Unlock() +} + +func (s *raftScheduler) PriorityID() roachpb.RangeID { + s.mu.Lock() + defer s.mu.Unlock() + return s.mu.queue.priorityID +} + func (s *raftScheduler) worker(ctx context.Context) { defer s.done.Done() @@ -235,7 +277,7 @@ func (s *raftScheduler) worker(ctx context.Context) { } else { // There was a concurrent call to one of the Enqueue* methods. Queue the // range ID for further processing. - s.mu.queue.PushBack(id) + s.mu.queue.Push(id) s.mu.cond.Signal() } } @@ -251,7 +293,7 @@ func (s *raftScheduler) enqueue1Locked(addState raftScheduleState, id roachpb.Ra if newState&stateQueued == 0 { newState |= stateQueued queued++ - s.mu.queue.PushBack(id) + s.mu.queue.Push(id) } s.mu.state[id] = newState return queued diff --git a/pkg/kv/kvserver/scheduler_test.go b/pkg/kv/kvserver/scheduler_test.go index 56a5190d557e..1381e2321693 100644 --- a/pkg/kv/kvserver/scheduler_test.go +++ b/pkg/kv/kvserver/scheduler_test.go @@ -20,9 +20,11 @@ import ( "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/stop" "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/pkg/errors" + "github.com/stretchr/testify/require" ) func TestRangeIDChunk(t *testing.T) { @@ -93,7 +95,7 @@ func TestRangeIDQueue(t *testing.T) { const count = 3 * rangeIDChunkSize for i := 1; i <= count; i++ { - q.PushBack(roachpb.RangeID(i)) + q.Push(roachpb.RangeID(i)) if e := i; e != q.Len() { t.Fatalf("expected %d, but found %d", e, q.Len()) } @@ -119,6 +121,41 @@ func TestRangeIDQueue(t *testing.T) { } } +func TestRangeIDQueuePrioritization(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + var q rangeIDQueue + for _, withPriority := range []bool{false, true} { + if withPriority { + q.SetPriorityID(3) + } + + // Push 5 ranges in order, then pop them off. + for i := 1; i <= 5; i++ { + q.Push(roachpb.RangeID(i)) + require.Equal(t, i, q.Len()) + } + var popped []int + for i := 5; ; i-- { + require.Equal(t, i, q.Len()) + id, ok := q.PopFront() + if !ok { + require.Equal(t, i, 0) + break + } + popped = append(popped, int(id)) + } + + // Assert pop order. + if withPriority { + require.Equal(t, []int{3, 1, 2, 4, 5}, popped) + } else { + require.Equal(t, []int{1, 2, 3, 4, 5}, popped) + } + } +} + type testProcessor struct { mu struct { syncutil.Mutex