Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support LATENCY LATEST, LATEST HISTORY command parsing #614

Merged
merged 15 commits into from Jul 6, 2022
2 changes: 1 addition & 1 deletion .github/workflows/golangci-lint.yml
Expand Up @@ -16,7 +16,7 @@ jobs:
uses: golangci/golangci-lint-action@v2
with:
# Required: the version of golangci-lint is required and must be specified without patch version: we always use the latest patch version.
version: v1.36
version: v1.46.2

# Optional: working directory, useful for monorepos
# working-directory: somedir
Expand Down
24 changes: 24 additions & 0 deletions redis/redis.go
Expand Up @@ -187,3 +187,27 @@ type SlowLog struct {
// ClientName is the name set via the CLIENT SETNAME command (4.0 only).
ClientName string
}

// Latency represents a redis LATENCY LATEST.
type Latency struct {
// Name of the latest latency spike event.
Name string

// Time of the latest latency spike for the event.
Time time.Time

// Latest is the latest recorded latency for the named event.
Latest time.Duration

// Max is the maximum latency for the named event.
Max time.Duration
}

// LatencyHistory represents a redis LATENCY HISTORY.
type LatencyHistory struct {
// Time is the unix timestamp at which the event was processed.
Time time.Time

// ExecutationTime is the amount of time needed for the command execution.
ExecutionTime time.Duration
}
88 changes: 88 additions & 0 deletions redis/reply.go
Expand Up @@ -645,3 +645,91 @@ func SlowLogs(result interface{}, err error) ([]SlowLog, error) {
}
return logs, nil
}

// Latencies is a helper that parses the LATENCY LATEST command output and
// return the slice of Latency values.
func Latencies(result interface{}, err error) ([]Latency, error) {
rawLatencies, err := Values(result, err)
if err != nil {
return nil, err
}

latencies := make([]Latency, len(rawLatencies))
dmitri-lerko marked this conversation as resolved.
Show resolved Hide resolved
for i, e := range rawLatencies {
rawLatency, ok := e.([]interface{})
if !ok {
return nil, fmt.Errorf("redigo: rawLatency element is not an array, got %T", e)
stevenh marked this conversation as resolved.
Show resolved Hide resolved
}

var event Latency
if len(rawLatency) != 4 {
return nil, fmt.Errorf("redigo: LATENCY LATEST element has %d elements, expected 4", len(rawLatency))
stevenh marked this conversation as resolved.
Show resolved Hide resolved
}

event.Name, err = String(rawLatency[0], nil)
if err != nil {
return nil, fmt.Errorf("redigo: LATENCY LATEST element[0] is not a string: %w", err)
stevenh marked this conversation as resolved.
Show resolved Hide resolved
}

timestamp, ok := rawLatency[1].(int64)
if !ok {
return nil, fmt.Errorf("redigo: LATENCY LATEST element[1] not an int64, got %T", rawLatency[1])
stevenh marked this conversation as resolved.
Show resolved Hide resolved
}

event.Time = time.Unix(timestamp, 0)

latestDuration, ok := rawLatency[2].(int64)
if !ok {
return nil, fmt.Errorf("redigo: LATENCY LATEST element[2] not an int64, got %T", rawLatency[2])
stevenh marked this conversation as resolved.
Show resolved Hide resolved
}

event.Latest = time.Duration(latestDuration) * time.Millisecond

maxDuration, ok := rawLatency[3].(int64)
if !ok {
return nil, fmt.Errorf("redigo: LATENCY LATEST element[3] not an int64, got %T", rawLatency[3])
stevenh marked this conversation as resolved.
Show resolved Hide resolved
}

event.Max = time.Duration(maxDuration) * time.Millisecond

latencies[i] = event
}

return latencies, nil
dmitri-lerko marked this conversation as resolved.
Show resolved Hide resolved
}

