Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support LATENCY LATEST, LATEST HISTORY command parsing #614

Merged
merged 15 commits into from Jul 6, 2022
2 changes: 1 addition & 1 deletion .github/workflows/golangci-lint.yml
Expand Up @@ -16,7 +16,7 @@ jobs:
uses: golangci/golangci-lint-action@v2
with:
# Required: the version of golangci-lint is required and must be specified without patch version: we always use the latest patch version.
version: v1.36
version: v1.46.2

# Optional: working directory, useful for monorepos
# working-directory: somedir
Expand Down
24 changes: 24 additions & 0 deletions redis/redis.go
Expand Up @@ -187,3 +187,27 @@ type SlowLog struct {
// ClientName is the name set via the CLIENT SETNAME command (4.0 only).
ClientName string
}

// Latency represents a redis LATENCY LATEST.
type Latency struct {
// Latency event type.
EventType string
dmitri-lerko marked this conversation as resolved.
Show resolved Hide resolved

// Time of the latest latency spike for the event.
Time time.Time

// Latest is the latest recorded latency for the named event.
Latest time.Duration

// Max is the maximum latency for the named event.
Max time.Duration
}

// LatencyHistory represents a redis LATENCY HISTORY.
type LatencyHistory struct {
// Time is the unix timestamp at which the event was processed.
Time time.Time

// ExecutationTime is the amount of time needed for the command execution.
ExecutionTime time.Duration
}
85 changes: 85 additions & 0 deletions redis/reply.go
Expand Up @@ -645,3 +645,88 @@ func SlowLogs(result interface{}, err error) ([]SlowLog, error) {
}
return logs, nil
}

// Latencies is a helper that parses the LATENCY LATEST command output and
// return the slice of Latency values.
func Latencies(result interface{}, err error) ([]Latency, error) {
rawLatencies, err := Values(result, err)
if err != nil {
return nil, err
}
latencies := make([]Latency, len(rawLatencies))
dmitri-lerko marked this conversation as resolved.
Show resolved Hide resolved
for i, e := range rawLatencies {
rawLatency, ok := e.([]interface{})
if !ok {
return nil, fmt.Errorf("redigo: rawLatency element is not an array, got %T", e)
stevenh marked this conversation as resolved.
Show resolved Hide resolved
}

var event Latency
if len(rawLatency) != 4 {
return nil, fmt.Errorf("redigo: LATENCY LATEST element has %d elements, expected 4", len(rawLatency))
stevenh marked this conversation as resolved.
Show resolved Hide resolved
}

event.EventType, err = String(rawLatency[0], nil)
if err != nil {
return nil, fmt.Errorf("redigo: LATENCY LATEST element[0] is not a string: %w", err)
stevenh marked this conversation as resolved.
Show resolved Hide resolved
}

timestamp, ok := rawLatency[1].(int64)
if !ok {
return nil, fmt.Errorf("redigo: LATENCY LATEST element[1] not an int64, got %T", rawLatency[1])
stevenh marked this conversation as resolved.
Show resolved Hide resolved
}

event.Time = time.Unix(timestamp, 0)

latestDuration, ok := rawLatency[2].(int64)
if !ok {
return nil, fmt.Errorf("redigo: LATENCY LATEST element[2] not an int64, got %T", rawLatency[2])
stevenh marked this conversation as resolved.
Show resolved Hide resolved
}

event.Latest = time.Duration(latestDuration) * time.Millisecond

maxDuration, ok := rawLatency[3].(int64)
if !ok {
return nil, fmt.Errorf("redigo: LATENCY LATEST element[3] not an int64, got %T", rawLatency[3])
stevenh marked this conversation as resolved.
Show resolved Hide resolved
}

event.Max = time.Duration(maxDuration) * time.Millisecond

latencies[i] = event
}
return latencies, nil
dmitri-lerko marked this conversation as resolved.
Show resolved Hide resolved
}

// LatencyHistories is a helper that parse the LATENCY HISTORY command output and
// return the array of LatencyHistory.
stevenh marked this conversation as resolved.
Show resolved Hide resolved
func LatencyHistories(result interface{}, err error) ([]LatencyHistory, error) {
rawLogs, err := Values(result, err)
if err != nil {
return nil, err
}
latencyHistories := make([]LatencyHistory, len(rawLogs))
dmitri-lerko marked this conversation as resolved.
Show resolved Hide resolved
for i, e := range rawLogs {
rawLog, ok := e.([]interface{})
if !ok {
return nil, fmt.Errorf("LATENCY HISTORY: latencyHistory element is not an array, got %T", e)
stevenh marked this conversation as resolved.
Show resolved Hide resolved
}

var event LatencyHistory

timestamp, ok := rawLog[0].(int64)
dmitri-lerko marked this conversation as resolved.
Show resolved Hide resolved
if !ok {
return nil, fmt.Errorf("redigo: LATENCY HISTORY element[0] not an int64, got %T", rawLog[0])
stevenh marked this conversation as resolved.
Show resolved Hide resolved
}

event.Time = time.Unix(timestamp, 0)

duration, ok := rawLog[1].(int64)
if !ok {
return nil, fmt.Errorf("redigo: LATENCY HISTORY element[1] not an int64, got %T", rawLog[1])
dmitri-lerko marked this conversation as resolved.
Show resolved Hide resolved
}

event.ExecutionTime = time.Duration(duration) * time.Millisecond

latencyHistories[i] = event
}
return latencyHistories, nil
dmitri-lerko marked this conversation as resolved.
Show resolved Hide resolved
}
76 changes: 76 additions & 0 deletions redis/reply_test.go
Expand Up @@ -16,6 +16,7 @@ package redis_test

