-
Notifications
You must be signed in to change notification settings - Fork 967
/
saturation.go
148 lines (123 loc) · 4.07 KB
/
saturation.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
package raft
import (
"math"
"time"
"github.com/armon/go-metrics"
)
// saturationMetric tracks how saturated an event processing loop (such as
// runFSM) is, i.e. the share of time the loop spends working as opposed to
// waiting for work. The figure is published as a gauge metric no more often
// than once per reportInterval.
//
// The loop being measured must call sleeping whenever it starts waiting and
// working whenever it picks work up; the very first call must be sleeping.
//
// Note: saturationMetric is not safe for concurrent use. All calls must come
// from a single goroutine.
type saturationMetric struct {
	// reportInterval is the minimum amount of time between two reports.
	reportInterval time.Duration

	// buckets holds the measurements for the period in progress together with
	// a configurable number of earlier periods (used to smooth out spikes).
	buckets []saturationMetricBucket
	// currentBucket is the index in buckets of the period in progress.
	currentBucket int

	// sleepBegan is set by sleeping and cleared (zero time) by working.
	sleepBegan time.Time
	// workBegan is set by working and cleared (zero time) by sleeping.
	workBegan time.Time

	// nowFn supplies the current time and reportFn receives the computed
	// saturation. Both are overwritten in tests.
	nowFn    func() time.Time
	reportFn func(float32)
}
// saturationMetricBucket holds the measurements taken during a single
// reporting period.
type saturationMetricBucket struct {
	// beginning is the time at which this bucket's period started.
	beginning time.Time

	// slept is how long the event processing loop spent sleeping (waiting for
	// work) rather than working.
	slept time.Duration

	// lost is time that can be attributed to neither sleeping nor working
	// because saturationMetric was used incorrectly (e.g. sleeping() or
	// working() was called multiple times in succession).
	lost time.Duration
}
// newSaturationMetric returns a saturationMetric that sets the gauge called
// name, at most once per reportInterval. keepPrev is the number of previous
// periods whose measurements are retained and folded into each report in order
// to smooth out spikes.
func newSaturationMetric(name []string, reportInterval time.Duration, keepPrev uint) *saturationMetric {
	// One slot for the period in progress plus keepPrev slots of history. Only
	// the first slot is started; the others keep a zero beginning until the
	// ring rotates onto them.
	buckets := make([]saturationMetricBucket, keepPrev+1)
	buckets[0].beginning = time.Now()

	return &saturationMetric{
		reportInterval: reportInterval,
		buckets:        buckets,
		nowFn:          time.Now,
		reportFn: func(sat float32) {
			metrics.SetGauge(name, sat)
		},
	}
}
// sleeping marks the moment at which the loop starts waiting for work. Apart
// from the very first call, every call to sleeping must be preceded by a call
// to working.
func (s *saturationMetric) sleeping() {
	now := s.nowFn()

	// A non-zero sleepBegan means working was not called since the previous
	// call to sleeping. Write that span off as lost rather than measuring
	// nonsense.
	if prev := s.sleepBegan; !prev.IsZero() {
		s.buckets[s.currentBucket].lost += now.Sub(prev)
	}

	s.sleepBegan, s.workBegan = now, time.Time{}
	s.report()
}
// working marks the moment at which the loop starts working. Every call to
// working must be preceded by a call to sleeping.
func (s *saturationMetric) working() {
	now := s.nowFn()
	cur := &s.buckets[s.currentBucket]

	switch {
	case !s.workBegan.IsZero():
		// working was called twice in succession. Write the span since the
		// previous call off as lost rather than measuring nonsense.
		cur.lost += now.Sub(s.workBegan)
	case s.sleepBegan.IsZero():
		// working was called before the initial call to sleeping. Write
		// everything since the start of the current period off as lost.
		cur.lost += now.Sub(cur.beginning)
	default:
		// Normal case: the loop has just finished sleeping.
		cur.slept += now.Sub(s.sleepBegan)
	}

	s.workBegan, s.sleepBegan = now, time.Time{}
	s.report()
}
// report publishes the saturation through reportFn and starts a new
// measurement period, but only once at least reportInterval has passed since
// the current period began; otherwise it does nothing.
//
// The reported value is the fraction of accountable time (the time elapsed
// since the oldest retained period began, minus lost time) that was not spent
// sleeping. It is rounded to two decimal places and always lies in [0, 1]; it
// is 0 when there is no accountable time at all.
func (s *saturationMetric) report() {
	now := s.nowFn()
	if now.Sub(s.buckets[s.currentBucket].beginning) < s.reportInterval {
		return
	}

	// Aggregate every bucket that has been started. Buckets with a zero
	// beginning have never been used and carry no measurements.
	var (
		oldest      time.Time
		slept, lost time.Duration
	)
	for i := range s.buckets {
		b := &s.buckets[i]
		if b.beginning.IsZero() {
			continue
		}
		if oldest.IsZero() || b.beginning.Before(oldest) {
			oldest = b.beginning
		}
		slept += b.slept
		lost += b.lost
	}

	total := now.Sub(oldest) - lost

	var saturation float64
	// total can be zero or even negative when (nearly) all of the window was
	// lost, because lost and slept are measured from timestamps taken slightly
	// before the bucket boundaries. Dividing by such a total would yield a
	// value outside [0, 1], so report 0 instead.
	if total > 0 {
		// For the same reason slept may marginally exceed total; clamp the
		// busy time so the gauge never goes negative or above 1.
		busy := total - slept
		if busy < 0 {
			busy = 0
		} else if busy > total {
			busy = total
		}
		saturation = float64(busy) / float64(total)
		saturation = math.Round(saturation*100) / 100
	}
	s.reportFn(float32(saturation))

	// Advance to the next slot of the ring, overwriting the oldest period.
	s.currentBucket = (s.currentBucket + 1) % len(s.buckets)
	next := &s.buckets[s.currentBucket]
	next.slept = 0
	next.lost = 0
	next.beginning = now
}