// Copyright 2019 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package main

import (
	"context"
	"fmt"
	"strings"
	"time"

	"github.com/cockroachdb/cockroach/pkg/ts/tspb"
	"github.com/cockroachdb/cockroach/pkg/util/retry"
	"github.com/cockroachdb/cockroach/pkg/util/timeutil"
)

// tpccOlapQuery is a contrived query that seems to do serious damage to a
// cluster. The query itself is a hash join with a selective filter and a
// limited sort.
const tpccOlapQuery = `SELECT
    i_id, s_w_id, s_quantity, i_price
FROM
    stock JOIN item ON s_i_id = i_id
WHERE
    s_quantity < 100 AND i_price > 90
ORDER BY
    i_price DESC, s_quantity ASC
LIMIT
    100;`
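
// tpccOLAPSpec describes one variant of the overload/tpcc_olap test: the
// cluster shape (Nodes, CPUs), the TPC-C data size (Warehouses), and the
// number of concurrent querybench workers (Concurrency).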
type tpccOLAPSpec struct {
	Nodes       int
	CPUs        int
	Warehouses  int
	Concurrency int
}
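
// run loads s.Warehouses of TPC-C data, then drives tpccOlapQuery through
// querybench at high concurrency to push the cluster into overload, and
// finally verifies that node liveness held up during the run.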
func (s tpccOLAPSpec) run(ctx context.Context, t *test, c *cluster) {
	crdbNodes, workloadNode := setupTPCC(ctx, t, c, s.Warehouses, false /* zfs */, nil /* versions */)
	const queryFileName = "queries.sql"
	// querybench expects the entire query to be on a single line.
	queryLine := `"` + strings.Replace(tpccOlapQuery, "\n", " ", -1) + `"`
	c.Run(ctx, workloadNode, "echo", queryLine, "> "+queryFileName)
	t.Status("waiting")
	m := newMonitor(ctx, c, crdbNodes)
	rampDuration := 2 * time.Minute
	duration := 3 * time.Minute
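	// Drive the OLAP query against all CockroachDB nodes; the monitor fails
	// the test if any of those nodes dies while the workload is running.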
	m.Go(func(ctx context.Context) error {
		t.WorkerStatus("running querybench")
		cmd := fmt.Sprintf(
			"./workload run querybench --db tpcc"+
				" --tolerate-errors=t"+
				" --concurrency=%d"+
				" --query-file %s"+
				" --histograms="+perfArtifactsDir+"/stats.json"+
				" --ramp=%s --duration=%s {pgurl:1-%d}",
			s.Concurrency, queryFileName, rampDuration, duration, c.spec.NodeCount-1)
		c.Run(ctx, workloadNode, cmd)
		return nil
	})
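	// Block until the workload finishes; Wait also surfaces any node death
	// detected by the monitor as a test failure.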
	m.Wait()
	verifyNodeLiveness(ctx, c, t, duration)
}

// verifyNodeLiveness checks that node liveness did not fail more than
// maxFailures times across all of the nodes during the test run.
func verifyNodeLiveness(ctx context.Context, c *cluster, t *test, runDuration time.Duration) {
	const maxFailures = 10
	adminURLs := c.ExternalAdminUIAddr(ctx, c.Node(1))
	now := timeutil.Now()
	var response tspb.TimeSeriesQueryResponse
	// Retry because timeseries queries can fail if the underlying inter-node
	// connections are in a failed state, which can happen due to overload.
	// Now that the load has stopped, this should resolve itself soon.
	if err := retry.WithMaxAttempts(ctx, retry.Options{
		MaxBackoff: 500 * time.Millisecond,
	}, 3, func() (err error) {
		response, err = getMetrics(adminURLs[0], now.Add(-runDuration), now, []tsQuery{
			{
				name:      "cr.node.liveness.heartbeatfailures",
				queryType: total,
			},
		})
		return err
	}); err != nil {
		t.Fatalf("failed to fetch liveness metrics: %v", err)
	}
	if len(response.Results[0].Datapoints) <= 1 {
		t.Fatalf("not enough datapoints in timeseries query response: %+v", response)
	}
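	// liveness.heartbeatfailures is a cumulative counter, so the number of
	// failures during the run is the difference between the last and the
	// first datapoint in the queried window.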
	datapoints := response.Results[0].Datapoints
	finalCount := int(datapoints[len(datapoints)-1].Value)
	initialCount := int(datapoints[0].Value)
	if failures := finalCount - initialCount; failures > maxFailures {
		t.Fatalf("node liveness failed %d times, expected no more than %d",
			failures, maxFailures)
	} else {
		t.logger().Printf("node liveness failed %d times, which is no more than %d",
			failures, maxFailures)
	}
}
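
// registerTPCCOverloadSpec registers a single overload/tpcc_olap variant. The
// cluster gets one node more than s.Nodes so that the workload runs on a
// machine separate from the CockroachDB nodes.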
func registerTPCCOverloadSpec(r *testRegistry, s tpccOLAPSpec) {
	name := fmt.Sprintf("overload/tpcc_olap/nodes=%d/cpu=%d/w=%d/c=%d",
		s.Nodes, s.CPUs, s.Warehouses, s.Concurrency)
	r.Add(testSpec{
		Name:       name,
		Cluster:    makeClusterSpec(s.Nodes+1, cpu(s.CPUs)),
		Run:        s.run,
		MinVersion: "v19.2.0",
		Timeout:    20 * time.Minute,
	})
}
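
// registerOverload registers every overload test variant with the registry.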
func registerOverload(r *testRegistry) {
	specs := []tpccOLAPSpec{
		{
			CPUs:        8,
			Concurrency: 96,
			Nodes:       3,
			Warehouses:  50,
		},
	}
	for _, s := range specs {
		registerTPCCOverloadSpec(r, s)
	}
}