package cmp

import (
	"context"
	"fmt"
	"os"
	"sort"
	"time"

	"github.com/prometheus/client_golang/api"
	prometheusAPI "github.com/prometheus/client_golang/api/prometheus/v1"
	"github.com/prometheus/common/model"
	"github.com/spf13/cobra"
	flag "github.com/spf13/pflag"
	"github.com/spf13/viper"

	"github.com/oasislabs/oasis-core/go/common/logging"
	nodeCommon "github.com/oasislabs/oasis-core/go/oasis-node/cmd/common"
	"github.com/oasislabs/oasis-core/go/oasis-node/cmd/common/metrics"
	"github.com/oasislabs/oasis-core/go/oasis-test-runner/cmd/common"
)

const (
	cfgMetrics                = "metrics"
	cfgMetricsP               = "m"
	cfgMetricsTargetGitBranch = "metrics.target.git_branch"
	cfgMetricsSourceGitBranch = "metrics.source.git_branch"
	cfgMetricsNetDevice       = "metrics.net.device"
)
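
// An invocation might look roughly like this (flag values illustrative):
//
//	oasis-test-runner cmp \
//	    --metrics time,mem \
//	    --metrics.source.git_branch feature-branch \
//	    --metrics.target.git_branch master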

var (
	cmpCmd = &cobra.Command{
		Use:   "cmp",
		Short: "compare benchmark results of the last two batches",
		Long: `cmp connects to Prometheus, fetches the results of the benchmark batches, and
compares them. By default, the most recent batch (source) is fetched and
compared to the one before it (target). If --metrics.{target|source}.git_branch
is provided, it compares the most recent batches on the corresponding branches.
cmp compares all metrics provided via the --metrics parameter and computes the
source/target ratio of the metric values. If any metric exceeds
max_threshold.<metric>.{avg|max}_ratio or doesn't reach
min_threshold.<metric>.{avg|max}_ratio, cmp exits with error code 1.`,
		Run: runCmp,
	}

	allMetrics = map[string]*Metric{
		"time": &Metric{
			getter:               getDuration,
			maxThresholdAvgRatio: 1.1,
			maxThresholdMaxRatio: 1.1,
		},
		"du": &Metric{
			getter:               getDiskUsage,
			maxThresholdAvgRatio: 1.06,
			maxThresholdMaxRatio: 1.15,
		},
		"io": &Metric{
			getter:               getIOWork,
			maxThresholdAvgRatio: 1.2,
			maxThresholdMaxRatio: 1.2,
		},
		"mem": &Metric{
			getter:               getRssAnonMemory,
			maxThresholdAvgRatio: 1.1,
			maxThresholdMaxRatio: 1.1,
		},
		"cpu": &Metric{
			getter:               getCPUTime,
			maxThresholdAvgRatio: 1.05,
			maxThresholdMaxRatio: 1.05,
		},
		"net": &Metric{
			getter: getNetwork,
			// Network stats are also affected by other processes and therefore vary more.
			maxThresholdAvgRatio: 1.3,
			maxThresholdMaxRatio: 1.3,
		},
	}
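
	// The default thresholds above can be overridden on the command line via the
	// {max|min}_threshold.<metric>.{avg|max}_ratio flags registered in Register().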

	userMetrics []string

	client    api.Client
	cmpLogger *logging.Logger
)

// Metric describes how to fetch a specific Prometheus metric and the thresholds its values are tested against.
//
// There is one instance of this struct per metric; the same thresholds apply to every test.
type Metric struct {
	// getter fetches the given coarse time series with finer granularity and returns the average and maximum values
	// of all runs in the same batch.
	getter func(context.Context, string, *model.SampleStream) (float64, float64, error)

	// maxThresholdAvgRatio is the maximum allowed ratio between the average values of the source and target batches.
	maxThresholdAvgRatio float64

	// maxThresholdMaxRatio is the maximum allowed ratio between the maximum values of the source and target batches.
	maxThresholdMaxRatio float64

	// minThresholdAvgRatio is the minimum required ratio between the average values of the source and target batches.
	minThresholdAvgRatio float64

	// minThresholdMaxRatio is the minimum required ratio between the maximum values of the source and target batches.
	minThresholdMaxRatio float64
}
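
// NOTE: A threshold of zero disables the corresponding check in fetchAndCompare.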

// getDuration returns the average and maximum running times of the given coarse benchmark instance ("oasis_up"
// metric with minute resolution time series).
func getDuration(ctx context.Context, test string, bi *model.SampleStream) (float64, float64, error) {
	instance := string(bi.Metric[metrics.MetricsLabelInstance])

	// Re-fetch the given benchmark instance with second resolution. Each obtained time series corresponds to one run.
	v1api := prometheusAPI.NewAPI(client)
	r := prometheusAPI.Range{
		Start: bi.Values[0].Timestamp.Time().Add(-1 * time.Minute),
		End:   bi.Values[len(bi.Values)-1].Timestamp.Time().Add(time.Minute),
		Step:  time.Second,
	}
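
	// The rendered query looks roughly like this (labels illustrative):
	//   oasis_up {instance="oasis-test-runner-..."} == 1.0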
	query := fmt.Sprintf("%s %s == 1.0", metrics.MetricUp, bi.Metric.String())
	result, warnings, err := v1api.QueryRange(ctx, query, r)
	if err != nil {
		nodeCommon.EarlyLogAndExit(fmt.Errorf("error querying Prometheus: %w", err))
	}
	if len(warnings) > 0 {
		cmpLogger.Warn("warnings while querying Prometheus", "warnings", warnings)
	}
	if len(result.(model.Matrix)) == 0 {
		return 0, 0, fmt.Errorf("getDuration: no time series matched test: %s and instance: %s", test, instance)
	}

	// Compute the average and maximum duration of runs. Since we have second resolution, each point denotes one
	// second of the run's uptime. Count all points and divide them by the number of runs.
	avgDuration := 0.0
	maxDuration := 0.0
	for _, s := range result.(model.Matrix) {
		avgDuration += float64(len(s.Values))
		if maxDuration < float64(len(s.Values)) {
			maxDuration = float64(len(s.Values))
		}
	}
	avgDuration /= float64(len(result.(model.Matrix)))

	return avgDuration, maxDuration, nil
}

// getIOWork returns the average and maximum sum of read and written bytes by all workers of the given coarse
// benchmark instance ("oasis_up" metric).
func getIOWork(ctx context.Context, test string, bi *model.SampleStream) (float64, float64, error) {
	readAvg, readMax, err := getSummableMetric(ctx, metrics.MetricDiskReadBytes, test, bi)
	if err != nil {
		return 0, 0, err
	}
	writtenAvg, writtenMax, err := getSummableMetric(ctx, metrics.MetricDiskWrittenBytes, test, bi)
	if err != nil {
		return 0, 0, err
	}

	return readAvg + writtenAvg, readMax + writtenMax, nil
}

// getDiskUsage returns the average and maximum sum of disk usage for all workers of the given coarse benchmark
// instance ("oasis_up" metric).
func getDiskUsage(ctx context.Context, test string, bi *model.SampleStream) (float64, float64, error) {
	return getSummableMetric(ctx, metrics.MetricDiskUsageBytes, test, bi)
}

// getRssAnonMemory returns the average and maximum sum of anonymous resident memory for all workers of the given
// coarse benchmark instance ("oasis_up" metric).
func getRssAnonMemory(ctx context.Context, test string, bi *model.SampleStream) (float64, float64, error) {
	return getSummableMetric(ctx, metrics.MetricMemRssAnonBytes, test, bi)
}

// getCPUTime returns the average and maximum sum of utime and stime for all workers of the given coarse benchmark
// instance ("oasis_up" metric).
func getCPUTime(ctx context.Context, test string, bi *model.SampleStream) (float64, float64, error) {
	utimeAvg, utimeMax, err := getSummableMetric(ctx, metrics.MetricCPUUTimeSeconds, test, bi)
	if err != nil {
		return 0, 0, err
	}
	stimeAvg, stimeMax, err := getSummableMetric(ctx, metrics.MetricCPUSTimeSeconds, test, bi)
	if err != nil {
		return 0, 0, err
	}

	return utimeAvg + stimeAvg, utimeMax + stimeMax, nil
}

// getSummableMetric returns the average and maximum sum of the given metric for all workers of the given coarse
// benchmark instance ("oasis_up" metric).
func getSummableMetric(ctx context.Context, metric string, test string, bi *model.SampleStream) (float64, float64, error) {
	instance := string(bi.Metric[metrics.MetricsLabelInstance])

	labels := bi.Metric.Clone()
	// The existing job label denotes the "oasis-test-runner" worker only. We want to sum the metric across all
	// workers.
	delete(labels, "job")
	// We will average the metric over all runs.
	delete(labels, "run")

	v1api := prometheusAPI.NewAPI(client)
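
	// The rendered query looks roughly like this (metric name and labels illustrative):
	//   sum by (run) (<metric> {instance="oasis-test-runner-..."})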
	query := fmt.Sprintf("sum by (run) (%s %s)", metric, labels.String())

	// Fetch the value at the last recorded time. Some metrics might not be available anymore if Prometheus was shut
	// down. Add one additional minute to capture values reported within the last minute period.
	t := bi.Values[len(bi.Values)-1].Timestamp.Time().Add(time.Minute)
	result, warnings, err := v1api.Query(ctx, query, t)
	if err != nil {
		nodeCommon.EarlyLogAndExit(fmt.Errorf("error querying Prometheus: %w", err))
	}
	if len(warnings) > 0 {
		cmpLogger.Warn("warnings while querying Prometheus", "warnings", warnings)
	}
	if len(result.(model.Vector)) == 0 {
		return 0, 0, fmt.Errorf("getSummableMetric: no time series matched test: %s and instance: %s", test, instance)
	}

	// Compute the average and maximum values.
	avg := 0.0
	max := 0.0
	for _, s := range result.(model.Vector) {
		avg += float64(s.Value)
		if max < float64(s.Value) {
			max = float64(s.Value)
		}
	}
	avg /= float64(len(result.(model.Vector)))

	return avg, max, nil
}

// getNetwork returns the average and maximum amount of network traffic for all workers of the given coarse benchmark
// instance ("oasis_up" metric).
func getNetwork(ctx context.Context, test string, bi *model.SampleStream) (float64, float64, error) {
	instance := string(bi.Metric[metrics.MetricsLabelInstance])

	labels := bi.Metric.Clone()
	// Drop the job label to fetch traffic across all workers.
	delete(labels, "job")
	// We will average the metric over all runs.
	delete(labels, "run")
	// Only consider traffic on the configured network device (loopback by default).
	labels["device"] = model.LabelValue(viper.GetString(cfgMetricsNetDevice))

	v1api := prometheusAPI.NewAPI(client)
	r := prometheusAPI.Range{
		Start: bi.Values[0].Timestamp.Time().Add(-1 * time.Minute),
		End:   bi.Values[len(bi.Values)-1].Timestamp.Time().Add(time.Minute),
		Step:  time.Second,
	}

	// The stored values are cumulative traffic counters, so compute the difference between the last and the first
	// reading.
	bytesTotalAvg := map[string]float64{}
	bytesTotalMax := map[string]float64{}
	for _, rxtx := range []string{metrics.MetricNetReceiveBytesTotal, metrics.MetricNetTransmitBytesTotal} {
		query := fmt.Sprintf("(%s %s)", rxtx, labels.String())
		result, warnings, err := v1api.QueryRange(ctx, query, r)
		if err != nil {
			nodeCommon.EarlyLogAndExit(fmt.Errorf("error querying Prometheus: %w", err))
		}
		if len(warnings) > 0 {
			cmpLogger.Warn("warnings while querying Prometheus", "warnings", warnings)
		}
		if len(result.(model.Matrix)) == 0 {
			return 0, 0, fmt.Errorf("getNetwork: no time series matched test: %s and instance: %s", test, instance)
		}

		// Compute the average and maximum values.
		avg := 0.0
		max := 0.0
		for _, s := range result.(model.Matrix) {
			// Network traffic is the difference between the last and the first reading.
			avg += float64(s.Values[len(s.Values)-1].Value - s.Values[0].Value)
			if max < float64(s.Values[len(s.Values)-1].Value-s.Values[0].Value) {
				max = float64(s.Values[len(s.Values)-1].Value - s.Values[0].Value)
			}
		}
		avg /= float64(len(result.(model.Matrix)))
		bytesTotalAvg[rxtx] = avg
		bytesTotalMax[rxtx] = max
	}
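
	// Report the mean of the receive and transmit figures as a single traffic value.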
	return (bytesTotalAvg[metrics.MetricNetReceiveBytesTotal] + bytesTotalAvg[metrics.MetricNetTransmitBytesTotal]) / 2.0,
		(bytesTotalMax[metrics.MetricNetReceiveBytesTotal] + bytesTotalMax[metrics.MetricNetTransmitBytesTotal]) / 2.0,
		nil
}

// getCoarseBenchmarkInstances finds time series based on the "oasis_up" metric with minute resolution for the given
// test and labels, ordered from the oldest to the most recent.
//
// This function is called initially to determine the benchmark instances to compare. The metric-specific getters
// then fetch the time series with finer (second) granularity.
//
// NB: Due to a Prometheus limit, this function only fetches time series from the past 183 hours.
func getCoarseBenchmarkInstances(ctx context.Context, test string, labels map[string]string) (model.Matrix, error) {
	v1api := prometheusAPI.NewAPI(client)
	r := prometheusAPI.Range{
		// XXX: The hardcoded maximum resolution in Prometheus is 11,000 points, or ~183 hours at minute resolution.
		Start: time.Now().Add(-183 * time.Hour),
		End:   time.Now(),
		Step:  time.Minute,
	}

	ls := model.LabelSet{
		"job":                    metrics.MetricsJobTestRunner,
		metrics.MetricsLabelTest: model.LabelValue(test),
	}
	for k, v := range labels {
		ls[model.LabelName(k)] = model.LabelValue(v)
	}
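
	// The rendered query looks roughly like this (labels illustrative):
	//   max(oasis_up {job="oasis-test-runner", test="..."}) by (instance) == 1.0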
	query := fmt.Sprintf("max(%s %s) by (%s) == 1.0", metrics.MetricUp, ls.String(), metrics.MetricsLabelInstance)
	result, warnings, err := v1api.QueryRange(ctx, query, r)
	if err != nil {
		cmpLogger.Error("error querying Prometheus", "err", err)
		os.Exit(1)
	}
	if len(warnings) > 0 {
		cmpLogger.Warn("warnings while querying Prometheus", "warnings", warnings)
	}

	// Order the obtained time series by the timestamp of their first sample.
	sort.Slice(result.(model.Matrix), func(i, j int) bool {
		return result.(model.Matrix)[i].Values[0].Timestamp < result.(model.Matrix)[j].Values[0].Timestamp
	})

	return result.(model.Matrix), nil
}

// instanceNames extracts the instance names from the given Prometheus time series matrix.
func instanceNames(ts model.Matrix) []string {
	var names []string
	for _, t := range ts {
		names = append(names, instanceName(t))
	}
	return names
}

// instanceName returns the instance name label of the given sample stream.
func instanceName(s *model.SampleStream) string {
	return string(s.Metric[metrics.MetricsLabelInstance])
}

// fetchAndCompare fetches the given metric from Prometheus and compares the results.
//
// Returns false if the metric-specific ratios are exceeded or there is a problem obtaining the time series, true
// otherwise.
func fetchAndCompare(ctx context.Context, m string, test string, sInstance *model.SampleStream, tInstance *model.SampleStream) (succ bool) {
	getMetric := allMetrics[m].getter
	succ = true

	sAvg, sMax, err := getMetric(ctx, test, sInstance)
	if err != nil {
		cmpLogger.Error("error fetching source benchmark instance", "metric", m, "test", test, "instance", instanceName(sInstance), "err", err)
		return false
	}
	tAvg, tMax, err := getMetric(ctx, test, tInstance)
	if err != nil {
		cmpLogger.Error("error fetching target benchmark instance", "metric", m, "test", test, "instance", instanceName(tInstance), "err", err)
		return false
	}

	// Compare the average and maximum metric values and log an error if they exceed or don't reach the required
	// ratios.
	maxAvgRatio := allMetrics[m].maxThresholdAvgRatio
	maxMaxRatio := allMetrics[m].maxThresholdMaxRatio
	minAvgRatio := allMetrics[m].minThresholdAvgRatio
	minMaxRatio := allMetrics[m].minThresholdMaxRatio
	cmpLogger.Info("obtained average ratio", "metric", m, "test", test, "source_avg", sAvg, "target_avg", tAvg, "ratio", sAvg/tAvg)
	if maxAvgRatio != 0 && sAvg/tAvg > maxAvgRatio {
		cmpLogger.Error("average metric value exceeds max allowed ratio", "metric", m, "test", test, "source_avg", sAvg, "target_avg", tAvg, "ratio", sAvg/tAvg, "max_allowed_avg_ratio", maxAvgRatio)
		succ = false
	}
	if minAvgRatio != 0 && sAvg/tAvg < minAvgRatio {
		cmpLogger.Error("average metric value doesn't reach min required ratio", "metric", m, "test", test, "source_avg", sAvg, "target_avg", tAvg, "ratio", sAvg/tAvg, "min_required_avg_ratio", minAvgRatio)
		succ = false
	}
	cmpLogger.Info("obtained max ratio", "metric", m, "test", test, "source_max", sMax, "target_max", tMax, "ratio", sMax/tMax)
	if maxMaxRatio != 0 && sMax/tMax > maxMaxRatio {
		cmpLogger.Error("maximum metric value exceeds max allowed ratio", "metric", m, "test", test, "source_max", sMax, "target_max", tMax, "ratio", sMax/tMax, "max_allowed_max_ratio", maxMaxRatio)
		succ = false
	}
	if minMaxRatio != 0 && sMax/tMax < minMaxRatio {
		cmpLogger.Error("maximum metric value doesn't reach min required ratio", "metric", m, "test", test, "source_max", sMax, "target_max", tMax, "ratio", sMax/tMax, "min_required_max_ratio", minMaxRatio)
		succ = false
	}

	return
}

func initCmpLogger() error {
	var logFmt logging.Format
	if err := logFmt.Set(viper.GetString(common.CfgLogFmt)); err != nil {
		return fmt.Errorf("root: failed to set log format: %w", err)
	}

	var logLevel logging.Level
	if err := logLevel.Set(viper.GetString(common.CfgLogLevel)); err != nil {
		return fmt.Errorf("root: failed to set log level: %w", err)
	}

	if err := logging.Initialize(os.Stdout, logFmt, logLevel, nil); err != nil {
		return fmt.Errorf("root: failed to initialize logging: %w", err)
	}

	cmpLogger = logging.GetLogger("cmd/cmp")

	return nil
}

func runCmp(cmd *cobra.Command, args []string) {
	if err := initCmpLogger(); err != nil {
		fmt.Println(err)
		os.Exit(1)
	}

	var err error
	client, err = api.NewClient(api.Config{
		Address: viper.GetString(metrics.CfgMetricsAddr),
	})
	if err != nil {
		cmpLogger.Error("error creating client", "err", err)
		os.Exit(1)
	}
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	tests := viper.GetStringSlice(common.CfgTest)
	if len(tests) == 0 {
		for _, s := range common.GetDefaultScenarios() {
			tests = append(tests, s.Name())
		}
	}

	succ := true
	for _, test := range tests {
		sLabels, tLabels := map[string]string{}, map[string]string{}
		if viper.IsSet(cfgMetricsSourceGitBranch) {
			sLabels[metrics.MetricsLabelGitBranch] = viper.GetString(cfgMetricsSourceGitBranch)
		}
		if viper.IsSet(cfgMetricsTargetGitBranch) {
			tLabels[metrics.MetricsLabelGitBranch] = viper.GetString(cfgMetricsTargetGitBranch)
		}

		// Set other required Prometheus labels, if passed.
		// TODO: Integrate test parameters and parameter set combinations if multiple values are provided, like we do
		// in oasis-test-runner.
		for k, v := range viper.GetStringMapString(metrics.CfgMetricsLabels) {
			sLabels[k] = v
			tLabels[k] = v
		}

		sInstances, err := getCoarseBenchmarkInstances(ctx, test, sLabels)
		if err != nil {
			cmpLogger.Error("error querying for source test instances", "err", err)
			os.Exit(1)
		}
		sNames := instanceNames(sInstances)
		tInstances, err := getCoarseBenchmarkInstances(ctx, test, tLabels)
		if err != nil {
			cmpLogger.Error("error querying for target test instances", "err", err)
			os.Exit(1)
		}
		tNames := instanceNames(tInstances)

		if len(sNames) == 0 {
			cmpLogger.Info("test does not have any source benchmark instances to compare, ignoring", "test", test)
			continue
		}
		if len(tNames) == 0 {
			cmpLogger.Info("test does not have any target benchmark instances to compare, ignoring", "test", test)
			continue
		}

		var sInstance, tInstance *model.SampleStream
		if sNames[len(sNames)-1] != tNames[len(tNames)-1] {
			// The most recent benchmark instances differ, e.g. because different git branches were benchmarked.
			sInstance = sInstances[len(sInstances)-1]
			tInstance = tInstances[len(tInstances)-1]
		} else {
			// The most recent benchmark instances are equal, so pick the second most recent one from the target
			// instances.
			if len(tNames) < 2 {
				cmpLogger.Info("test has only one benchmark instance, ignoring", "test", test, "source_instances", sNames, "target_instances", tNames)
				continue
			}
			sInstance = sInstances[len(sInstances)-1]
			tInstance = tInstances[len(tInstances)-2]
		}
		cmpLogger.Info("obtained source and target instance", "test", test, "source_instance", instanceName(sInstance), "target_instance", instanceName(tInstance))

		for _, m := range userMetrics {
			// Don't fold this into a succ = succ && fetchAndCompare(...) one-liner: short-circuit evaluation would
			// skip fetchAndCompare once succ is false.
			fSucc := fetchAndCompare(ctx, m, test, sInstance, tInstance)
			succ = succ && fSucc
		}
	}

	if !succ {
		os.Exit(1)
	}
}

// Register registers the oasis-test-runner cmp sub-command and all of its children.
func Register(parentCmd *cobra.Command) {
	cmpFlags := flag.NewFlagSet("", flag.ContinueOnError)
	var metricNames []string
	for k := range allMetrics {
		metricNames = append(metricNames, k)
		cmpFlags.Float64Var(&allMetrics[k].maxThresholdAvgRatio, fmt.Sprintf("max_threshold.%s.avg_ratio", k), allMetrics[k].maxThresholdAvgRatio, fmt.Sprintf("maximum allowed ratio between average %s metrics", k))
		cmpFlags.Float64Var(&allMetrics[k].maxThresholdMaxRatio, fmt.Sprintf("max_threshold.%s.max_ratio", k), allMetrics[k].maxThresholdMaxRatio, fmt.Sprintf("maximum allowed ratio between maximum %s metrics", k))
		cmpFlags.Float64Var(&allMetrics[k].minThresholdAvgRatio, fmt.Sprintf("min_threshold.%s.avg_ratio", k), allMetrics[k].minThresholdAvgRatio, fmt.Sprintf("minimum required ratio between average %s metrics", k))
		cmpFlags.Float64Var(&allMetrics[k].minThresholdMaxRatio, fmt.Sprintf("min_threshold.%s.max_ratio", k), allMetrics[k].minThresholdMaxRatio, fmt.Sprintf("minimum required ratio between maximum %s metrics", k))
	}
	cmpFlags.StringSliceVarP(&userMetrics, cfgMetrics, cfgMetricsP, metricNames, "metrics to compare")
	cmpFlags.String(cfgMetricsSourceGitBranch, "", "(optional) git_branch label for the source benchmark instance")
	cmpFlags.String(cfgMetricsTargetGitBranch, "", "(optional) git_branch label for the target benchmark instance")
	cmpFlags.String(cfgMetricsNetDevice, "lo", "network device traffic to compare")
	_ = viper.BindPFlags(cmpFlags)
	cmpCmd.Flags().AddFlagSet(cmpFlags)

	parentCmd.AddCommand(cmpCmd)
}
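
// A caller might wire this up roughly as follows (root command name illustrative):
//
//	rootCmd := &cobra.Command{Use: "oasis-test-runner"}
//	cmp.Register(rootCmd)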