diff --git a/lib/mock_redis/stream.rb b/lib/mock_redis/stream.rb index 4579ef0a..05b3f269 100644 --- a/lib/mock_redis/stream.rb +++ b/lib/mock_redis/stream.rb @@ -23,14 +23,25 @@ def last_id def add(id, values) @last_id = MockRedis::Stream::Id.new(id, min: @last_id) + if @last_id.to_s == '0-0' + raise Redis::CommandError, + 'ERR The ID specified in XADD is equal or smaller than ' \ + 'the target stream top item' + # TOOD: Redis version 6.0.4, w redis 4.2.1 generates the following error message: + # 'ERR The ID specified in XADD must be greater than 0-0' + end members.add [@last_id, Hash[values.map { |k, v| [k.to_s, v.to_s] }]] @last_id.to_s end def trim(count) deleted = @members.size - count - @members = @members.to_a[-count..-1].to_set - deleted + if deleted > 0 + @members = @members.to_a[-count..-1].to_set + deleted + else + 0 + end end def range(start, finish, reversed, *opts_in) @@ -45,6 +56,11 @@ def range(start, finish, reversed, *opts_in) items end + def read(id) + stream_id = MockRedis::Stream::Id.new(id) + members.select { |m| (stream_id < m[0]) }.map { |m| [m[0].to_s, m[1]] } + end + def each members.each { |m| yield m } end diff --git a/lib/mock_redis/stream/id.rb b/lib/mock_redis/stream/id.rb index 864717de..c70ea2a1 100644 --- a/lib/mock_redis/stream/id.rb +++ b/lib/mock_redis/stream/id.rb @@ -31,13 +31,6 @@ def initialize(id, min: nil, sequence: 0) @timestamp = id end @sequence = @sequence.nil? ? sequence : @sequence.to_i - if @timestamp == 0 && @sequence == 0 - raise Redis::CommandError, - 'ERR The ID specified in XADD is equal or smaller than ' \ - 'the target stream top item' - # TOOD: Redis version 6.0.4, w redis 4.2.1 generates the following error message: - # 'ERR The ID specified in XADD must be greater than 0-0' - end if self <= min raise Redis::CommandError, 'ERR The ID specified in XADD is equal or smaller than ' \ diff --git a/lib/mock_redis/stream_methods.rb b/lib/mock_redis/stream_methods.rb index 2f4d2498..bb346a74 100644 --- a/lib/mock_redis/stream_methods.rb +++ b/lib/mock_redis/stream_methods.rb @@ -4,7 +4,6 @@ # TODO: Implement the following commands # -# * xread # * xgroup # * xreadgroup # * xack @@ -67,6 +66,20 @@ def xrevrange(key, last = '+', first = '-', count: nil) end end + # TODO: Implement count and block parameters + def xread(keys, ids) + result = {} + keys = keys.is_a?(Array) ? keys : [keys] + ids = ids.is_a?(Array) ? ids : [ids] + keys.each_with_index do |key, index| + with_stream_at(key) do |stream| + data = stream.read(ids[index]) + result[key] = data unless data.empty? + end + end + result + end + private def with_stream_at(key, &blk) diff --git a/spec/commands/xadd_spec.rb b/spec/commands/xadd_spec.rb index 901d1a62..14f2336e 100644 --- a/spec/commands/xadd_spec.rb +++ b/spec/commands/xadd_spec.rb @@ -101,4 +101,15 @@ ] ) end + + it 'supports a maxlen greater than the current size' do + @redises.xadd(@key, { key1: 'value1' }, id: '1234567891234-0') + @redises.xadd(@key, { key2: 'value2' }, id: '1234567891245-0', maxlen: 1000) + expect(@redises.xrange(@key, '-', '+')).to eq( + [ + ['1234567891234-0', { 'key1' => 'value1' }], + ['1234567891245-0', { 'key2' => 'value2' }], + ] + ) + end end diff --git a/spec/commands/xrange_spec.rb b/spec/commands/xrange_spec.rb index 4659a9d7..a949294e 100644 --- a/spec/commands/xrange_spec.rb +++ b/spec/commands/xrange_spec.rb @@ -54,6 +54,19 @@ ) end + it 'returns all entries with a lower limit of 0-0' do + expect(@redises.xrange(@key, '0-0', '+')).to eq( + [ + ['1234567891234-0', { 'key1' => 'value1' }], + ['1234567891245-0', { 'key2' => 'value2' }], + ['1234567891245-1', { 'key3' => 'value3' }], + ['1234567891278-0', { 'key4' => 'value4' }], + ['1234567891278-1', { 'key5' => 'value5' }], + ['1234567891299-0', { 'key6' => 'value6' }] + ] + ) + end + it 'returns entries with an upper limit' do expect(@redises.xrange(@key, '-', '1234567891285-0')).to eq( [ diff --git a/spec/commands/xread_spec.rb b/spec/commands/xread_spec.rb new file mode 100644 index 00000000..77376b81 --- /dev/null +++ b/spec/commands/xread_spec.rb @@ -0,0 +1,50 @@ +require 'spec_helper' + +describe '#xread(keys, ids)' do + before :all do + sleep 1 - (Time.now.to_f % 1) + @key = 'mock-redis-test:xread' + @key1 = 'mock-redis-test:xread1' + end + + it 'reads a single entry' do + @redises.xadd(@key, { key: 'value' }, id: '1234567891234-0') + expect(@redises.xread(@key, '0-0')) + .to eq({ @key => [['1234567891234-0', { 'key' => 'value' }]] }) + end + + it 'reads multiple entries from the beginning of the stream' do + @redises.xadd(@key, { key0: 'value0' }, id: '1234567891234-0') + @redises.xadd(@key, { key1: 'value1' }, id: '1234567891234-1') + expect(@redises.xread(@key, '0-0')) + .to eq({ @key => [['1234567891234-0', { 'key0' => 'value0' }], + ['1234567891234-1', { 'key1' => 'value1' }]] }) + end + + it 'reads entries greater than the ID passed' do + @redises.xadd(@key, { key0: 'value0' }, id: '1234567891234-0') + @redises.xadd(@key, { key1: 'value1' }, id: '1234567891234-1') + expect(@redises.xread(@key, '1234567891234-0')) + .to eq({ @key => [['1234567891234-1', { 'key1' => 'value1' }]] }) + end + + it 'reads from multiple streams' do + @redises.xadd(@key, { key: 'value' }, id: '1234567891234-0') + @redises.xadd(@key1, { key1: 'value1' }, id: '1234567891234-0') + expect(@redises.xread([@key, @key1], %w[0-0 0-0])) + .to eq({ @key => [['1234567891234-0', { 'key' => 'value' }]], + @key1 => [['1234567891234-0', { 'key1' => 'value1' }]] }) + end + + it 'reads from multiple streams at the given IDs' do + @redises.xadd(@key, { key: 'value0' }, id: '1234567891234-0') + @redises.xadd(@key, { key: 'value1' }, id: '1234567891234-1') + @redises.xadd(@key, { key: 'value2' }, id: '1234567891234-2') + @redises.xadd(@key1, { key1: 'value0' }, id: '1234567891234-0') + @redises.xadd(@key1, { key1: 'value1' }, id: '1234567891234-1') + @redises.xadd(@key1, { key1: 'value2' }, id: '1234567891234-2') + # The first stream won't return anything since we specify the last ID + expect(@redises.xread([@key, @key1], %w[1234567891234-2 1234567891234-1])) + .to eq({ @key1 => [['1234567891234-2', { 'key1' => 'value2' }]] }) + end +end diff --git a/spec/commands/xtrim_spec.rb b/spec/commands/xtrim_spec.rb index 6d291493..b1a9a08b 100644 --- a/spec/commands/xtrim_spec.rb +++ b/spec/commands/xtrim_spec.rb @@ -16,6 +16,12 @@ expect(@redises.xtrim(@key, 4)).to eq 2 end + it 'returns 0 if count is greater than size' do + initial = @redises.xrange(@key, '-', '+') + expect(@redises.xtrim(@key, 1000)).to eq 0 + expect(@redises.xrange(@key, '-', '+')).to eql(initial) + end + it 'deletes the oldes elements' do @redises.xtrim(@key, 4) expect(@redises.xrange(@key, '-', '+')).to eq(