Merge pull request #40 from interchainio/feature/csv-stats
Add ability to output aggregate statistics to CSV file
thanethomson committed Jan 21, 2020
2 parents 7176e08 + 80f1f96 commit 870ff06
Showing 10 changed files with 274 additions and 22 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
@@ -1,5 +1,10 @@
# Changelog

## v0.7.0
* [\#39](https://github.com/interchainio/tm-load-test/issues/39) - Add basic
aggregate statistics output to CSV file.
* Added integration test for standalone execution happy path.

## v0.6.2
* [\#37](https://github.com/interchainio/tm-load-test/pull/37) - Fix average
transaction throughput rate in reporting/metrics.
31 changes: 31 additions & 0 deletions README.md
@@ -147,6 +147,37 @@ The following kinds of metrics are made available here:
* Standard Prometheus-provided metrics about the garbage collector in
`tm-load-test`

## Aggregate Statistics
As of `tm-load-test` v0.7.0, one can now write simple aggregate statistics to
a CSV file once testing completes by specifying the `--stats-output` flag:

```bash
# In standalone mode
tm-load-test -c 1 -T 10 -r 1000 -s 250 \
--broadcast-tx-method async \
--endpoints ws://tm-endpoint1.somewhere.com:26657/websocket,ws://tm-endpoint2.somewhere.com:26657/websocket \
--stats-output /path/to/save/stats.csv

# From the master in master/slave mode
tm-load-test \
master \
--expect-slaves 2 \
--bind localhost:26670 \
-c 1 -T 10 -r 1000 -s 250 \
--broadcast-tx-method async \
--endpoints ws://tm-endpoint1.somewhere.com:26657/websocket,ws://tm-endpoint2.somewhere.com:26657/websocket \
--stats-output /path/to/save/stats.csv
```

The output CSV file has the following format at present:

```csv
Parameter,Value,Units
total_time,10.002,seconds
total_txs,9000,count
avg_tx_rate,899.818398,transactions per second
```
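
For downstream tooling, this file can be consumed with Go's standard `encoding/csv` package. The following is a minimal, illustrative sketch (not part of this repository); it assumes the three-column `Parameter,Value,Units` layout shown above and a file written via `--stats-output stats.csv`:

```go
package main

import (
	"encoding/csv"
	"fmt"
	"log"
	"os"
	"strconv"
)

func main() {
	f, err := os.Open("stats.csv") // the path given to --stats-output
	if err != nil {
		log.Fatal(err)
	}
	defer f.Close()

	records, err := csv.NewReader(f).ReadAll()
	if err != nil {
		log.Fatal(err)
	}
	if len(records) < 2 {
		log.Fatal("expected a header row plus at least one data row")
	}

	// Map each parameter name to its numeric value, skipping the header row.
	values := make(map[string]float64)
	for _, record := range records[1:] {
		if len(record) < 3 {
			continue
		}
		v, err := strconv.ParseFloat(record[1], 64)
		if err != nil {
			log.Fatal(err)
		}
		values[record[0]] = v
	}

	fmt.Printf("%.0f txs in %.3fs (%.3f tx/s)\n",
		values["total_txs"], values["total_time"], values["avg_tx_rate"])
}
```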

## Development
To run the linter and the tests:

5 changes: 3 additions & 2 deletions pkg/loadtest/cli.go
@@ -12,7 +12,7 @@ import (
)

// CLIVersion must be manually updated as new versions are released.
const CLIVersion = "v0.6.2"
const CLIVersion = "v0.7.0"

// cliVersionCommitID must be set through linker settings. See
// https://stackoverflow.com/a/11355611/1156132 for details.
@@ -44,7 +44,7 @@ func buildCLI(cli *CLIConfig, logger logging.Logger) *cobra.Command {
os.Exit(1)
}

if err := executeLoadTest(cfg); err != nil {
if err := ExecuteStandalone(cfg); err != nil {
os.Exit(1)
}
},
@@ -63,6 +63,7 @@ func buildCLI(cli *CLIConfig, logger logging.Logger) *cobra.Command {
rootCmd.PersistentFlags().IntVar(&cfg.MaxEndpoints, "max-endpoints", 0, "The maximum number of endpoints to use for testing, where 0 means unlimited")
rootCmd.PersistentFlags().IntVar(&cfg.PeerConnectTimeout, "peer-connect-timeout", 600, "The number of seconds to wait for all required peers to connect if expect-peers > 0")
rootCmd.PersistentFlags().IntVar(&cfg.MinConnectivity, "min-peer-connectivity", 0, "The minimum number of peers to which each peer must be connected before starting the load test")
rootCmd.PersistentFlags().StringVar(&cfg.StatsOutputFile, "stats-output", "", "Where to store aggregate statistics (in CSV format) for the load test")
rootCmd.PersistentFlags().BoolVarP(&flagVerbose, "verbose", "v", false, "Increase output logging verbosity to DEBUG level")

var masterCfg MasterConfig
2 changes: 2 additions & 0 deletions pkg/loadtest/config.go
@@ -34,6 +34,8 @@ type Config struct {
MaxEndpoints int `json:"max_endpoints"` // The maximum number of endpoints to use for load testing. Set to 0 by default (no maximum).
MinConnectivity int `json:"min_connectivity"` // The minimum number of peers to which each peer must be connected before starting the load test. Set to 0 by default (no minimum).
PeerConnectTimeout int `json:"peer_connect_timeout"` // The maximum time to wait (in seconds) for all peers to connect, if ExpectPeers > 0.
StatsOutputFile string `json:"stats_output_file"` // Where to store the final aggregate statistics file (in CSV format).
NoTrapInterrupts bool `json:"no_trap_interrupts"` // Should we avoid trapping Ctrl+Break? Only relevant for standalone execution mode.
}

// MasterConfig is the configuration options specific to a master node.
142 changes: 138 additions & 4 deletions pkg/loadtest/integration_test.go
@@ -1,11 +1,15 @@
package loadtest_test

import (
"encoding/csv"
"fmt"
"io/ioutil"
"math"
"net"
"net/http"
"net/url"
"os"
"path"
"strconv"
"strings"
"testing"
@@ -18,6 +22,21 @@ import (

const totalTxsPerSlave = 50

type aggregateStats struct {
totalTime float64
totalTxs int
avgTxRate float64
}

func (s *aggregateStats) String() string {
return fmt.Sprintf(
"aggregateStats{totalTime: %.3f, totalTxs: %d, avgTxRate: %.3f}",
s.totalTime,
s.totalTxs,
s.avgTxRate,
)
}

func TestMasterSlaveHappyPath(t *testing.T) {
app := kvstore.NewKVStoreApplication()
node := rpctest.StartTendermint(app, rpctest.SuppressStdout, rpctest.RecreateConfig)
@@ -28,7 +47,14 @@ func TestMasterSlaveHappyPath(t *testing.T) {
t.Fatal(err)
}

cfg := testConfig()
tempDir, err := ioutil.TempDir("", "tmloadtest-masterslavehappypath")
if err != nil {
t.Fatal(err)
}
defer os.RemoveAll(tempDir)

expectedTotalTxs := totalTxsPerSlave * 2
cfg := testConfig(tempDir)
masterCfg := loadtest.MasterConfig{
BindAddr: fmt.Sprintf("localhost:%d", freePort),
ExpectSlaves: 2,
@@ -116,8 +142,8 @@ func TestMasterSlaveHappyPath(t *testing.T) {
if err != nil {
t.Fatal(err)
}
if txCount != (totalTxsPerSlave * 2) {
t.Fatalf("Expected %d transactions to have been recorded by the master, but got %d", totalTxsPerSlave, txCount)
if txCount != expectedTotalTxs {
t.Fatalf("Expected %d transactions to have been recorded by the master, but got %d", expectedTotalTxs, txCount)
}
}
}
@@ -127,6 +153,60 @@ func TestMasterSlaveHappyPath(t *testing.T) {
if !metricsTested {
t.Fatal("Expected to have tested Prometheus metrics, but did not")
}

// ensure the aggregate stats were generated and computed correctly
stats, err := parseStats(cfg.StatsOutputFile)
if err != nil {
t.Fatal("Failed to parse output stats", err)
}
t.Logf("Got aggregate statistics from CSV: %v", stats)
if stats.totalTxs != expectedTotalTxs {
t.Fatalf("Expected %d transactions to have been recorded in aggregate stats, but got %d", expectedTotalTxs, stats.totalTxs)
}
if !floatsEqualWithTolerance(stats.avgTxRate, float64(stats.totalTxs)/stats.totalTime, 0.1) {
t.Fatalf(
"Average transaction rate (%.3f) does not compute from total time (%.3f) and total transactions (%d)",
stats.avgTxRate,
stats.totalTime,
stats.totalTxs,
)
}
}

func TestStandaloneHappyPath(t *testing.T) {
app := kvstore.NewKVStoreApplication()
node := rpctest.StartTendermint(app, rpctest.SuppressStdout, rpctest.RecreateConfig)
defer rpctest.StopTendermint(node)

tempDir, err := ioutil.TempDir("", "tmloadtest-standalonehappypath")
if err != nil {
t.Fatal(err)
}
defer os.RemoveAll(tempDir)

expectedTotalTxs := totalTxsPerSlave
cfg := testConfig(tempDir)
if err := loadtest.ExecuteStandalone(cfg); err != nil {
t.Fatal(err)
}

// ensure the aggregate stats were generated and computed correctly
stats, err := parseStats(cfg.StatsOutputFile)
if err != nil {
t.Fatal("Failed to parse output stats", err)
}
t.Logf("Got aggregate statistics from CSV: %v", stats)
if stats.totalTxs != expectedTotalTxs {
t.Fatalf("Expected %d transactions to have been recorded in aggregate stats, but got %d", expectedTotalTxs, stats.totalTxs)
}
if !floatsEqualWithTolerance(stats.avgTxRate, float64(stats.totalTxs)/stats.totalTime, 0.1) {
t.Fatalf(
"Average transaction rate (%.3f) does not compute from total time (%.3f) and total transactions (%d)",
stats.avgTxRate,
stats.totalTime,
stats.totalTxs,
)
}
}

func getRPCAddress() string {
@@ -137,7 +217,7 @@ func getRPCAddress() string {
return fmt.Sprintf("ws://localhost:%s/websocket", listenURL.Port())
}

func testConfig() loadtest.Config {
func testConfig(tempDir string) loadtest.Config {
return loadtest.Config{
ClientFactory: "kvstore",
Connections: 1,
@@ -149,6 +229,8 @@ func testConfig(tempDir string) loadtest.Config {
BroadcastTxMethod: "async",
Endpoints: []string{getRPCAddress()},
EndpointSelectMethod: loadtest.SelectSuppliedEndpoints,
StatsOutputFile: path.Join(tempDir, "stats.csv"),
NoTrapInterrupts: true,
}
}

@@ -165,3 +247,55 @@ func getFreePort() (int, error) {
defer l.Close()
return l.Addr().(*net.TCPAddr).Port, nil
}

func parseStats(filename string) (*aggregateStats, error) {
f, err := os.Open(filename)
if err != nil {
return nil, err
}
defer f.Close()

reader := csv.NewReader(f)
records, err := reader.ReadAll()
if err != nil {
return nil, err
}

if len(records) < 3 {
return nil, fmt.Errorf("expected at least 3 records in aggregate stats CSV, but got %d", len(records))
}
stats := &aggregateStats{}
for _, record := range records {
if len(record) > 0 {
if len(record) < 3 {
return nil, fmt.Errorf("expected at least 3 columns for each non-empty row in aggregate stats CSV")
}
switch record[0] {
case "total_txs":
totalTxs, err := strconv.ParseInt(record[1], 10, 32)
if err != nil {
return nil, err
}
stats.totalTxs = int(totalTxs)

case "total_time":
stats.totalTime, err = strconv.ParseFloat(record[1], 64)
if err != nil {
return nil, err
}

case "avg_tx_rate":
stats.avgTxRate, err = strconv.ParseFloat(record[1], 64)
if err != nil {
return nil, err
}
}
}
}

return stats, nil
}

func floatsEqualWithTolerance(a, b, tolerance float64) bool {
return math.Abs(a-b) < tolerance
}
24 changes: 20 additions & 4 deletions pkg/loadtest/loadtest.go
@@ -6,7 +6,8 @@ import (
"github.com/interchainio/tm-load-test/internal/logging"
)

func executeLoadTest(cfg Config) error {
// ExecuteStandalone will run a standalone (non-master/slave) load test.
func ExecuteStandalone(cfg Config) error {
logger := logging.NewLogrusLogger("loadtest")

// if we need to wait for the network to stabilize first
@@ -35,14 +36,29 @@ func executeLoadTest(cfg Config) error {
logger.Info("Initiating load test")
tg.Start()

// we want to know if the user hits Ctrl+Break
cancelTrap := trapInterrupts(func() { tg.Cancel() }, logger)
defer close(cancelTrap)
var cancelTrap chan struct{}
if !cfg.NoTrapInterrupts {
// we want to know if the user hits Ctrl+Break
cancelTrap = trapInterrupts(func() { tg.Cancel() }, logger)
defer close(cancelTrap)
} else {
logger.Debug("Skipping trapping of interrupts (e.g. Ctrl+Break)")
}

if err := tg.Wait(); err != nil {
logger.Error("Failed to execute load test", "err", err)
return err
}

// if we need to write the final statistics
if len(cfg.StatsOutputFile) > 0 {
logger.Info("Writing aggregate statistics", "outputFile", cfg.StatsOutputFile)
if err := tg.WriteAggregateStats(cfg.StatsOutputFile); err != nil {
logger.Error("Failed to write aggregate statistics", "err", err)
return err
}
}

logger.Info("Load test complete!")
return nil
}
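
Because `ExecuteStandalone` is now exported, the same flow can be driven from Go code rather than the CLI. Below is a minimal sketch mirroring the integration test's configuration; the endpoint URL and output path are placeholders, and the remaining load parameters (duration, rate, transaction size) are omitted here and would need to be set for a real run:

```go
package main

import (
	"log"

	"github.com/interchainio/tm-load-test/pkg/loadtest"
)

func main() {
	cfg := loadtest.Config{
		ClientFactory:        "kvstore",
		Connections:          1,
		BroadcastTxMethod:    "async",
		Endpoints:            []string{"ws://localhost:26657/websocket"},
		EndpointSelectMethod: loadtest.SelectSuppliedEndpoints,
		StatsOutputFile:      "/tmp/stats.csv", // aggregate stats are written here on completion
		NoTrapInterrupts:     true,             // skip Ctrl+Break trapping, as in the integration test
	}
	if err := loadtest.ExecuteStandalone(cfg); err != nil {
		log.Fatal(err)
	}
}
```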