// LatencyHistories is a helper that parse the LATENCY HISTORY command output and
// return the array of LatencyHistory.
stevenh marked this conversation as resolved.
Show resolved Hide resolved
func LatencyHistories(result interface{}, err error) ([]LatencyHistory, error) {
rawLogs, err := Values(result, err)
if err != nil {
return nil, err
}

latencyHistories := make([]LatencyHistory, len(rawLogs))
for i, e := range rawLogs {
rawLog, ok := e.([]interface{})
if !ok {
return nil, fmt.Errorf("LATENCY HISTORY: latencyHistory element is not an array, got %T", e)
stevenh marked this conversation as resolved.
Show resolved Hide resolved
}

var event LatencyHistory
timestamp, ok := rawLog[0].(int64)
if !ok {
return nil, fmt.Errorf("redigo: LATENCY HISTORY element[0] not an int64, got %T", rawLog[0])
stevenh marked this conversation as resolved.
Show resolved Hide resolved
}

event.Time = time.Unix(timestamp, 0)

duration, ok := rawLog[1].(int64)
if !ok {
return nil, fmt.Errorf("redigo: LATENCY HISTORY element[1] not an int64, got %T", rawLog[1])
dmitri-lerko marked this conversation as resolved.
Show resolved Hide resolved
}

event.ExecutionTime = time.Duration(duration) * time.Millisecond

latencyHistories[i] = event
}

return latencyHistories, nil
}
89 changes: 89 additions & 0 deletions redis/reply_test.go
Expand Up @@ -16,6 +16,7 @@ package redis_test

import (
"fmt"
"github.com/stretchr/testify/require"
"math"
"reflect"
"strconv"
Expand Down Expand Up @@ -225,6 +226,94 @@ func TestSlowLog(t *testing.T) {
}
}

func TestLatency(t *testing.T) {
c, err := dial()
require.NoError(t, err)
defer c.Close()

resultStr, err := redis.Strings(c.Do("CONFIG", "GET", "latency-monitor-threshold"))
require.NoError(t, err)
// LATENCY commands were added in 2.8.13 so might not be supported.
if len(resultStr) == 0 {
t.Skip("Latency commands not supported")
}
latencyMonitorThresholdOldCfg, err := strconv.Atoi(resultStr[1])
require.NoError(t, err)
// Enable latency monitoring for events that take 1ms or longer.
result, err := c.Do("CONFIG", "SET", "latency-monitor-threshold", "1")
// reset the old configuration after test.
defer func() {
res, err := c.Do("CONFIG", "SET", "latency-monitor-threshold", latencyMonitorThresholdOldCfg)
require.NoError(t, err)
require.Equal(t, "OK", res)
}()

require.NoError(t, err)
require.Equal(t, "OK", result)

// Sleep for 1ms to register a slow event.
_, err = c.Do("DEBUG", "SLEEP", 0.001)
require.NoError(t, err)

result, err = c.Do("LATENCY", "LATEST")
require.NoError(t, err)

latestLatencies, err := redis.Latencies(result, err)
require.NoError(t, err)

require.Equal(t, 1, len(latestLatencies))

latencyEvent := latestLatencies[0]
expected := redis.Latency{
Name: "command",
Latest: time.Millisecond,
Max: time.Millisecond,
Time: latencyEvent.Time,
}
require.Equal(t, latencyEvent, expected)
}

func TestLatencyHistories(t *testing.T) {
c, err := dial()
require.NoError(t, err)
defer c.Close()

res, err := redis.Strings(c.Do("CONFIG", "GET", "latency-monitor-threshold"))
require.NoError(t, err)

// LATENCY commands were added in 2.8.13 so might not be supported.
if len(res) == 0 {
t.Skip("Latency commands not supported")
}
latencyMonitorThresholdOldCfg, err := strconv.Atoi(res[1])
require.NoError(t, err)

// Enable latency monitoring for events that take 1ms or longer
result, err := c.Do("CONFIG", "SET", "latency-monitor-threshold", "1")
// reset the old configuration after test.
defer func() {
res, err := c.Do("CONFIG", "SET", "latency-monitor-threshold", latencyMonitorThresholdOldCfg)
require.NoError(t, err)
require.Equal(t, "OK", res)
}()
require.NoError(t, err)
require.Equal(t, "OK", result)

// Sleep for 1ms to register a slow event
_, err = c.Do("DEBUG", "SLEEP", 0.001)
require.NoError(t, err)

result, err = c.Do("LATENCY", "HISTORY", "command")
require.NoError(t, err)

latencyHistory, err := redis.LatencyHistories(result, err)
require.NoError(t, err)

require.Len(t, latencyHistory, 1)
latencyEvent := latencyHistory[0]
require.Equal(t, time.Millisecond, latencyEvent.ExecutionTime)
}

// dial wraps DialDefaultServer() with a more suitable function name for examples.
func dial() (redis.Conn, error) {
return redis.DialDefaultServer()
Expand Down