resource_manager: record the max RU per second (tikv#7936) (tikv#7968)
close tikv#7908

resource_manager: record the max RU per second

Signed-off-by: nolouch <nolouch@gmail.com>

Co-authored-by: nolouch <nolouch@gmail.com>
Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
3 people committed Apr 3, 2024
1 parent db8c5e1 commit 5845336
Showing 3 changed files with 164 additions and 2 deletions.
97 changes: 95 additions & 2 deletions pkg/mcs/resourcemanager/server/manager.go
@@ -27,6 +27,7 @@ import (
"github.com/pingcap/failpoint"
rmpb "github.com/pingcap/kvproto/pkg/resource_manager"
"github.com/pingcap/log"
"github.com/prometheus/client_golang/prometheus"
bs "github.com/tikv/pd/pkg/basicserver"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/storage/endpoint"
@@ -41,7 +42,9 @@ const (
defaultConsumptionChanSize = 1024
metricsCleanupInterval = time.Minute
metricsCleanupTimeout = 20 * time.Minute
-metricsAvailableRUInterval = 30 * time.Second
+metricsAvailableRUInterval = 1 * time.Second
defaultCollectIntervalSec = 20
tickPerSecond = time.Second

reservedDefaultGroupName = "default"
middlePriority = 8
@@ -359,6 +362,9 @@ func (m *Manager) backgroundMetricsFlush(ctx context.Context) {
defer cleanUpTicker.Stop()
availableRUTicker := time.NewTicker(metricsAvailableRUInterval)
defer availableRUTicker.Stop()
recordMaxTicker := time.NewTicker(tickPerSecond)
defer recordMaxTicker.Stop()
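// maxPerSecTrackers holds one per-group tracker for the peak RU consumed
// in any single second.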
maxPerSecTrackers := make(map[string]*maxPerSecCostTracker)
for {
select {
case <-ctx.Done():
@@ -388,6 +394,13 @@ func (m *Manager) backgroundMetricsFlush(ctx context.Context) {
readRequestCountMetrics = requestCount.WithLabelValues(name, name, readTypeLabel)
writeRequestCountMetrics = requestCount.WithLabelValues(name, name, writeTypeLabel)
)
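// Lazily create this group's per-second tracker and feed it the newly
// reported consumption.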
t, ok := maxPerSecTrackers[name]
if !ok {
t = newMaxPerSecCostTracker(name, defaultCollectIntervalSec)
maxPerSecTrackers[name] = t
}
t.CollectConsumption(consumption)

// RU info.
if consumption.RRU > 0 {
rruMetrics.Add(consumption.RRU)
@@ -435,21 +448,101 @@ func (m *Manager) backgroundMetricsFlush(ctx context.Context) {
requestCount.DeleteLabelValues(r.name, r.name, writeTypeLabel)
availableRUCounter.DeleteLabelValues(r.name, r.name, r.ruType)
delete(m.consumptionRecord, r)
delete(maxPerSecTrackers, r.name)
readRequestUnitMaxPerSecCost.DeleteLabelValues(r.name)
writeRequestUnitMaxPerSecCost.DeleteLabelValues(r.name)
}
}
case <-availableRUTicker.C:
m.RLock()
groups := make([]*ResourceGroup, 0, len(m.groups))
for name, group := range m.groups {
if name == reservedDefaultGroupName {
continue
}
groups = append(groups, group)
}
m.RUnlock()
// Iterate over the copied slice outside the lock so that a large number
// of groups does not keep the lock held for long.
for _, group := range groups {
ru := group.getRUToken()
if ru < 0 {
ru = 0
}
-availableRUCounter.WithLabelValues(name, name).Set(ru)
+availableRUCounter.WithLabelValues(group.Name, group.Name).Set(ru)
}

case <-recordMaxTicker.C:
// Every second, flush each group's tracker so the max RRU/WRU consumed
// in any one second gets recorded.
m.RLock()
names := make([]string, 0, len(m.groups))
for name := range m.groups {
names = append(names, name)
}
m.RUnlock()
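// Make sure every live group has a tracker (a group may exist before it
// reports any consumption), then advance the existing ones by one tick.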
for _, name := range names {
if t, ok := maxPerSecTrackers[name]; !ok {
maxPerSecTrackers[name] = newMaxPerSecCostTracker(name, defaultCollectIntervalSec)
} else {
t.FlushMetrics()
}
}
}
}
}

type maxPerSecCostTracker struct {
name string
maxPerSecRRU float64
maxPerSecWRU float64
rruSum float64
wruSum float64
lastRRUSum float64
lastWRUSum float64
flushPeriod int
cnt int
rruMaxMetrics prometheus.Gauge
wruMaxMetrics prometheus.Gauge
}

func newMaxPerSecCostTracker(name string, flushPeriod int) *maxPerSecCostTracker {
return &maxPerSecCostTracker{
name: name,
flushPeriod: flushPeriod,
rruMaxMetrics: readRequestUnitMaxPerSecCost.WithLabelValues(name),
wruMaxMetrics: writeRequestUnitMaxPerSecCost.WithLabelValues(name),
}
}

// CollectConsumption accumulates the reported RRU and WRU consumption.
func (t *maxPerSecCostTracker) CollectConsumption(consume *rmpb.Consumption) {
t.rruSum += consume.RRU
t.wruSum += consume.WRU
}

// FlushMetrics computes the per-second deltas, updates the running maxima,
// and periodically publishes maxPerSecRRU and maxPerSecWRU to the metrics.
func (t *maxPerSecCostTracker) FlushMetrics() {
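// No baseline yet (nothing consumed so far): seed the last-seen sums and
// skip the delta computation for this tick.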
if t.lastRRUSum == 0 && t.lastWRUSum == 0 {
t.lastRRUSum = t.rruSum
t.lastWRUSum = t.wruSum
return
}
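// FlushMetrics is driven once per second, so each delta approximates the
// RU consumed during the last second.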
deltaRRU := t.rruSum - t.lastRRUSum
deltaWRU := t.wruSum - t.lastWRUSum
t.lastRRUSum = t.rruSum
t.lastWRUSum = t.wruSum
if deltaRRU > t.maxPerSecRRU {
t.maxPerSecRRU = deltaRRU
}
if deltaWRU > t.maxPerSecWRU {
t.maxPerSecWRU = deltaWRU
}
t.cnt++
// Publish the maxima to the metrics and reset the window every flushPeriod ticks.
if t.cnt%t.flushPeriod == 0 {
t.rruMaxMetrics.Set(t.maxPerSecRRU)
t.wruMaxMetrics.Set(t.maxPerSecWRU)
t.maxPerSecRRU = 0
t.maxPerSecWRU = 0
}
}
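For reference, a minimal sketch (not part of the commit) of how the tracker behaves when driven by hand; the group name and consumption values are illustrative assumptions, and the once-per-second cadence normally supplied by recordMaxTicker is simulated by calling FlushMetrics directly:

// Hypothetical, in-package sketch of the tracker life cycle.
tracker := newMaxPerSecCostTracker("demo-group", defaultCollectIntervalSec)
for sec := 0; sec <= defaultCollectIntervalSec+1; sec++ {
	// Pretend the group consumed sec read RUs and 2*sec write RUs this second.
	tracker.CollectConsumption(&rmpb.Consumption{RRU: float64(sec), WRU: 2 * float64(sec)})
	tracker.FlushMetrics() // normally invoked from the recordMaxTicker case
}
// After a full flush period, the gauges hold the largest per-second deltas
// observed in the window, and the running maxima are reset for the next one.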
18 changes: 18 additions & 0 deletions pkg/mcs/resourcemanager/server/metrics.go
@@ -56,6 +56,22 @@ var (
Name: "write_request_unit_sum",
Help: "Counter of the write request unit cost for all resource groups.",
}, []string{resourceGroupNameLabel, newResourceGroupNameLabel, typeLabel})

readRequestUnitMaxPerSecCost = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: ruSubsystem,
Name: "read_request_unit_max_per_sec",
Help: "Gauge of the max read request unit per second for all resource groups.",
}, []string{newResourceGroupNameLabel})
writeRequestUnitMaxPerSecCost = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: ruSubsystem,
Name: "write_request_unit_max_per_sec",
Help: "Gauge of the max write request unit per second for all resource groups.",
}, []string{newResourceGroupNameLabel})

sqlLayerRequestUnitCost = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: namespace,
@@ -121,4 +137,6 @@ func init() {
prometheus.MustRegister(sqlCPUCost)
prometheus.MustRegister(requestCount)
prometheus.MustRegister(availableRUCounter)
prometheus.MustRegister(readRequestUnitMaxPerSecCost)
prometheus.MustRegister(writeRequestUnitMaxPerSecCost)
}
51 changes: 51 additions & 0 deletions pkg/mcs/resourcemanager/server/metrics_test.go
@@ -0,0 +1,51 @@
// Copyright 2024 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package server

import (
"fmt"
"testing"

rmpb "github.com/pingcap/kvproto/pkg/resource_manager"
"github.com/stretchr/testify/require"
)

func TestMaxPerSecCostTracker(t *testing.T) {
tracker := newMaxPerSecCostTracker("test", defaultCollectIntervalSec)
re := require.New(t)

// Define the expected max values for each flushPeriod
expectedMaxRU := []float64{19, 39, 59}
expectedSum := []float64{190, 780, 1770}
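// With flushPeriod = 20 and per-tick consumption i, the largest per-second
// delta in period p is 20*(p+1)-1 (19, 39, 59), and the cumulative sums
// are 0+1+...+19 = 190, 0+...+39 = 780, and 0+...+59 = 1770.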

for i := 0; i < 60; i++ {
// Record data
consumption := &rmpb.Consumption{
RRU: float64(i),
WRU: float64(i),
}
tracker.CollectConsumption(consumption)
tracker.FlushMetrics()

// Check the max values at the end of each flushPeriod
if (i+1)%20 == 0 {
period := i / 20
re.Equal(expectedMaxRU[period], tracker.maxPerSecRRU, fmt.Sprintf("maxPerSecRRU in period %d is incorrect", period+1))
re.Equal(expectedMaxRU[period], tracker.maxPerSecWRU, fmt.Sprintf("maxPerSecWRU in period %d is incorrect", period+1))
re.Equal(expectedSum[period], tracker.rruSum)
re.Equal(expectedSum[period], tracker.wruSum)
}
}
}
