From 1f8b1adf9116ddd21df84985a2ecf16f23ba7fca Mon Sep 17 00:00:00 2001 From: Daniel Upton Date: Wed, 27 Apr 2022 14:22:06 +0100 Subject: [PATCH] Remove smoothing/bucketing in favor of using a sample metric --- api.go | 2 +- fsm.go | 2 +- saturation.go | 79 ++++++++++++---------------------------------- saturation_test.go | 45 +++++--------------------- 4 files changed, 31 insertions(+), 97 deletions(-) diff --git a/api.go b/api.go index 79a97c614..eda33c3f0 100644 --- a/api.go +++ b/api.go @@ -556,7 +556,7 @@ func NewRaft(conf *Config, fsm FSM, logs LogStore, stable StableStore, snaps Sna leadershipTransferCh: make(chan *leadershipTransferFuture, 1), leaderNotifyCh: make(chan struct{}, 1), followerNotifyCh: make(chan struct{}, 1), - mainThreadSaturation: newSaturationMetric([]string{"raft", "thread", "main", "saturation"}, 1*time.Second, 5), + mainThreadSaturation: newSaturationMetric([]string{"raft", "thread", "main", "saturation"}, 1*time.Second), } r.conf.Store(*conf) diff --git a/fsm.go b/fsm.go index cfed3450c..6d26a9a28 100644 --- a/fsm.go +++ b/fsm.go @@ -223,7 +223,7 @@ func (r *Raft) runFSM() { req.respond(err) } - saturation := newSaturationMetric([]string{"raft", "thread", "fsm", "saturation"}, 1*time.Second, 5) + saturation := newSaturationMetric([]string{"raft", "thread", "fsm", "saturation"}, 1*time.Second) for { saturation.sleeping() diff --git a/saturation.go b/saturation.go index 38ba2a93c..0124016eb 100644 --- a/saturation.go +++ b/saturation.go @@ -19,44 +19,32 @@ import ( type saturationMetric struct { reportInterval time.Duration - // buckets contains measurements for both the current, and a configurable - // number of previous periods (to smooth out spikes). - buckets []saturationMetricBucket - currentBucket int - - sleepBegan, workBegan time.Time - - // These are overwritten in tests. - nowFn func() time.Time - reportFn func(float32) -} - -// saturationMetricBucket contains the measurements for a period. -type saturationMetricBucket struct { - // beginning of the period for which this bucket contains measurements. - beginning time.Time - // slept contains time for which the event processing loop was sleeping rather - // than working. + // than working in the last period. slept time.Duration // lost contains time that was lost due to incorrect instrumentation of the // event processing loop (e.g. calling sleeping() or working() multiple times - // in succession). + // in succession) in the last period. lost time.Duration + + lastReport, sleepBegan, workBegan time.Time + + // These are overwritten in tests. + nowFn func() time.Time + reportFn func(float32) } // newSaturationMetric creates a saturationMetric that will update the gauge // with the given name at the given reportInterval. keepPrev determines the // number of previous measurements that will be used to smooth out spikes. -func newSaturationMetric(name []string, reportInterval time.Duration, keepPrev uint) *saturationMetric { +func newSaturationMetric(name []string, reportInterval time.Duration) *saturationMetric { m := &saturationMetric{ reportInterval: reportInterval, - buckets: make([]saturationMetricBucket, 1+keepPrev), nowFn: time.Now, - reportFn: func(sat float32) { metrics.SetGauge(name, sat) }, + lastReport: time.Now(), + reportFn: func(sat float32) { metrics.AddSample(name, sat) }, } - m.buckets[0].beginning = time.Now() return m } @@ -68,7 +56,7 @@ func (s *saturationMetric) sleeping() { if !s.sleepBegan.IsZero() { // sleeping called twice in succession. Count that time as lost rather than // measuring nonsense. - s.buckets[s.currentBucket].lost += now.Sub(s.sleepBegan) + s.lost += now.Sub(s.sleepBegan) } s.sleepBegan = now @@ -80,20 +68,19 @@ func (s *saturationMetric) sleeping() { // proceeded by a call to sleeping. func (s *saturationMetric) working() { now := s.nowFn() - bucket := &s.buckets[s.currentBucket] if s.workBegan.IsZero() { if s.sleepBegan.IsZero() { // working called before the initial call to sleeping. Count that time as // lost rather than measuring nonsense. - bucket.lost += now.Sub(bucket.beginning) + s.lost += now.Sub(s.lastReport) } else { - bucket.slept += now.Sub(s.sleepBegan) + s.slept += now.Sub(s.sleepBegan) } } else { // working called twice in succession. Count that time as lost rather than // measuring nonsense. - bucket.lost += now.Sub(s.workBegan) + s.lost += now.Sub(s.workBegan) } s.workBegan = now @@ -104,45 +91,21 @@ func (s *saturationMetric) working() { // report updates the gauge if reportInterval has passed since our last report. func (s *saturationMetric) report() { now := s.nowFn() - timeSinceLastReport := now.Sub(s.buckets[s.currentBucket].beginning) + timeSinceLastReport := now.Sub(s.lastReport) if timeSinceLastReport < s.reportInterval { return } - var ( - beginning time.Time - slept, lost time.Duration - ) - for _, bucket := range s.buckets { - if bucket.beginning.IsZero() { - continue - } - - if beginning.IsZero() || bucket.beginning.Before(beginning) { - beginning = bucket.beginning - } - - slept += bucket.slept - lost += bucket.lost - } - - total := now.Sub(beginning) - lost - var saturation float64 + total := timeSinceLastReport - s.lost if total != 0 { - saturation = float64(total-slept) / float64(total) + saturation = float64(total-s.slept) / float64(total) saturation = math.Round(saturation*100) / 100 } s.reportFn(float32(saturation)) - s.currentBucket++ - if s.currentBucket >= len(s.buckets) { - s.currentBucket = 0 - } - - bucket := &s.buckets[s.currentBucket] - bucket.slept = 0 - bucket.lost = 0 - bucket.beginning = now + s.slept = 0 + s.lost = 0 + s.lastReport = now } diff --git a/saturation_test.go b/saturation_test.go index 572ccaef9..35046f347 100644 --- a/saturation_test.go +++ b/saturation_test.go @@ -9,9 +9,9 @@ import ( func TestSaturationMetric(t *testing.T) { t.Run("without smoothing", func(t *testing.T) { - sat := newSaturationMetric([]string{"metric"}, 100*time.Millisecond, 0) + sat := newSaturationMetric([]string{"metric"}, 100*time.Millisecond) - now := sat.buckets[0].beginning + now := sat.lastReport sat.nowFn = func() time.Time { return now } var reported float32 @@ -46,42 +46,13 @@ func TestSaturationMetric(t *testing.T) { // Should be 0% saturation. require.Equal(t, float32(0), reported) }) - - t.Run("with smoothing", func(t *testing.T) { - sat := newSaturationMetric([]string{"metric"}, 100*time.Millisecond, 1) - - now := sat.buckets[0].beginning - sat.nowFn = func() time.Time { return now } - - var reported float32 - sat.reportFn = func(val float32) { reported = val } - - // First report window: 50ms sleeping + 75ms working. - sat.sleeping() - - now = now.Add(50 * time.Millisecond) - sat.working() - - now = now.Add(75 * time.Millisecond) - sat.sleeping() - - // Second report window: 75ms sleeping + 50ms working. - now = now.Add(75 * time.Millisecond) - sat.working() - - now = now.Add(50 * time.Millisecond) - sat.sleeping() - - // Should average out to 50% saturation. - require.Equal(t, float32(0.5), reported) - }) } func TestSaturationMetric_IncorrectUsage(t *testing.T) { t.Run("calling sleeping() consecutively", func(t *testing.T) { - sat := newSaturationMetric([]string{"metric"}, 50*time.Millisecond, 0) + sat := newSaturationMetric([]string{"metric"}, 50*time.Millisecond) - now := sat.buckets[0].beginning + now := sat.lastReport sat.nowFn = func() time.Time { return now } var reported float32 @@ -119,9 +90,9 @@ func TestSaturationMetric_IncorrectUsage(t *testing.T) { }) t.Run("calling working() consecutively", func(t *testing.T) { - sat := newSaturationMetric([]string{"metric"}, 30*time.Millisecond, 0) + sat := newSaturationMetric([]string{"metric"}, 30*time.Millisecond) - now := sat.buckets[0].beginning + now := sat.lastReport sat.nowFn = func() time.Time { return now } var reported float32 @@ -151,9 +122,9 @@ func TestSaturationMetric_IncorrectUsage(t *testing.T) { }) t.Run("calling working() first", func(t *testing.T) { - sat := newSaturationMetric([]string{"metric"}, 10*time.Millisecond, 0) + sat := newSaturationMetric([]string{"metric"}, 10*time.Millisecond) - now := sat.buckets[0].beginning.Add(10 * time.Millisecond) + now := sat.lastReport sat.nowFn = func() time.Time { return now } var reported float32