Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support LATENCY LATEST, LATEST HISTORY command parsing #614

Merged
merged 15 commits into from Jul 6, 2022
2 changes: 1 addition & 1 deletion .github/workflows/golangci-lint.yml
Expand Up @@ -16,7 +16,7 @@ jobs:
uses: golangci/golangci-lint-action@v2
with:
# Required: the version of golangci-lint is required and must be specified without patch version: we always use the latest patch version.
version: v1.36
version: v1.46.2

# Optional: working directory, useful for monorepos
# working-directory: somedir
Expand Down
24 changes: 24 additions & 0 deletions redis/redis.go
Expand Up @@ -187,3 +187,27 @@ type SlowLog struct {
// ClientName is the name set via the CLIENT SETNAME command (4.0 only).
ClientName string
}

// Latency represents a redis LATENCY LATEST.
type Latency struct {
// Name of the latest latency spike event.
Name string

// Time of the latest latency spike for the event.
Time time.Time

// Latest is the latest recorded latency for the named event.
Latest time.Duration

// Max is the maximum latency for the named event.
Max time.Duration
}

// LatencyHistory represents a redis LATENCY HISTORY.
type LatencyHistory struct {
// Time is the unix timestamp at which the event was processed.
Time time.Time

// ExecutationTime is the amount of time needed for the command execution.
ExecutionTime time.Duration
}
88 changes: 88 additions & 0 deletions redis/reply.go
Expand Up @@ -645,3 +645,91 @@ func SlowLogs(result interface{}, err error) ([]SlowLog, error) {
}
return logs, nil
}

// Latencies is a helper that parses the LATENCY LATEST command output and
// return the slice of Latency values.
func Latencies(result interface{}, err error) ([]Latency, error) {
rawLatencies, err := Values(result, err)
if err != nil {
return nil, err
}

latencies := make([]Latency, len(rawLatencies))
dmitri-lerko marked this conversation as resolved.
Show resolved Hide resolved
for i, e := range rawLatencies {
rawLatency, ok := e.([]interface{})
if !ok {
return nil, fmt.Errorf("redigo: latencies element is not slice, got %T", e)
}

var event Latency
if len(rawLatency) != 4 {
return nil, fmt.Errorf("redigo: latencies element has %d elements, expected 4", len(rawLatency))
}

event.Name, err = String(rawLatency[0], nil)
if err != nil {
return nil, fmt.Errorf("redigo: latencies element[0] is not a string: %w", err)
}

timestamp, ok := rawLatency[1].(int64)
if !ok {
return nil, fmt.Errorf("redigo: latencies element[1] not an int64, got %T", rawLatency[1])
}

event.Time = time.Unix(timestamp, 0)

latestDuration, ok := rawLatency[2].(int64)
if !ok {
return nil, fmt.Errorf("redigo: latencies element[2] not an int64, got %T", rawLatency[2])
}

event.Latest = time.Duration(latestDuration) * time.Millisecond

maxDuration, ok := rawLatency[3].(int64)
if !ok {
return nil, fmt.Errorf("redigo: latencies element[3] not an int64, got %T", rawLatency[3])
}

event.Max = time.Duration(maxDuration) * time.Millisecond

latencies[i] = event
}

return latencies, nil
dmitri-lerko marked this conversation as resolved.
Show resolved Hide resolved
}

// LatencyHistories is a helper that parse the LATENCY HISTORY command output and
// returns a LatencyHistory slice.
func LatencyHistories(result interface{}, err error) ([]LatencyHistory, error) {
rawLogs, err := Values(result, err)
if err != nil {
return nil, err
}

latencyHistories := make([]LatencyHistory, len(rawLogs))
for i, e := range rawLogs {
rawLog, ok := e.([]interface{})
if !ok {
return nil, fmt.Errorf("redigo: latency history element is not an slice, got %T", e)
}

var event LatencyHistory
timestamp, ok := rawLog[0].(int64)
if !ok {
return nil, fmt.Errorf("redigo: latency history element[0] not an int64, got %T", rawLog[0])
}

event.Time = time.Unix(timestamp, 0)

duration, ok := rawLog[1].(int64)
if !ok {
return nil, fmt.Errorf("redigo: latency history element[1] not an int64, got %T", rawLog[1])
}

event.ExecutionTime = time.Duration(duration) * time.Millisecond

latencyHistories[i] = event
}

return latencyHistories, nil
}
89 changes: 89 additions & 0 deletions redis/reply_test.go
Expand Up @@ -16,6 +16,7 @@ package redis_test

