kv: prioritize NodeLiveness Range in Raft scheduler
Relates to #56851.

In #56851 and in many other investigations, we've seen cases where the
NodeLiveness Range has a hard time performing writes when a system is
under heavy load. We already split RPC traffic into two classes,
ensuring that NodeLiveness traffic does not get stuck behind traffic on
user ranges. However, to this point, it was still possible for the
NodeLiveness range to get stuck behind other Ranges in the Raft
scheduler, leading to high scheduling latency for Raft operations.

This commit addresses the problem by prioritizing the NodeLiveness Range
above all others in the Raft scheduler. The prioritization mechanism is
naive but should be effective. It also should not create fairness issues
or starve other ranges, since such starvation is impossible as long as
the scheduler concurrency (8*num_cpus) exceeds the number of
high-priority ranges (1).
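
As an illustration only (not code from this commit), the following minimal,
self-contained Go sketch captures the queueing policy described above: plain
FIFO ordering, except that one designated high-priority range is always handed
out first whenever it is queued. It also restates the starvation argument in
terms of worker count. All names here (rangeID, simpleQueue, push, pop) are
hypothetical.

package main

import (
	"fmt"
	"runtime"
)

type rangeID int64

// simpleQueue mirrors the described policy: FIFO for all ranges, except that
// a single designated high-priority range is popped first when queued.
type simpleQueue struct {
	priorityID     rangeID
	priorityQueued bool
	fifo           []rangeID
}

func (q *simpleQueue) push(id rangeID) {
	if id == q.priorityID {
		q.priorityQueued = true
		return
	}
	q.fifo = append(q.fifo, id)
}

func (q *simpleQueue) pop() (rangeID, bool) {
	if q.priorityQueued {
		q.priorityQueued = false
		return q.priorityID, true
	}
	if len(q.fifo) == 0 {
		return 0, false
	}
	id := q.fifo[0]
	q.fifo = q.fifo[1:]
	return id, true
}

func main() {
	q := simpleQueue{priorityID: 3} // e.g. the NodeLiveness range's ID
	for id := rangeID(1); id <= 5; id++ {
		q.push(id)
	}
	for {
		id, ok := q.pop()
		if !ok {
			break
		}
		fmt.Println(id) // prints 3, then 1, 2, 4, 5
	}

	// Starvation bound from the commit message: with 8*num_cpus workers and a
	// single prioritized range, other ranges cannot be starved, because the
	// priority range occupies at most one worker at a time.
	fmt.Printf("scheduler workers: %d, prioritized ranges: 1\n", 8*runtime.NumCPU())
}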

Release note (performance improvement): The Raft scheduler now prioritizes
the node liveness Range. This was observed to prevent instability on large
machines (32+ vCPU) in clusters with many ranges (50k+ per node).
nvanbenschoten committed May 3, 2021
1 parent e1c93b1 commit 145b011
Showing 5 changed files with 119 additions and 7 deletions.
20 changes: 20 additions & 0 deletions pkg/kv/kvserver/client_replica_test.go
@@ -3286,3 +3286,23 @@ func makeReplicationTargets(ids ...int) (targets []roachpb.ReplicationTarget) {
 	}
 	return targets
 }
+
+func TestRaftSchedulerPrioritizesNodeLiveness(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+
+	ctx := context.Background()
+	s, _, _ := serverutils.StartServer(t, base.TestServerArgs{})
+	defer s.Stopper().Stop(ctx)
+
+	store, err := s.GetStores().(*kvserver.Stores).GetStore(s.GetFirstStoreID())
+	require.NoError(t, err)
+
+	// Determine the node liveness range ID.
+	livenessRepl := store.LookupReplica(roachpb.RKey(keys.NodeLivenessPrefix))
+	livenessRangeID := livenessRepl.RangeID
+
+	// Assert that the node liveness range is prioritized.
+	priorityID := store.RaftSchedulerPriorityID()
+	require.Equal(t, livenessRangeID, priorityID)
+}
5 changes: 5 additions & 0 deletions pkg/kv/kvserver/helpers_test.go
@@ -189,6 +189,11 @@ func (s *Store) ReservationCount() int {
 	return len(s.snapshotApplySem)
 }
 
+// RaftSchedulerPriorityID returns the Raft scheduler's prioritized range.
+func (s *Store) RaftSchedulerPriorityID() roachpb.RangeID {
+	return s.scheduler.PriorityID()
+}
+
 // ClearClosedTimestampStorage clears the closed timestamp storage of all
 // knowledge about closed timestamps.
 func (s *Store) ClearClosedTimestampStorage() {
8 changes: 8 additions & 0 deletions pkg/kv/kvserver/replica_init.go
@@ -11,10 +11,12 @@
 package kvserver
 
 import (
+	"bytes"
 	"context"
 	"math/rand"
 	"time"
 
+	"github.com/cockroachdb/cockroach/pkg/keys"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/abortspan"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/split"
@@ -312,4 +314,10 @@ func (r *Replica) setDescLockedRaftMuLocked(ctx context.Context, desc *roachpb.R
 	r.connectionClass.set(rpc.ConnectionClassForKey(desc.StartKey))
 	r.concMgr.OnRangeDescUpdated(desc)
 	r.mu.state.Desc = desc
+
+	// Prioritize the NodeLiveness Range in the Raft scheduler above all other
+	// Ranges to ensure that liveness never sees high Raft scheduler latency.
+	if bytes.HasPrefix(desc.StartKey, keys.NodeLivenessPrefix) {
+		r.store.scheduler.SetPriorityID(desc.RangeID)
+	}
 }
54 changes: 48 additions & 6 deletions pkg/kv/kvserver/scheduler.go
@@ -60,16 +60,31 @@ func (c *rangeIDChunk) Len() int {
 // amortizing the allocation/GC cost. Using a chunk queue avoids any copying
 // that would occur if a slice were used (the copying would occur on slice
 // reallocation).
+//
+// The queue has a naive understanding of priority and fairness. For the most
+// part, it implements a FIFO queueing policy with no prioritization of some
+// ranges over others. However, the queue can be configured with up to one
+// high-priority range, which will always be placed at the front when added.
 type rangeIDQueue struct {
+	len int
+
+	// Default priority.
 	chunks list.List
-	len    int
+
+	// High priority.
+	priorityID     roachpb.RangeID
+	priorityQueued bool
 }
 
-func (q *rangeIDQueue) PushBack(id roachpb.RangeID) {
+func (q *rangeIDQueue) Push(id roachpb.RangeID) {
+	q.len++
+	if q.priorityID == id {
+		q.priorityQueued = true
+		return
+	}
 	if q.chunks.Len() == 0 || q.back().WriteCap() == 0 {
 		q.chunks.PushBack(&rangeIDChunk{})
 	}
-	q.len++
 	if !q.back().PushBack(id) {
 		panic(fmt.Sprintf(
 			"unable to push rangeID to chunk: len=%d, cap=%d",
@@ -81,13 +96,17 @@ func (q *rangeIDQueue) PopFront() (roachpb.RangeID, bool) {
 	if q.len == 0 {
 		return 0, false
 	}
+	q.len--
+	if q.priorityQueued {
+		q.priorityQueued = false
+		return q.priorityID, true
+	}
 	frontElem := q.chunks.Front()
 	front := frontElem.Value.(*rangeIDChunk)
 	id, ok := front.PopFront()
 	if !ok {
 		panic("encountered empty chunk")
 	}
-	q.len--
 	if front.Len() == 0 && front.WriteCap() == 0 {
 		q.chunks.Remove(frontElem)
 	}
@@ -98,6 +117,15 @@ func (q *rangeIDQueue) Len() int {
 	return q.len
 }
 
+func (q *rangeIDQueue) SetPriorityID(id roachpb.RangeID) {
+	if q.priorityID != 0 && q.priorityID != id {
+		panic(fmt.Sprintf(
+			"priority range ID already set: old=%d, new=%d",
+			q.priorityID, id))
+	}
+	q.priorityID = id
+}
+
 func (q *rangeIDQueue) back() *rangeIDChunk {
 	return q.chunks.Back().Value.(*rangeIDChunk)
 }
@@ -172,6 +200,20 @@ func (s *raftScheduler) Wait(context.Context) {
 	s.done.Wait()
 }
 
+// SetPriorityID configures the single range that the scheduler will prioritize
+// above others. Once set, callers are not permitted to change this value.
+func (s *raftScheduler) SetPriorityID(id roachpb.RangeID) {
+	s.mu.Lock()
+	s.mu.queue.SetPriorityID(id)
+	s.mu.Unlock()
+}
+
+func (s *raftScheduler) PriorityID() roachpb.RangeID {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	return s.mu.queue.priorityID
+}
+
 func (s *raftScheduler) worker(ctx context.Context) {
 	defer s.done.Done()
 
@@ -235,7 +277,7 @@ func (s *raftScheduler) worker(ctx context.Context) {
 		} else {
 			// There was a concurrent call to one of the Enqueue* methods. Queue the
 			// range ID for further processing.
-			s.mu.queue.PushBack(id)
+			s.mu.queue.Push(id)
 			s.mu.cond.Signal()
 		}
 	}
@@ -251,7 +293,7 @@ func (s *raftScheduler) enqueue1Locked(addState raftScheduleState, id roachpb.Ra
 	if newState&stateQueued == 0 {
 		newState |= stateQueued
 		queued++
-		s.mu.queue.PushBack(id)
+		s.mu.queue.Push(id)
 	}
 	s.mu.state[id] = newState
 	return queued
39 changes: 38 additions & 1 deletion pkg/kv/kvserver/scheduler_test.go
@@ -20,9 +20,11 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
 	"github.com/cockroachdb/cockroach/pkg/testutils"
 	"github.com/cockroachdb/cockroach/pkg/util/leaktest"
+	"github.com/cockroachdb/cockroach/pkg/util/log"
 	"github.com/cockroachdb/cockroach/pkg/util/stop"
 	"github.com/cockroachdb/cockroach/pkg/util/syncutil"
 	"github.com/pkg/errors"
+	"github.com/stretchr/testify/require"
 )
 
 func TestRangeIDChunk(t *testing.T) {
@@ -93,7 +95,7 @@ func TestRangeIDQueue(t *testing.T) {
 
 	const count = 3 * rangeIDChunkSize
 	for i := 1; i <= count; i++ {
-		q.PushBack(roachpb.RangeID(i))
+		q.Push(roachpb.RangeID(i))
 		if e := i; e != q.Len() {
 			t.Fatalf("expected %d, but found %d", e, q.Len())
 		}
@@ -119,6 +121,41 @@
 	}
 }
 
+func TestRangeIDQueuePrioritization(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+
+	var q rangeIDQueue
+	for _, withPriority := range []bool{false, true} {
+		if withPriority {
+			q.SetPriorityID(3)
+		}
+
+		// Push 5 ranges in order, then pop them off.
+		for i := 1; i <= 5; i++ {
+			q.Push(roachpb.RangeID(i))
+			require.Equal(t, i, q.Len())
+		}
+		var popped []int
+		for i := 5; ; i-- {
+			require.Equal(t, i, q.Len())
+			id, ok := q.PopFront()
+			if !ok {
+				require.Equal(t, i, 0)
+				break
+			}
+			popped = append(popped, int(id))
+		}
+
+		// Assert pop order.
+		if withPriority {
+			require.Equal(t, []int{3, 1, 2, 4, 5}, popped)
+		} else {
+			require.Equal(t, []int{1, 2, 3, 4, 5}, popped)
+		}
+	}
+}
+
 type testProcessor struct {
 	mu struct {
 		syncutil.Mutex