From d6854479365f0307560fa28e18e2bd0634b05229 Mon Sep 17 00:00:00 2001 From: dmitri-lerko Date: Wed, 6 Jul 2022 12:30:13 +0100 Subject: [PATCH] feat: add support for latency command parsing (#614) Add support for LATENCY LATEST, LATEST HISTORY command parsing. --- redis/redis.go | 24 ++++++++++++ redis/reply.go | 88 ++++++++++++++++++++++++++++++++++++++++++++ redis/reply_test.go | 89 +++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 201 insertions(+) diff --git a/redis/redis.go b/redis/redis.go index faf574e0..e3a3968c 100644 --- a/redis/redis.go +++ b/redis/redis.go @@ -187,3 +187,27 @@ type SlowLog struct { // ClientName is the name set via the CLIENT SETNAME command (4.0 only). ClientName string } + +// Latency represents a redis LATENCY LATEST. +type Latency struct { + // Name of the latest latency spike event. + Name string + + // Time of the latest latency spike for the event. + Time time.Time + + // Latest is the latest recorded latency for the named event. + Latest time.Duration + + // Max is the maximum latency for the named event. + Max time.Duration +} + +// LatencyHistory represents a redis LATENCY HISTORY. +type LatencyHistory struct { + // Time is the unix timestamp at which the event was processed. + Time time.Time + + // ExecutationTime is the amount of time needed for the command execution. + ExecutionTime time.Duration +} diff --git a/redis/reply.go b/redis/reply.go index 7a2b3fef..aabf5989 100644 --- a/redis/reply.go +++ b/redis/reply.go @@ -645,3 +645,91 @@ func SlowLogs(result interface{}, err error) ([]SlowLog, error) { } return logs, nil } + +// Latencies is a helper that parses the LATENCY LATEST command output and +// return the slice of Latency values. +func Latencies(result interface{}, err error) ([]Latency, error) { + rawLatencies, err := Values(result, err) + if err != nil { + return nil, err + } + + latencies := make([]Latency, len(rawLatencies)) + for i, e := range rawLatencies { + rawLatency, ok := e.([]interface{}) + if !ok { + return nil, fmt.Errorf("redigo: latencies element is not slice, got %T", e) + } + + var event Latency + if len(rawLatency) != 4 { + return nil, fmt.Errorf("redigo: latencies element has %d elements, expected 4", len(rawLatency)) + } + + event.Name, err = String(rawLatency[0], nil) + if err != nil { + return nil, fmt.Errorf("redigo: latencies element[0] is not a string: %w", err) + } + + timestamp, ok := rawLatency[1].(int64) + if !ok { + return nil, fmt.Errorf("redigo: latencies element[1] not an int64, got %T", rawLatency[1]) + } + + event.Time = time.Unix(timestamp, 0) + + latestDuration, ok := rawLatency[2].(int64) + if !ok { + return nil, fmt.Errorf("redigo: latencies element[2] not an int64, got %T", rawLatency[2]) + } + + event.Latest = time.Duration(latestDuration) * time.Millisecond + + maxDuration, ok := rawLatency[3].(int64) + if !ok { + return nil, fmt.Errorf("redigo: latencies element[3] not an int64, got %T", rawLatency[3]) + } + + event.Max = time.Duration(maxDuration) * time.Millisecond + + latencies[i] = event + } + + return latencies, nil +} + +// LatencyHistories is a helper that parse the LATENCY HISTORY command output and +// returns a LatencyHistory slice. +func LatencyHistories(result interface{}, err error) ([]LatencyHistory, error) { + rawLogs, err := Values(result, err) + if err != nil { + return nil, err + } + + latencyHistories := make([]LatencyHistory, len(rawLogs)) + for i, e := range rawLogs { + rawLog, ok := e.([]interface{}) + if !ok { + return nil, fmt.Errorf("redigo: latency history element is not an slice, got %T", e) + } + + var event LatencyHistory + timestamp, ok := rawLog[0].(int64) + if !ok { + return nil, fmt.Errorf("redigo: latency history element[0] not an int64, got %T", rawLog[0]) + } + + event.Time = time.Unix(timestamp, 0) + + duration, ok := rawLog[1].(int64) + if !ok { + return nil, fmt.Errorf("redigo: latency history element[1] not an int64, got %T", rawLog[1]) + } + + event.ExecutionTime = time.Duration(duration) * time.Millisecond + + latencyHistories[i] = event + } + + return latencyHistories, nil +} diff --git a/redis/reply_test.go b/redis/reply_test.go index 8f43828c..7362a36c 100644 --- a/redis/reply_test.go +++ b/redis/reply_test.go @@ -16,6 +16,7 @@ package redis_test import ( "fmt" + "github.com/stretchr/testify/require" "math" "reflect" "strconv" @@ -225,6 +226,94 @@ func TestSlowLog(t *testing.T) { } } +func TestLatency(t *testing.T) { + c, err := dial() + require.NoError(t, err) + defer c.Close() + + resultStr, err := redis.Strings(c.Do("CONFIG", "GET", "latency-monitor-threshold")) + require.NoError(t, err) + // LATENCY commands were added in 2.8.13 so might not be supported. + if len(resultStr) == 0 { + t.Skip("Latency commands not supported") + } + latencyMonitorThresholdOldCfg, err := strconv.Atoi(resultStr[1]) + require.NoError(t, err) + // Enable latency monitoring for events that take 1ms or longer. + result, err := c.Do("CONFIG", "SET", "latency-monitor-threshold", "1") + // reset the old configuration after test. + defer func() { + res, err := c.Do("CONFIG", "SET", "latency-monitor-threshold", latencyMonitorThresholdOldCfg) + require.NoError(t, err) + require.Equal(t, "OK", res) + }() + + require.NoError(t, err) + require.Equal(t, "OK", result) + + // Sleep for 1ms to register a slow event. + _, err = c.Do("DEBUG", "SLEEP", 0.001) + require.NoError(t, err) + + result, err = c.Do("LATENCY", "LATEST") + require.NoError(t, err) + + latestLatencies, err := redis.Latencies(result, err) + require.NoError(t, err) + + require.Equal(t, 1, len(latestLatencies)) + + latencyEvent := latestLatencies[0] + expected := redis.Latency{ + Name: "command", + Latest: time.Millisecond, + Max: time.Millisecond, + Time: latencyEvent.Time, + } + require.Equal(t, latencyEvent, expected) +} + +func TestLatencyHistories(t *testing.T) { + c, err := dial() + require.NoError(t, err) + defer c.Close() + + res, err := redis.Strings(c.Do("CONFIG", "GET", "latency-monitor-threshold")) + require.NoError(t, err) + + // LATENCY commands were added in 2.8.13 so might not be supported. + if len(res) == 0 { + t.Skip("Latency commands not supported") + } + latencyMonitorThresholdOldCfg, err := strconv.Atoi(res[1]) + require.NoError(t, err) + + // Enable latency monitoring for events that take 1ms or longer + result, err := c.Do("CONFIG", "SET", "latency-monitor-threshold", "1") + // reset the old configuration after test. + defer func() { + res, err := c.Do("CONFIG", "SET", "latency-monitor-threshold", latencyMonitorThresholdOldCfg) + require.NoError(t, err) + require.Equal(t, "OK", res) + }() + require.NoError(t, err) + require.Equal(t, "OK", result) + + // Sleep for 1ms to register a slow event + _, err = c.Do("DEBUG", "SLEEP", 0.001) + require.NoError(t, err) + + result, err = c.Do("LATENCY", "HISTORY", "command") + require.NoError(t, err) + + latencyHistory, err := redis.LatencyHistories(result, err) + require.NoError(t, err) + + require.Len(t, latencyHistory, 1) + latencyEvent := latencyHistory[0] + require.Equal(t, time.Millisecond, latencyEvent.ExecutionTime) +} + // dial wraps DialDefaultServer() with a more suitable function name for examples. func dial() (redis.Conn, error) { return redis.DialDefaultServer()