diff --git a/pkg/kv/kvserver/client_replica_test.go b/pkg/kv/kvserver/client_replica_test.go index 0e7ee066ce3b..4a59f89174c3 100644 --- a/pkg/kv/kvserver/client_replica_test.go +++ b/pkg/kv/kvserver/client_replica_test.go @@ -3286,3 +3286,23 @@ func makeReplicationTargets(ids ...int) (targets []roachpb.ReplicationTarget) { } return targets } + +func TestRaftSchedulerPrioritizesNodeLiveness(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + s, _, _ := serverutils.StartServer(t, base.TestServerArgs{}) + defer s.Stopper().Stop(ctx) + + store, err := s.GetStores().(*kvserver.Stores).GetStore(s.GetFirstStoreID()) + require.NoError(t, err) + + // Determine the node liveness range ID. + livenessRepl := store.LookupReplica(roachpb.RKey(keys.NodeLivenessPrefix)) + livenessRangeID := livenessRepl.RangeID + + // Assert that the node liveness range is prioritized. + priorityID := store.RaftSchedulerPriorityID() + require.Equal(t, livenessRangeID, priorityID) +} diff --git a/pkg/kv/kvserver/helpers_test.go b/pkg/kv/kvserver/helpers_test.go index 1996e97c329d..a23658609136 100644 --- a/pkg/kv/kvserver/helpers_test.go +++ b/pkg/kv/kvserver/helpers_test.go @@ -189,6 +189,11 @@ func (s *Store) ReservationCount() int { return len(s.snapshotApplySem) } +// RaftSchedulerPriorityID returns the Raft scheduler's prioritized range. +func (s *Store) RaftSchedulerPriorityID() roachpb.RangeID { + return s.scheduler.PriorityID() +} + // ClearClosedTimestampStorage clears the closed timestamp storage of all // knowledge about closed timestamps. func (s *Store) ClearClosedTimestampStorage() { diff --git a/pkg/kv/kvserver/replica_init.go b/pkg/kv/kvserver/replica_init.go index dbb25634bf07..01791ba3ca9b 100644 --- a/pkg/kv/kvserver/replica_init.go +++ b/pkg/kv/kvserver/replica_init.go @@ -11,6 +11,7 @@ package kvserver import ( + "bytes" "context" "math/rand" "time" @@ -312,4 +313,10 @@ func (r *Replica) setDescLockedRaftMuLocked(ctx context.Context, desc *roachpb.R r.connectionClass.set(rpc.ConnectionClassForKey(desc.StartKey)) r.concMgr.OnRangeDescUpdated(desc) r.mu.state.Desc = desc + + // Prioritize the NodeLiveness Range in the Raft scheduler above all other + // Ranges to ensure that liveness never sees high Raft scheduler latency. + if bytes.HasPrefix(desc.StartKey, keys.NodeLivenessPrefix) { + r.store.scheduler.SetPriorityID(desc.RangeID) + } } diff --git a/pkg/kv/kvserver/scheduler.go b/pkg/kv/kvserver/scheduler.go index ef444f8aaa12..f7462c16c20d 100644 --- a/pkg/kv/kvserver/scheduler.go +++ b/pkg/kv/kvserver/scheduler.go @@ -60,16 +60,31 @@ func (c *rangeIDChunk) Len() int { // amortizing the allocation/GC cost. Using a chunk queue avoids any copying // that would occur if a slice were used (the copying would occur on slice // reallocation). +// +// The queue has a naive understanding of priority and fairness. For the most +// part, it implements a FIFO queueing policy with no prioritization of some +// ranges over others. However, the queue can be configured with up to one +// high-priority range, which will always be placed at the front when added. type rangeIDQueue struct { + len int + + // Default priority. chunks list.List - len int + + // High priority. + priorityID roachpb.RangeID + priorityQueued bool } -func (q *rangeIDQueue) PushBack(id roachpb.RangeID) { +func (q *rangeIDQueue) Push(id roachpb.RangeID) { + q.len++ + if q.priorityID == id { + q.priorityQueued = true + return + } if q.chunks.Len() == 0 || q.back().WriteCap() == 0 { q.chunks.PushBack(&rangeIDChunk{}) } - q.len++ if !q.back().PushBack(id) { panic(fmt.Sprintf( "unable to push rangeID to chunk: len=%d, cap=%d", @@ -81,13 +96,17 @@ func (q *rangeIDQueue) PopFront() (roachpb.RangeID, bool) { if q.len == 0 { return 0, false } + q.len-- + if q.priorityQueued { + q.priorityQueued = false + return q.priorityID, true + } frontElem := q.chunks.Front() front := frontElem.Value.(*rangeIDChunk) id, ok := front.PopFront() if !ok { panic("encountered empty chunk") } - q.len-- if front.Len() == 0 && front.WriteCap() == 0 { q.chunks.Remove(frontElem) } @@ -98,6 +117,15 @@ func (q *rangeIDQueue) Len() int { return q.len } +func (q *rangeIDQueue) SetPriorityID(id roachpb.RangeID) { + if q.priorityID != 0 && q.priorityID != id { + panic(fmt.Sprintf( + "priority range ID already set: old=%d, new=%d", + q.priorityID, id)) + } + q.priorityID = id +} + func (q *rangeIDQueue) back() *rangeIDChunk { return q.chunks.Back().Value.(*rangeIDChunk) } @@ -172,6 +200,20 @@ func (s *raftScheduler) Wait(context.Context) { s.done.Wait() } +// SetPriorityID configures the single range that the scheduler will prioritize +// above others. Once set, callers are not permitted to change this value. +func (s *raftScheduler) SetPriorityID(id roachpb.RangeID) { + s.mu.Lock() + s.mu.queue.SetPriorityID(id) + s.mu.Unlock() +} + +func (s *raftScheduler) PriorityID() roachpb.RangeID { + s.mu.Lock() + defer s.mu.Unlock() + return s.mu.queue.priorityID +} + func (s *raftScheduler) worker(ctx context.Context) { defer s.done.Done() @@ -235,7 +277,7 @@ func (s *raftScheduler) worker(ctx context.Context) { } else { // There was a concurrent call to one of the Enqueue* methods. Queue the // range ID for further processing. - s.mu.queue.PushBack(id) + s.mu.queue.Push(id) s.mu.cond.Signal() } } @@ -251,7 +293,7 @@ func (s *raftScheduler) enqueue1Locked(addState raftScheduleState, id roachpb.Ra if newState&stateQueued == 0 { newState |= stateQueued queued++ - s.mu.queue.PushBack(id) + s.mu.queue.Push(id) } s.mu.state[id] = newState return queued diff --git a/pkg/kv/kvserver/scheduler_test.go b/pkg/kv/kvserver/scheduler_test.go index 56a5190d557e..1381e2321693 100644 --- a/pkg/kv/kvserver/scheduler_test.go +++ b/pkg/kv/kvserver/scheduler_test.go @@ -20,9 +20,11 @@ import ( "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/stop" "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/pkg/errors" + "github.com/stretchr/testify/require" ) func TestRangeIDChunk(t *testing.T) { @@ -93,7 +95,7 @@ func TestRangeIDQueue(t *testing.T) { const count = 3 * rangeIDChunkSize for i := 1; i <= count; i++ { - q.PushBack(roachpb.RangeID(i)) + q.Push(roachpb.RangeID(i)) if e := i; e != q.Len() { t.Fatalf("expected %d, but found %d", e, q.Len()) } @@ -119,6 +121,41 @@ func TestRangeIDQueue(t *testing.T) { } } +func TestRangeIDQueuePrioritization(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + var q rangeIDQueue + for _, withPriority := range []bool{false, true} { + if withPriority { + q.SetPriorityID(3) + } + + // Push 5 ranges in order, then pop them off. + for i := 1; i <= 5; i++ { + q.Push(roachpb.RangeID(i)) + require.Equal(t, i, q.Len()) + } + var popped []int + for i := 5; ; i-- { + require.Equal(t, i, q.Len()) + id, ok := q.PopFront() + if !ok { + require.Equal(t, i, 0) + break + } + popped = append(popped, int(id)) + } + + // Assert pop order. + if withPriority { + require.Equal(t, []int{3, 1, 2, 4, 5}, popped) + } else { + require.Equal(t, []int{1, 2, 3, 4, 5}, popped) + } + } +} + type testProcessor struct { mu struct { syncutil.Mutex