/
stream.rb
82 lines (70 loc) · 2.18 KB
/
stream.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
require 'forwardable'
require 'set'
require 'date'
require 'mock_redis/stream/id'
class MockRedis
  # In-memory model of a single stream: a Set of `[Id, fields]` entries plus
  # the ID of the most recently added entry.
  #
  # Entry order matters (`trim` keeps the trailing entries and `range` /
  # `read` return entries in stored order), so this class relies on Set
  # iterating in insertion order.
  class Stream
    include Enumerable
    extend Forwardable

    # Option names accepted by #range (compared after downcasing).
    RANGE_OPTIONS = ['count'].freeze

    attr_accessor :members

    def_delegators :members, :empty?

    def initialize
      @members = Set.new
      @last_id = nil
    end

    # @return [String] the ID of the last entry added, or '' when nothing
    #   has been added yet (`nil.to_s`).
    def last_id
      @last_id.to_s
    end

    # Appends an entry to the stream.
    #
    # NOTE(review): MockRedis::Stream::Id lives in mock_redis/stream/id and is
    # not visible here; passing `min:` presumably lets it resolve or reject IDs
    # that are not greater than the previous one - confirm against that class.
    #
    # @param id the requested entry ID, handed to MockRedis::Stream::Id.new
    # @param values [#map] field/value pairs; keys and values are stringified
    # @return [String] the ID under which the entry was stored
    # @raise [Redis::CommandError] when the resulting ID is '0-0'
    def add(id, values)
      new_id = MockRedis::Stream::Id.new(id, min: @last_id)
      if new_id.to_s == '0-0'
        raise Redis::CommandError,
              'ERR The ID specified in XADD is equal or smaller than ' \
              'the target stream top item'
        # TODO: Redis version 6.0.4, w redis 4.2.1 generates the following error message:
        # 'ERR The ID specified in XADD must be greater than 0-0'
      end
      fields = values.map { |k, v| [k.to_s, v.to_s] }.to_h

      # Only commit the new ID once validation and conversion have succeeded,
      # so a rejected add leaves the stream exactly as it was.
      @last_id = new_id
      members.add [new_id, fields]
      new_id.to_s
    end

    # Drops the oldest entries so that at most `count` remain.
    #
    # @param count [Integer] maximum number of entries to keep
    # @return [Integer] number of entries removed (0 when nothing was trimmed)
    # @raise [Redis::CommandError] when `count` is negative
    def trim(count)
      # A negative count used to slice from a positive index, keeping the
      # wrong entries and reporting more deletions than the stream held.
      raise Redis::CommandError, 'ERR The MAXLEN argument must be >= 0.' if count < 0

      deleted = @members.size - count
      return 0 unless deleted > 0

      # Array#last(0) is [], so no special case is needed for count == 0.
      @members = @members.to_a.last(count).to_set
      deleted
    end

    # Returns the entries whose ID lies between `start` and `finish`.
    #
    # @param start lower bound, handed to MockRedis::Stream::Id.new
    # @param finish upper bound; built with `sequence: Float::INFINITY`
    # @param reversed [Boolean] when truthy, results are returned in reverse order
    # @param opts_in [Array] flat option list; only 'count' (any case) is accepted
    # @return [Array<Array(String, Hash)>] `[id_string, fields]` pairs
    # @raise [Redis::CommandError] on an odd-length or unknown option list
    def range(start, finish, reversed, *opts_in)
      opts = options(opts_in, RANGE_OPTIONS)
      start_id = MockRedis::Stream::Id.new(start)
      finish_id = MockRedis::Stream::Id.new(finish, sequence: Float::INFINITY)

      items = members
              .select { |entry| (start_id <= entry.first) && (finish_id >= entry.first) }
              .map { |entry| format_entry(entry) }
      items.reverse! if reversed
      return items unless opts.key?('count')

      # Array#first raises ArgumentError for a negative size; clamp to zero so
      # a negative COUNT yields an empty result instead of crashing.
      items.first([opts['count'].to_i, 0].max)
    end

    # Returns every entry whose ID is strictly greater than `id`.
    #
    # @param id handed to MockRedis::Stream::Id.new
    # @return [Array<Array(String, Hash)>] `[id_string, fields]` pairs
    def read(id)
      stream_id = MockRedis::Stream::Id.new(id)
      members.select { |entry| stream_id < entry.first }.map { |entry| format_entry(entry) }
    end

    # Yields each stored `[Id, fields]` entry in stored order.
    #
    # @return [Enumerator] when called without a block
    def each(&block)
      return enum_for(:each) { members.size } unless block_given?

      members.each(&block)
    end

    private

    # Converts a stored `[Id, fields]` entry into its `[id_string, fields]` form.
    def format_entry(entry)
      [entry.first.to_s, entry.last]
    end

    # Parses a flat `[name, value, name, value, ...]` list into a Hash keyed by
    # the downcased option name.
    #
    # @param opts_in [Array] flat list of option names and values
    # @param permitted [Array<String>] accepted (lower-case) option names
    # @return [Hash{String => Object}]
    # @raise [Redis::CommandError] when the list has odd length or names an
    #   option outside `permitted`
    def options(opts_in, permitted)
      raise Redis::CommandError, 'ERR syntax error' unless opts_in.length.even?

      # to_s keeps a non-string name from raising NoMethodError on downcase;
      # it is then accepted or rejected by the permitted-name check below.
      opts_out = opts_in.each_slice(2).each_with_object({}) do |(name, value), parsed|
        parsed[name.to_s.downcase] = value
      end
      raise Redis::CommandError, 'ERR syntax error' unless (opts_out.keys - permitted).empty?

      opts_out
    end
  end
end