import (
"fmt"
"github.com/stretchr/testify/require"
"math"
"reflect"
"strconv"
Expand Down Expand Up @@ -225,6 +226,81 @@ func TestSlowLog(t *testing.T) {
}
}

func TestLatency(t *testing.T) {
c, err := dial()
require.NoError(t, err, "TestLatency failed during dial.")
defer c.Close()

resultStr, err := redis.Strings(c.Do("CONFIG", "GET", "latency-monitor-threshold"))
require.NoError(t, err, "TestLatency failed during CONFIG GET latency-monitor-threshold.")
dmitri-lerko marked this conversation as resolved.
Show resolved Hide resolved
// LATENCY commands were added in 2.8.13 so might not be supported.
if len(resultStr) == 0 {
t.Skip("Latency commands not supported")
}
latencyMonitorThresholdOldCfg, err := strconv.Atoi(resultStr[1])
require.NoError(t, err, "TestLatency failed during strconv.Atoi.")
// Enable latency monitoring for events that take 1ms or longer.
result, err := c.Do("CONFIG", "SET", "latency-monitor-threshold", "1")
require.NoError(t, err)
require.Equal(t, "OK", result, "TestLatency failed during CONFIG SET")

// Sleep for 1ms to register a slow event.
_, err = c.Do("DEBUG", "SLEEP", 0.001)
require.NoError(t, err, "TestLatency failed during DEBUG SLEEP 0.001")
result, err = c.Do("LATENCY", "LATEST")
require.NoError(t, err, "TestLatency failed during LATENCY LATEST")
latestLatencies, err := redis.Latencies(result, err)
require.NoError(t, err, "TestLatency failed during redis.Latencies")
dmitri-lerko marked this conversation as resolved.
Show resolved Hide resolved

require.Equal(t, 1, len(latestLatencies))
latencyEvent := latestLatencies[0]
require.Equal(t, "command", latencyEvent.EventType)
require.Equal(t, time.Millisecond, latencyEvent.Latest)
require.Equal(t, time.Millisecond, latencyEvent.Max)
stevenh marked this conversation as resolved.
Show resolved Hide resolved

// reset the old configuration after test.
result, err = c.Do("CONFIG", "SET", "latency-monitor-threshold", latencyMonitorThresholdOldCfg)
require.NoError(t, err)
require.Equal(t, "OK", result, "TestLatency failed during CONFIG SET")
dmitri-lerko marked this conversation as resolved.
Show resolved Hide resolved
}

func TestLatencyHistories(t *testing.T) {
c, err := dial()
require.NoError(t, err, "TestLatencyHistories failed during dial")
defer c.Close()

resultStr, err := redis.Strings(c.Do("CONFIG", "GET", "latency-monitor-threshold"))
require.NoError(t, err, "TestLatencyHistories failed during CONFIG GET latency-monitor-threshold")
// LATENCY commands were added in 2.8.13 so might not be supported.
if len(resultStr) == 0 {
t.Skip("Latency commands not supported")
}
latencyMonitorThresholdOldCfg, err := strconv.Atoi(resultStr[1])
require.NoError(t, err, "TestLatencyHistories failed during strconv.Atoi")
// Enable latency monitoring for events that take 1ms or longer
result, err := c.Do("CONFIG", "SET", "latency-monitor-threshold", "1")
require.NoError(t, err)
require.Equal(t, "OK", result)

// Sleep for 1ms to register a slow event
_, err = c.Do("DEBUG", "SLEEP", 0.001)
require.NoError(t, err, "TestLatencyHistories failed during DEBUG SLEEP 0.001")

result, err = c.Do("LATENCY", "HISTORY", "command")
require.NoError(t, err, "TestLatencyHistories failed during LATENCY HISTORY command")
latencyHistory, err := redis.LatencyHistories(result, err)
require.NoError(t, err, "TestLatencyHistories failed during redis.LatencyHistories")

require.Len(t, latencyHistory, 1)
latencyEvent := latencyHistory[0]
require.Equal(t, time.Millisecond, latencyEvent.ExecutionTime)

// reset the old configuration after test
result, err = c.Do("CONFIG", "SET", "latency-monitor-threshold", latencyMonitorThresholdOldCfg)
require.NoError(t, err)
require.Equal(t, "OK", result, "TestLatencyHistories failed during CONFIG SET")
}

// dial wraps DialDefaultServer() with a more suitable function name for examples.
func dial() (redis.Conn, error) {
return redis.DialDefaultServer()
Expand Down
2 changes: 1 addition & 1 deletion redis/test_test.go
Expand Up @@ -39,7 +39,7 @@ var (

serverPath = flag.String("redis-server", "redis-server", "Path to redis server binary")
serverAddress = flag.String("redis-address", "127.0.0.1", "The address of the server")
serverBasePort = flag.Int("redis-port", 16379, "Beginning of port range for test servers")
serverBasePort = flag.Int("redis-port", 16112, "Beginning of port range for test servers")
dmitri-lerko marked this conversation as resolved.
Show resolved Hide resolved
serverLogName = flag.String("redis-log", "", "Write Redis server logs to `filename`")
serverLog = ioutil.Discard

Expand Down