Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add connection times histogram #626

Merged
merged 7 commits into from Sep 19, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
12 changes: 6 additions & 6 deletions README.md
Expand Up @@ -52,13 +52,13 @@ You can install from source:
The [releases](https://github.com/fortio/fortio/releases) page has binaries for many OS/architecture combinations (see assets).

```shell
curl -L https://github.com/fortio/fortio/releases/download/v1.37.1/fortio-linux_amd64-1.37.1.tgz \
curl -L https://github.com/fortio/fortio/releases/download/v1.38.0/fortio-linux_amd64-1.38.0.tgz \
| sudo tar -C / -xvzpf -
# or the debian package
wget https://github.com/fortio/fortio/releases/download/v1.37.1/fortio_1.37.1_amd64.deb
dpkg -i fortio_1.37.1_amd64.deb
wget https://github.com/fortio/fortio/releases/download/v1.38.0/fortio_1.38.0_amd64.deb
dpkg -i fortio_1.38.0_amd64.deb
# or the rpm
rpm -i https://github.com/fortio/fortio/releases/download/v1.37.1/fortio-1.37.1-1.x86_64.rpm
rpm -i https://github.com/fortio/fortio/releases/download/v1.38.0/fortio-1.38.0-1.x86_64.rpm
# and more, see assets in release page
```

Expand All @@ -68,7 +68,7 @@ On a MacOS you can also install Fortio using [Homebrew](https://brew.sh/):
brew install fortio
```

On Windows, download https://github.com/fortio/fortio/releases/download/v1.37.1/fortio_win_1.37.1.zip and extract `fortio.exe` to any location, then using the Windows Command Prompt:
On Windows, download https://github.com/fortio/fortio/releases/download/v1.38.0/fortio_win_1.38.0.zip and extract `fortio.exe` to any location, then using the Windows Command Prompt:
```
fortio.exe server
```
Expand Down Expand Up @@ -116,7 +116,7 @@ Full list of command line flags (`fortio help`):
<details>
<!-- use release/updateFlags.sh to update this section -->
<pre>
Φορτίο 1.37.1 usage:
Φορτίο 1.38.0 usage:
fortio command [flags] target
where command is one of: load (load testing), server (starts ui, rest api,
http-echo, redirect, proxies, tcp-echo and grpc ping servers), tcp-echo (only
Expand Down
42 changes: 28 additions & 14 deletions fhttp/http_client.go
Expand Up @@ -45,9 +45,9 @@ type Fetcher interface {
Fetch() (int, []byte, int)
// Close() cleans up connections and state - must be paired with NewClient calls.
Close()
// GetIPAddress() returns the occurrence of ip address used by this client connection.
// and how many sockets have been used
GetIPAddress() (*stats.Occurrence, int)
// GetIPAddress() returns the occurrence of ip address used by this client connection,
// and the connection time histogram (which includes the count).
GetIPAddress() (*stats.Occurrence, *stats.Histogram)
}

const (
Expand Down Expand Up @@ -90,6 +90,9 @@ func (h *HTTPOptions) Init(url string) *HTTPOptions {
log.Warnf("Invalid timeout %v, setting to %v", h.HTTPReqTimeOut, HTTPReqTimeOutDefaultValue)
h.HTTPReqTimeOut = HTTPReqTimeOutDefaultValue
}
if h.Resolution <= 0 {
h.Resolution = 0.001
}
h.URLSchemeCheck()
return h
}
Expand Down Expand Up @@ -185,6 +188,10 @@ type HTTPOptions struct {
ConnReuseRange [2]int // range of max number of connection to reuse for each thread.
// When false, re-resolve the DNS name when the connection breaks.
NoResolveEachConn bool
// Optional Offset Duration; to offset the histogram of the Connection duration
Offset time.Duration
// Optional resolution divider for the Connection duration histogram. In seconds. Defaults to 0.001 or 1 millisecond.
Resolution float64
}

// ResetHeaders resets all the headers, including the User-Agent: one (and the Host: logical special header).
Expand Down Expand Up @@ -354,8 +361,8 @@ type Client struct {
bodyContainsUUID bool // if body contains the "{uuid}" pattern (lowercase)
logErrors bool
id int
socketCount int
ipAddrUsage *stats.Occurrence
connectStats *stats.Histogram
}

// Close cleans up any resources used by NewStdClient.
Expand Down Expand Up @@ -443,9 +450,9 @@ func (c *Client) Fetch() (int, []byte, int) {
return code, data, 0
}

// GetIPAddress get the ip address that DNS resolves to when using stdClient.
func (c *Client) GetIPAddress() (*stats.Occurrence, int) {
return c.ipAddrUsage, c.socketCount
// GetIPAddress get the ip address that DNS resolves to when using stdClient and connection stats.
func (c *Client) GetIPAddress() (*stats.Occurrence, *stats.Histogram) {
return c.ipAddrUsage, c.connectStats
}

// NewClient creates either a standard or fast client (depending on
Expand Down Expand Up @@ -483,6 +490,8 @@ func NewStdClient(o *HTTPOptions) (*Client, error) {
id: o.ID,
logErrors: o.LogErrors,
ipAddrUsage: stats.NewOccurrence(),
// Keep track of timing for connection (re)establishment.
connectStats: stats.NewHistogram(o.Offset.Seconds(), o.Resolution),
}

tr := http.Transport{
Expand All @@ -497,21 +506,20 @@ func NewStdClient(o *HTTPOptions) (*Client, error) {
addr = o.Resolve + addr[strings.LastIndex(addr, ":"):]
}
var conn net.Conn
now := time.Now()
conn, err = (&net.Dialer{
Timeout: o.HTTPReqTimeOut,
}).DialContext(ctx, network, addr)

client.connectStats.Record(time.Since(now).Seconds())
if conn != nil {
newRemoteAddress := conn.RemoteAddr().String()
// No change when it wasn't set before (first time) and when the value isn't actually changing either.
if req.RemoteAddr != "" && newRemoteAddress != req.RemoteAddr {
log.Infof("[%d] Standard client IP address changed from %s to %s", client.id, req.RemoteAddr, newRemoteAddress)
}
req.RemoteAddr = newRemoteAddress
client.socketCount++
client.ipAddrUsage.Record(req.RemoteAddr)
}

return conn, err
},
TLSHandshakeTimeout: o.HTTPReqTimeOut,
Expand Down Expand Up @@ -561,7 +569,7 @@ type FastClient struct {
req []byte
dest net.Addr
socket net.Conn
socketCount int
socketCount int // number of sockets attempts, same as the new connectStats.Count() + DNS errors if any.
size int
code int
errorCount int
Expand All @@ -588,11 +596,12 @@ type FastClient struct {
connReuseRange [2]int
connReuse int
reuseCount int
connectStats *stats.Histogram
}

// GetIPAddress get ip address that DNS resolved to when using fast client.
func (c *FastClient) GetIPAddress() (*stats.Occurrence, int) {
return c.ipAddrUsage, c.socketCount
// GetIPAddress get ip address that DNS resolved to when using fast client and connection stats.
func (c *FastClient) GetIPAddress() (*stats.Occurrence, *stats.Histogram) {
return c.ipAddrUsage, c.connectStats
}

// Close cleans up any resources used by FastClient.
Expand Down Expand Up @@ -657,6 +666,8 @@ func NewFastClient(o *HTTPOptions) (Fetcher, error) { //nolint:funlen
http10: o.HTTP10, halfClose: o.AllowHalfClose, logErrors: o.LogErrors, id: o.ID,
https: o.https, connReuseRange: o.ConnReuseRange, connReuse: connReuse,
resolve: o.Resolve, noResolveEachConn: o.NoResolveEachConn, ipAddrUsage: stats.NewOccurrence(),
// Keep track of timing for connection (re)establishment.
connectStats: stats.NewHistogram(o.Offset.Seconds(), o.Resolution),
}
if o.https {
bc.tlsConfig, err = o.TLSOptions.TLSClientConfig()
Expand Down Expand Up @@ -751,14 +762,17 @@ func (c *FastClient) connect() net.Conn {
}

d := &net.Dialer{Timeout: c.reqTimeout}
now := time.Now()
if c.https {
socket, err = tls.DialWithDialer(d, c.dest.Network(), c.dest.String(), c.tlsConfig)
c.connectStats.Record(time.Since(now).Seconds())
if err != nil {
log.Errf("[%d] Unable to TLS connect to %v : %v", c.id, c.dest, err)
return nil
}
} else {
socket, err = d.Dial(c.dest.Network(), c.dest.String())
c.connectStats.Record(time.Since(now).Seconds())
if err != nil {
log.Errf("[%d] Unable to connect to %v : %v", c.id, c.dest, err)
return nil
Expand Down
5 changes: 5 additions & 0 deletions fhttp/http_loglevel_test.go
Expand Up @@ -32,3 +32,8 @@ func TestDebugMode(t *testing.T) {
TestNoFirstChunkSizeInitially(t)
TestFetchAndOnBehalfOf(t)
}

func TesWarningMode(t *testing.T) {
log.SetLogLevel(log.Warning)
TestHTTPRunner(t)
}
30 changes: 24 additions & 6 deletions fhttp/httprunner.go
Expand Up @@ -45,8 +45,10 @@ type HTTPRunnerResults struct {
HTTPOptions
Sizes *stats.HistogramData
HeaderSizes *stats.HistogramData
Sockets []int
SocketCount int
Sockets []int64
SocketCount int64
// Connection Time stats
ConnectionStats *stats.HistogramData
// http code to abort the run on (-1 for connection or other socket error)
AbortOn int
aborter *periodic.Aborter
Expand Down Expand Up @@ -85,7 +87,7 @@ type HTTPRunnerOptions struct {

// RunHTTPTest runs an http test and returns the aggregated stats.
//
//nolint:funlen, gocognit, gocyclo
//nolint:funlen, gocognit, gocyclo, maintidx
func RunHTTPTest(o *HTTPRunnerOptions) (*HTTPRunnerResults, error) {
o.RunType = "HTTP"
warmupMode := "parallel"
Expand All @@ -100,6 +102,12 @@ func RunHTTPTest(o *HTTPRunnerOptions) (*HTTPRunnerResults, error) {
log.Infof("Starting http test for %s with %d threads at %.1f qps and %s warmup%s",
o.URL, o.NumThreads, o.QPS, warmupMode, connReuseMsg)
r := periodic.NewPeriodicRunner(&o.RunnerOptions)
if o.HTTPOptions.Resolution <= 0 {
// Set both connect histogram params when Resolution isn't set explicitly on the HTTP options
// (that way you can set the offet to 0 in connect and to something else for the call)
o.HTTPOptions.Resolution = r.Options().Resolution
o.HTTPOptions.Offset = r.Options().Offset
}
defer r.Options().Abort()
numThreads := r.Options().NumThreads // can change during run for c > 2 n
o.HTTPOptions.Init(o.URL)
Expand Down Expand Up @@ -188,17 +196,20 @@ func RunHTTPTest(o *HTTPRunnerOptions) (*HTTPRunnerResults, error) {
fm.Close()
_, _ = fmt.Fprintf(out, "Wrote profile data to %s.{cpu|mem}\n", o.Profiler)
}
// Connection stats, aggregated
connectionStats := stats.NewHistogram(o.HTTPOptions.Offset.Seconds(), o.HTTPOptions.Resolution)
// Numthreads may have reduced:
numThreads = total.RunnerResults.NumThreads
// But we also must cleanup all the created clients.
keys := []int{}
fmt.Fprintf(out, "# Socket and IP used for each connection:\n")
for i := 0; i < numThreads; i++ {
// Get the report on the IP address each thread use to send traffic
occurrence, currentSocketUsed := httpstate[i].client.GetIPAddress()
occurrence, connStats := httpstate[i].client.GetIPAddress()
currentSocketUsed := connStats.Count
httpstate[i].client.Close()
fmt.Fprintf(out, "[%d] %3d socket used, resolved to %s\n", i, currentSocketUsed, occurrence.PrintAndAggregate(total.IPCountMap))

fmt.Fprintf(out, "[%d] %3d socket used, resolved to %s ", i, currentSocketUsed, occurrence.PrintAndAggregate(total.IPCountMap))
connStats.Counter.Print(out, "connection timing")
total.SocketCount += currentSocketUsed
total.Sockets = append(total.Sockets, currentSocketUsed)
// Q: is there some copying each time stats[i] is used?
Expand All @@ -210,6 +221,13 @@ func RunHTTPTest(o *HTTPRunnerOptions) (*HTTPRunnerResults, error) {
}
total.sizes.Transfer(httpstate[i].sizes)
total.headerSizes.Transfer(httpstate[i].headerSizes)
connectionStats.Transfer(connStats)
}
total.ConnectionStats = connectionStats.Export().CalcPercentiles(o.Percentiles)
if log.Log(log.Info) {
total.ConnectionStats.Print(out, "Connection time histogram (s)")
} else if log.Log(log.Warning) {
connectionStats.Counter.Print(out, "Connection time (s)")
}

// Sort the ip address form largest to smallest based on its usage count
Expand Down
10 changes: 5 additions & 5 deletions fhttp/httprunner_test.go
Expand Up @@ -58,7 +58,7 @@ func TestHTTPRunner(t *testing.T) {
if totalReq != httpOk {
t.Errorf("Mismatch between requests %d and ok %v", totalReq, res.RetCodes)
}
if res.SocketCount != res.RunnerResults.NumThreads {
if res.SocketCount != int64(res.RunnerResults.NumThreads) {
t.Errorf("%d socket used, expected same as thread# %d", res.SocketCount, res.RunnerResults.NumThreads)
}
count := getIPUsageCount(res.IPCountMap)
Expand Down Expand Up @@ -126,7 +126,7 @@ func testHTTPNotLeaking(t *testing.T, opts *HTTPRunnerOptions) {
if ngAfter > ngBefore2+8 {
t.Errorf("Goroutines after test %d, expected it to stay near %d", ngAfter, ngBefore2)
}
if res.SocketCount != res.RunnerResults.NumThreads {
if res.SocketCount != int64(res.RunnerResults.NumThreads) {
t.Errorf("%d socket used, expected same as thread# %d", res.SocketCount, res.RunnerResults.NumThreads)
}
}
Expand Down Expand Up @@ -213,7 +213,7 @@ func testClosingAndSocketCount(t *testing.T, o *HTTPRunnerOptions) {
if totalReq != httpOk {
t.Errorf("Mismatch between requests %d and ok %v", totalReq, res.RetCodes)
}
if int64(res.SocketCount) != numReq {
if res.SocketCount != numReq {
t.Errorf("When closing, got %d while expected as many sockets as requests %d", res.SocketCount, numReq)
}
}
Expand Down Expand Up @@ -502,14 +502,14 @@ func TestConnectionReuseRange(t *testing.T) {
t.Error(err)
}

if res.SocketCount != (int)(expectedSocketReuse) {
if res.SocketCount != (int64)(expectedSocketReuse) {
t.Errorf("Expecting %f socket to be used, got %d", expectedSocketReuse, res.SocketCount)
}
}

// Test when connection reuse range min != max.
// The actual socket count should always be 2 as the connection reuse range varies between 5 and 9.
expectedSocketReuse := 2
expectedSocketReuse := int64(2)
opts.ConnReuseRange = [2]int{5, 9}
// Check a few times that despite the range and random 2-9 we still always get 2 connections
for i := 0; i < 5; i++ {
Expand Down
6 changes: 4 additions & 2 deletions periodic/periodic.go
Expand Up @@ -171,9 +171,11 @@ type RunnerOptions struct {
// Note that this actually maps to gorountines and not actual threads
// but threads seems like a more familiar name to use for non go users
// and in a benchmarking context
NumThreads int
NumThreads int
// List of percentiles to calculate.
Percentiles []float64
Resolution float64
// Divider to apply to duration data in seconds. Defaults to 0.001 or 1 millisecond.
Resolution float64
// Where to write the textual version of the results, defaults to stdout
Out io.Writer `json:"-"`
// Extra data to be copied back to the results (to be saved/JSON serialized)
Expand Down
2 changes: 1 addition & 1 deletion rapi/restHandler_test.go
Expand Up @@ -117,7 +117,7 @@ func TestHTTPRunnerRESTApi(t *testing.T) {
if totalReq != httpOk {
t.Errorf("Mismatch between requests %d and ok %v (%+v)", totalReq, res.RetCodes, res)
}
if res.SocketCount != res.RunnerResults.NumThreads {
if res.SocketCount != int64(res.RunnerResults.NumThreads) {
t.Errorf("%d socket used, expected same as thread# %d", res.SocketCount, res.RunnerResults.NumThreads)
}

Expand Down
2 changes: 1 addition & 1 deletion stats/stats.go
Expand Up @@ -196,7 +196,7 @@ type HistogramData struct {
Avg float64
StdDev float64
Data []Bucket
Percentiles []Percentile
Percentiles []Percentile `json:"Percentiles,omitempty"`
}

// NewHistogram creates a new histogram (sets up the buckets).
Expand Down