Skip to content

Commit

Permalink
Fix trim add xread (#190)
Browse files Browse the repository at this point in the history
* Fix trim and add xread

* Initial xread implementation

* Update specs for trim bug
  • Loading branch information
jmthomas committed Jun 19, 2020
1 parent c4651c7 commit 57345bd
Show file tree
Hide file tree
Showing 7 changed files with 112 additions and 10 deletions.
20 changes: 18 additions & 2 deletions lib/mock_redis/stream.rb
Expand Up @@ -23,14 +23,25 @@ def last_id

def add(id, values)
@last_id = MockRedis::Stream::Id.new(id, min: @last_id)
if @last_id.to_s == '0-0'
raise Redis::CommandError,
'ERR The ID specified in XADD is equal or smaller than ' \
'the target stream top item'
# TOOD: Redis version 6.0.4, w redis 4.2.1 generates the following error message:
# 'ERR The ID specified in XADD must be greater than 0-0'
end
members.add [@last_id, Hash[values.map { |k, v| [k.to_s, v.to_s] }]]
@last_id.to_s
end

def trim(count)
deleted = @members.size - count
@members = @members.to_a[-count..-1].to_set
deleted
if deleted > 0
@members = @members.to_a[-count..-1].to_set
deleted
else
0
end
end

def range(start, finish, reversed, *opts_in)
Expand All @@ -45,6 +56,11 @@ def range(start, finish, reversed, *opts_in)
items
end

def read(id)
stream_id = MockRedis::Stream::Id.new(id)
members.select { |m| (stream_id < m[0]) }.map { |m| [m[0].to_s, m[1]] }
end

def each
members.each { |m| yield m }
end
Expand Down
7 changes: 0 additions & 7 deletions lib/mock_redis/stream/id.rb
Expand Up @@ -31,13 +31,6 @@ def initialize(id, min: nil, sequence: 0)
@timestamp = id
end
@sequence = @sequence.nil? ? sequence : @sequence.to_i
if @timestamp == 0 && @sequence == 0
raise Redis::CommandError,
'ERR The ID specified in XADD is equal or smaller than ' \
'the target stream top item'
# TOOD: Redis version 6.0.4, w redis 4.2.1 generates the following error message:
# 'ERR The ID specified in XADD must be greater than 0-0'
end
if self <= min
raise Redis::CommandError,
'ERR The ID specified in XADD is equal or smaller than ' \
Expand Down
15 changes: 14 additions & 1 deletion lib/mock_redis/stream_methods.rb
Expand Up @@ -4,7 +4,6 @@

# TODO: Implement the following commands
#
# * xread
# * xgroup
# * xreadgroup
# * xack
Expand Down Expand Up @@ -67,6 +66,20 @@ def xrevrange(key, last = '+', first = '-', count: nil)
end
end

# TODO: Implement count and block parameters
def xread(keys, ids)
result = {}
keys = keys.is_a?(Array) ? keys : [keys]
ids = ids.is_a?(Array) ? ids : [ids]
keys.each_with_index do |key, index|
with_stream_at(key) do |stream|
data = stream.read(ids[index])
result[key] = data unless data.empty?
end
end
result
end

private

def with_stream_at(key, &blk)
Expand Down
11 changes: 11 additions & 0 deletions spec/commands/xadd_spec.rb
Expand Up @@ -101,4 +101,15 @@
]
)
end

it 'supports a maxlen greater than the current size' do
@redises.xadd(@key, { key1: 'value1' }, id: '1234567891234-0')
@redises.xadd(@key, { key2: 'value2' }, id: '1234567891245-0', maxlen: 1000)
expect(@redises.xrange(@key, '-', '+')).to eq(
[
['1234567891234-0', { 'key1' => 'value1' }],
['1234567891245-0', { 'key2' => 'value2' }],
]
)
end
end
13 changes: 13 additions & 0 deletions spec/commands/xrange_spec.rb
Expand Up @@ -54,6 +54,19 @@
)
end

