-
Notifications
You must be signed in to change notification settings - Fork 11
/
stream.go
143 lines (123 loc) · 3.56 KB
/
stream.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
package gtrs
import (
"context"
"strconv"
"time"
"github.com/dranikpg/gtrs/gtrsconvert"
"github.com/redis/go-redis/v9"
)
var NoExpiration = time.Duration(0)
var NoMaxLen = int64(0)
// now is defined here so it can be overridden in unit tests
var now = time.Now
// Stream represents a redis stream with messages of type T.
type Stream[T any] struct {
client redis.Cmdable
stream string
ttl time.Duration
maxLen int64
approx bool
}
type Options struct {
// TTL is an optional parameter to specify how long entries stay in the stream before expiring,
// it only only works as expected when a non-custom id is used to Add a message.
// The default is No Expiration.
// Note that TTL is performed when messages are Added, so Range requests won't clean up old messages.
TTL time.Duration
// MaxLen is an optional parameter to specify the maximum length of the stream.
MaxLen int64
// Approx causes MaxLen and TTL to be approximate instead of exact.
Approx bool
}
// NewStream create a new stream with messages of type T.
// Options are optional (the parameter can be nil to use defaults).
func NewStream[T any](client redis.Cmdable, stream string, opt *Options) Stream[T] {
var approx bool
maxLen := NoMaxLen
ttl := NoExpiration
if opt != nil {
ttl = opt.TTL
maxLen = opt.MaxLen
approx = opt.Approx
}
return Stream[T]{client: client, stream: stream, ttl: ttl, maxLen: maxLen, approx: approx}
}
// Key returns the redis stream key.
func (s Stream[T]) Key() string {
return s.stream
}
// Add a message to the stream. Calls XADD.
func (s Stream[T]) Add(ctx context.Context, v T, idarg ...string) (string, error) {
id := ""
if len(idarg) > 0 {
id = idarg[0]
}
var maxLen int64
if s.maxLen > NoMaxLen {
maxLen = s.maxLen
}
minID := ""
if s.ttl > NoExpiration {
minID = strconv.Itoa(int(now().Add(-s.ttl).UnixMilli()))
}
vals, err := gtrsconvert.StructToMap(v)
if err != nil {
return "", err
}
id, err = s.client.XAdd(ctx, &redis.XAddArgs{
Stream: s.stream,
Values: vals,
ID: id,
MinID: minID,
MaxLen: maxLen,
Approx: s.approx,
}).Result()
if err != nil {
err = ReadError{Err: err}
}
return id, err
}
// Range returns a portion of the stream. Calls XRANGE.
func (s Stream[T]) Range(ctx context.Context, from, to string, count ...int64) ([]Message[T], error) {
var redisSlice []redis.XMessage
var err error
if len(count) == 0 {
redisSlice, err = s.client.XRange(ctx, s.stream, from, to).Result()
} else {
redisSlice, err = s.client.XRangeN(ctx, s.stream, from, to, count[0]).Result()
}
if err != nil {
return nil, ReadError{Err: err}
}
msgs := make([]Message[T], len(redisSlice))
for i, msg := range redisSlice {
msgs[i] = toMessage[T](msg, s.stream)
}
return msgs, nil
}
// RevRange returns a portion of the stream in reverse order compared to Range. Calls XREVRANGE.
func (s Stream[T]) RevRange(ctx context.Context, from, to string, count ...int64) ([]Message[T], error) {
var redisSlice []redis.XMessage
var err error
if len(count) == 0 {
redisSlice, err = s.client.XRevRange(ctx, s.stream, from, to).Result()
} else {
redisSlice, err = s.client.XRevRangeN(ctx, s.stream, from, to, count[0]).Result()
}
if err != nil {
return nil, ReadError{Err: err}
}
msgs := make([]Message[T], len(redisSlice))
for i, msg := range redisSlice {
msgs[i] = toMessage[T](msg, s.stream)
}
return msgs, nil
}
// Len returns the current stream length. Calls XLEN.
func (s Stream[T]) Len(ctx context.Context) (int64, error) {
len, err := s.client.XLen(ctx, s.stream).Result()
if err != nil {
err = ReadError{Err: err}
}
return len, err
}