Skip to content

Commit

Permalink
feat: add support for latency command parsing (#614)
Browse files Browse the repository at this point in the history
Add support for LATENCY LATEST, LATEST HISTORY command parsing.
  • Loading branch information
dmitri-lerko committed Jul 6, 2022
1 parent f1e923c commit d685447
Show file tree
Hide file tree
Showing 3 changed files with 201 additions and 0 deletions.
24 changes: 24 additions & 0 deletions redis/redis.go
Expand Up @@ -187,3 +187,27 @@ type SlowLog struct {
// ClientName is the name set via the CLIENT SETNAME command (4.0 only).
ClientName string
}

// Latency represents a redis LATENCY LATEST.
type Latency struct {
// Name of the latest latency spike event.
Name string

// Time of the latest latency spike for the event.
Time time.Time

// Latest is the latest recorded latency for the named event.
Latest time.Duration

// Max is the maximum latency for the named event.
Max time.Duration
}

// LatencyHistory represents a redis LATENCY HISTORY.
type LatencyHistory struct {
// Time is the unix timestamp at which the event was processed.
Time time.Time

// ExecutationTime is the amount of time needed for the command execution.
ExecutionTime time.Duration
}
88 changes: 88 additions & 0 deletions redis/reply.go
Expand Up @@ -645,3 +645,91 @@ func SlowLogs(result interface{}, err error) ([]SlowLog, error) {
}
return logs, nil
}

// Latencies is a helper that parses the LATENCY LATEST command output and
// return the slice of Latency values.
func Latencies(result interface{}, err error) ([]Latency, error) {
rawLatencies, err := Values(result, err)
if err != nil {
return nil, err
}

latencies := make([]Latency, len(rawLatencies))
for i, e := range rawLatencies {
rawLatency, ok := e.([]interface{})
if !ok {
return nil, fmt.Errorf("redigo: latencies element is not slice, got %T", e)
}

var event Latency
if len(rawLatency) != 4 {
return nil, fmt.Errorf("redigo: latencies element has %d elements, expected 4", len(rawLatency))
}

event.Name, err = String(rawLatency[0], nil)
if err != nil {
return nil, fmt.Errorf("redigo: latencies element[0] is not a string: %w", err)
}

timestamp, ok := rawLatency[1].(int64)
if !ok {
return nil, fmt.Errorf("redigo: latencies element[1] not an int64, got %T", rawLatency[1])
}

event.Time = time.Unix(timestamp, 0)

latestDuration, ok := rawLatency[2].(int64)
if !ok {
return nil, fmt.Errorf("redigo: latencies element[2] not an int64, got %T", rawLatency[2])
}

event.Latest = time.Duration(latestDuration) * time.Millisecond

maxDuration, ok := rawLatency[3].(int64)
if !ok {
return nil, fmt.Errorf("redigo: latencies element[3] not an int64, got %T", rawLatency[3])
}

event.Max = time.Duration(maxDuration) * time.Millisecond

latencies[i] = event
}

return latencies, nil
}

// LatencyHistories is a helper that parse the LATENCY HISTORY command output and
// returns a LatencyHistory slice.
func LatencyHistories(result interface{}, err error) ([]LatencyHistory, error) {
rawLogs, err := Values(result, err)
if err != nil {
return nil, err
}

latencyHistories := make([]LatencyHistory, len(rawLogs))
for i, e := range rawLogs {
rawLog, ok := e.([]interface{})
if !ok {
return nil, fmt.Errorf("redigo: latency history element is not an slice, got %T", e)
}

var event LatencyHistory
timestamp, ok := rawLog[0].(int64)
if !ok {
return nil, fmt.Errorf("redigo: latency history element[0] not an int64, got %T", rawLog[0])
}

event.Time = time.Unix(timestamp, 0)

duration, ok := rawLog[1].(int64)
if !ok {
return nil, fmt.Errorf("redigo: latency history element[1] not an int64, got %T", rawLog[1])
}

event.ExecutionTime = time.Duration(duration) * time.Millisecond

latencyHistories[i] = event
}

return latencyHistories, nil
}
89 changes: 89 additions & 0 deletions redis/reply_test.go
Expand Up @@ -16,6 +16,7 @@ package redis_test

import (
"fmt"
"github.com/stretchr/testify/require"
"math"
"reflect"
"strconv"
Expand Down Expand Up @@ -225,6 +226,94 @@ func TestSlowLog(t *testing.T) {
}
}

func TestLatency(t *testing.T) {
c, err := dial()
require.NoError(t, err)
defer c.Close()

resultStr, err := redis.Strings(c.Do("CONFIG", "GET", "latency-monitor-threshold"))
require.NoError(t, err)
// LATENCY commands were added in 2.8.13 so might not be supported.
if len(resultStr) == 0 {
t.Skip("Latency commands not supported")
}
latencyMonitorThresholdOldCfg, err := strconv.Atoi(resultStr[1])
require.NoError(t, err)
// Enable latency monitoring for events that take 1ms or longer.
result, err := c.Do("CONFIG", "SET", "latency-monitor-threshold", "1")
// reset the old configuration after test.
defer func() {
res, err := c.Do("CONFIG", "SET", "latency-monitor-threshold", latencyMonitorThresholdOldCfg)
require.NoError(t, err)
require.Equal(t, "OK", res)
}()

require.NoError(t, err)
require.Equal(t, "OK", result)

// Sleep for 1ms to register a slow event.
_, err = c.Do("DEBUG", "SLEEP", 0.001)
require.NoError(t, err)

result, err = c.Do("LATENCY", "LATEST")
require.NoError(t, err)

latestLatencies, err := redis.Latencies(result, err)
require.NoError(t, err)

require.Equal(t, 1, len(latestLatencies))

latencyEvent := latestLatencies[0]
expected := redis.Latency{
Name: "command",
Latest: time.Millisecond,
Max: time.Millisecond,
Time: latencyEvent.Time,
}
require.Equal(t, latencyEvent, expected)
}

func TestLatencyHistories(t *testing.T) {
c, err := dial()
require.NoError(t, err)
defer c.Close()

res, err := redis.Strings(c.Do("CONFIG", "GET", "latency-monitor-threshold"))
require.NoError(t, err)

// LATENCY commands were added in 2.8.13 so might not be supported.
if len(res) == 0 {
t.Skip("Latency commands not supported")
}
latencyMonitorThresholdOldCfg, err := strconv.Atoi(res[1])
require.NoError(t, err)

// Enable latency monitoring for events that take 1ms or longer
result, err := c.Do("CONFIG", "SET", "latency-monitor-threshold", "1")
// reset the old configuration after test.
defer func() {
res, err := c.Do("CONFIG", "SET", "latency-monitor-threshold", latencyMonitorThresholdOldCfg)
require.NoError(t, err)
require.Equal(t, "OK", res)
}()
require.NoError(t, err)
require.Equal(t, "OK", result)

// Sleep for 1ms to register a slow event
_, err = c.Do("DEBUG", "SLEEP", 0.001)
require.NoError(t, err)

result, err = c.Do("LATENCY", "HISTORY", "command")
require.NoError(t, err)

latencyHistory, err := redis.LatencyHistories(result, err)
require.NoError(t, err)

require.Len(t, latencyHistory, 1)
latencyEvent := latencyHistory[0]
require.Equal(t, time.Millisecond, latencyEvent.ExecutionTime)
}

// dial wraps DialDefaultServer() with a more suitable function name for examples.
func dial() (redis.Conn, error) {
return redis.DialDefaultServer()
Expand Down

0 comments on commit d685447

Please sign in to comment.