it 'returns all entries with a lower limit of 0-0' do
expect(@redises.xrange(@key, '0-0', '+')).to eq(
[
['1234567891234-0', { 'key1' => 'value1' }],
['1234567891245-0', { 'key2' => 'value2' }],
['1234567891245-1', { 'key3' => 'value3' }],
['1234567891278-0', { 'key4' => 'value4' }],
['1234567891278-1', { 'key5' => 'value5' }],
['1234567891299-0', { 'key6' => 'value6' }]
]
)
end

it 'returns entries with an upper limit' do
expect(@redises.xrange(@key, '-', '1234567891285-0')).to eq(
[
Expand Down
50 changes: 50 additions & 0 deletions spec/commands/xread_spec.rb
@@ -0,0 +1,50 @@
require 'spec_helper'

describe '#xread(keys, ids)' do
before :all do
sleep 1 - (Time.now.to_f % 1)
@key = 'mock-redis-test:xread'
@key1 = 'mock-redis-test:xread1'
end

it 'reads a single entry' do
@redises.xadd(@key, { key: 'value' }, id: '1234567891234-0')
expect(@redises.xread(@key, '0-0'))
.to eq({ @key => [['1234567891234-0', { 'key' => 'value' }]] })
end

it 'reads multiple entries from the beginning of the stream' do
@redises.xadd(@key, { key0: 'value0' }, id: '1234567891234-0')
@redises.xadd(@key, { key1: 'value1' }, id: '1234567891234-1')
expect(@redises.xread(@key, '0-0'))
.to eq({ @key => [['1234567891234-0', { 'key0' => 'value0' }],
['1234567891234-1', { 'key1' => 'value1' }]] })
end

it 'reads entries greater than the ID passed' do
@redises.xadd(@key, { key0: 'value0' }, id: '1234567891234-0')
@redises.xadd(@key, { key1: 'value1' }, id: '1234567891234-1')
expect(@redises.xread(@key, '1234567891234-0'))
.to eq({ @key => [['1234567891234-1', { 'key1' => 'value1' }]] })
end

it 'reads from multiple streams' do
@redises.xadd(@key, { key: 'value' }, id: '1234567891234-0')
@redises.xadd(@key1, { key1: 'value1' }, id: '1234567891234-0')
expect(@redises.xread([@key, @key1], %w[0-0 0-0]))
.to eq({ @key => [['1234567891234-0', { 'key' => 'value' }]],
@key1 => [['1234567891234-0', { 'key1' => 'value1' }]] })
end

it 'reads from multiple streams at the given IDs' do
@redises.xadd(@key, { key: 'value0' }, id: '1234567891234-0')
@redises.xadd(@key, { key: 'value1' }, id: '1234567891234-1')
@redises.xadd(@key, { key: 'value2' }, id: '1234567891234-2')
@redises.xadd(@key1, { key1: 'value0' }, id: '1234567891234-0')
@redises.xadd(@key1, { key1: 'value1' }, id: '1234567891234-1')
@redises.xadd(@key1, { key1: 'value2' }, id: '1234567891234-2')
# The first stream won't return anything since we specify the last ID
expect(@redises.xread([@key, @key1], %w[1234567891234-2 1234567891234-1]))
.to eq({ @key1 => [['1234567891234-2', { 'key1' => 'value2' }]] })
end
end
6 changes: 6 additions & 0 deletions spec/commands/xtrim_spec.rb
Expand Up @@ -16,6 +16,12 @@
expect(@redises.xtrim(@key, 4)).to eq 2
end

it 'returns 0 if count is greater than size' do
initial = @redises.xrange(@key, '-', '+')
expect(@redises.xtrim(@key, 1000)).to eq 0
expect(@redises.xrange(@key, '-', '+')).to eql(initial)
end

it 'deletes the oldes elements' do
@redises.xtrim(@key, 4)
expect(@redises.xrange(@key, '-', '+')).to eq(
Expand Down

0 comments on commit 57345bd

Please sign in to comment.