import (
"fmt"
"github.com/stretchr/testify/require"
"math"
"reflect"
"strconv"
Expand Down Expand Up @@ -225,6 +226,94 @@ func TestSlowLog(t *testing.T) {
}
}

func TestLatency(t *testing.T) {
c, err := dial()
require.NoError(t, err)
defer c.Close()

resultStr, err := redis.Strings(c.Do("CONFIG", "GET", "latency-monitor-threshold"))
require.NoError(t, err)
// LATENCY commands were added in 2.8.13 so might not be supported.
if len(resultStr) == 0 {
t.Skip("Latency commands not supported")
}
latencyMonitorThresholdOldCfg, err := strconv.Atoi(resultStr[1])
require.NoError(t, err)
// Enable latency monitoring for events that take 1ms or longer.
result, err := c.Do("CONFIG", "SET", "latency-monitor-threshold", "1")
// reset the old configuration after test.
defer func() {
res, err := c.Do("CONFIG", "SET", "latency-monitor-threshold", latencyMonitorThresholdOldCfg)
require.NoError(t, err)
require.Equal(t, "OK", res)
}()

require.NoError(t, err)
require.Equal(t, "OK", result)

// Sleep for 1ms to register a slow event.
_, err = c.Do("DEBUG", "SLEEP", 0.001)
require.NoError(t, err)

result, err = c.Do("LATENCY", "LATEST")
require.NoError(t, err)

latestLatencies, err := redis.Latencies(result, err)
require.NoError(t, err)

require.Equal(t, 1, len(latestLatencies))

latencyEvent := latestLatencies[0]
expected := redis.Latency{
Name: "command",
Latest: time.Millisecond,
Max: time.Millisecond,
Time: latencyEvent.Time,
}
require.Equal(t, latencyEvent, expected)
}

func TestLatencyHistories(t *testing.T) {
c, err := dial()
require.NoError(t, err)
defer c.Close()

res, err := redis.Strings(c.Do("CONFIG", "GET", "latency-monitor-threshold"))
require.NoError(t, err)

// LATENCY commands were added in 2.8.13 so might not be supported.
if len(res) == 0 {
t.Skip("Latency commands not supported")
}
latencyMonitorThresholdOldCfg, err := strconv.Atoi(res[1])
require.NoError(t, err)

// Enable latency monitoring for events that take 1ms or longer
result, err := c.Do("CONFIG", "SET", "latency-monitor-threshold", "1")
// reset the old configuration after test.
defer func() {
res, err := c.Do("CONFIG", "SET", "latency-monitor-threshold", latencyMonitorThresholdOldCfg)
require.NoError(t, err)
require.Equal(t, "OK", res)
}()
require.NoError(t, err)
require.Equal(t, "OK", result)

// Sleep for 1ms to register a slow event
_, err = c.Do("DEBUG", "SLEEP", 0.001)
require.NoError(t, err)

result, err = c.Do("LATENCY", "HISTORY", "command")
require.NoError(t, err)

latencyHistory, err := redis.LatencyHistories(result, err)
require.NoError(t, err)

require.Len(t, latencyHistory, 1)
latencyEvent := latencyHistory[0]
require.Equal(t, time.Millisecond, latencyEvent.ExecutionTime)
}

// dial wraps DialDefaultServer() with a more suitable function name for examples.
func dial() (redis.Conn, error) {
return redis.DialDefaultServer()
Expand Down