Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Redis 6.2] Add LMOVE/BLMOVE commands #1034

Merged
merged 1 commit into from Oct 8, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
68 changes: 68 additions & 0 deletions lib/redis.rb
Expand Up @@ -1176,6 +1176,59 @@ def llen(key)
end
end

# Remove the first/last element in a list, append/prepend it to another list and return it.
#
# @param [String] source source key
# @param [String] destination destination key
# @param [String, Symbol] where_source from where to remove the element from the source list
# e.g. 'LEFT' - from head, 'RIGHT' - from tail
# @param [String, Symbol] where_destination where to push the element to the source list
# e.g. 'LEFT' - to head, 'RIGHT' - to tail
#
# @return [nil, String] the element, or nil when the source key does not exist
#
# @note This command comes in place of the now deprecated RPOPLPUSH.
# Doing LMOVE RIGHT LEFT is equivalent.
def lmove(source, destination, where_source, where_destination)
where_source, where_destination = _normalize_move_wheres(where_source, where_destination)

synchronize do |client|
client.call([:lmove, source, destination, where_source, where_destination])
end
end

# Remove the first/last element in a list and append/prepend it
# to another list and return it, or block until one is available.
#
# @example With timeout
# element = redis.blmove("foo", "bar", "LEFT", "RIGHT", timeout: 5)
# # => nil on timeout
# # => "element" on success
# @example Without timeout
# element = redis.blmove("foo", "bar", "LEFT", "RIGHT")
# # => "element"
#
# @param [String] source source key
# @param [String] destination destination key
# @param [String, Symbol] where_source from where to remove the element from the source list
# e.g. 'LEFT' - from head, 'RIGHT' - from tail
# @param [String, Symbol] where_destination where to push the element to the source list
# e.g. 'LEFT' - to head, 'RIGHT' - to tail
# @param [Hash] options
# - `:timeout => Numeric`: timeout in seconds, defaults to no timeout
#
# @return [nil, String] the element, or nil when the source key does not exist or the timeout expired
#
def blmove(source, destination, where_source, where_destination, timeout: 0)
where_source, where_destination = _normalize_move_wheres(where_source, where_destination)

synchronize do |client|
command = [:blmove, source, destination, where_source, where_destination, timeout]
timeout += client.timeout if timeout > 0
client.call_with_timeout(command, timeout)
end
end

# Prepend one or more values to a list, creating the list if it doesn't exist
#
# @param [String] key
Expand Down Expand Up @@ -3716,6 +3769,21 @@ def _xread(args, keys, ids, blocking_timeout_msec)
end
end
end

def _normalize_move_wheres(where_source, where_destination)
where_source = where_source.to_s.upcase
where_destination = where_destination.to_s.upcase

if where_source != "LEFT" && where_source != "RIGHT"
raise ArgumentError, "where_source must be 'LEFT' or 'RIGHT'"
end

if where_destination != "LEFT" && where_destination != "RIGHT"
raise ArgumentError, "where_destination must be 'LEFT' or 'RIGHT'"
end

[where_source, where_destination]
end
end

require_relative "redis/version"
Expand Down
15 changes: 15 additions & 0 deletions lib/redis/distributed.rb
Expand Up @@ -403,6 +403,21 @@ def llen(key)
node_for(key).llen(key)
end

# Remove the first/last element in a list, append/prepend it to another list and return it.
def lmove(source, destination, where_source, where_destination)
ensure_same_node(:lmove, [source, destination]) do |node|
node.lmove(source, destination, where_source, where_destination)
end
end

# Remove the first/last element in a list and append/prepend it
# to another list and return it, or block until one is available.
def blmove(source, destination, where_source, where_destination, timeout: 0)
ensure_same_node(:lmove, [source, destination]) do |node|
node.blmove(source, destination, where_source, where_destination, timeout: timeout)
end
end

# Prepend one or more values to a list.
def lpush(key, value)
node_for(key).lpush(key, value)
Expand Down
8 changes: 8 additions & 0 deletions test/blocking_commands_test.rb
Expand Up @@ -21,6 +21,14 @@ def assert_takes_longer_than_client_timeout
end
end

def test_blmove_disable_client_timeout
target_version "6.2" do
assert_takes_longer_than_client_timeout do |r|
assert_equal '0', r.blmove('foo', 'bar', 'LEFT', 'RIGHT')
end
end
end

def test_blpop_disable_client_timeout
assert_takes_longer_than_client_timeout do |r|
assert_equal %w[foo 0], r.blpop('foo')
Expand Down
6 changes: 6 additions & 0 deletions test/cluster_commands_on_lists_test.rb
Expand Up @@ -9,6 +9,12 @@ class TestClusterCommandsOnLists < Minitest::Test
include Helper::Cluster
include Lint::Lists

def test_lmove
target_version "6.2" do
assert_raises(Redis::CommandError) { super }
end
end

def test_rpoplpush
assert_raises(Redis::CommandError) { super }
end
Expand Down
8 changes: 8 additions & 0 deletions test/distributed_blocking_commands_test.rb
Expand Up @@ -7,6 +7,14 @@ class TestDistributedBlockingCommands < Minitest::Test
include Helper::Distributed
include Lint::BlockingCommands

def test_blmove_raises
target_version "6.2" do
assert_raises(Redis::Distributed::CannotDistribute) do
r.blmove('foo', 'bar', 'LEFT', 'RIGHT')
end
end
end

def test_blpop_raises
assert_raises(Redis::Distributed::CannotDistribute) do
r.blpop(%w[foo bar])
Expand Down
8 changes: 8 additions & 0 deletions test/distributed_commands_on_lists_test.rb
Expand Up @@ -7,6 +7,14 @@ class TestDistributedCommandsOnLists < Minitest::Test
include Helper::Distributed
include Lint::Lists

def test_lmove
target_version "6.2" do
assert_raises Redis::Distributed::CannotDistribute do
r.lmove('foo', 'bar', 'LEFT', 'RIGHT')
end
end
end

def test_rpoplpush
assert_raises Redis::Distributed::CannotDistribute do
r.rpoplpush('foo', 'bar')
Expand Down
13 changes: 13 additions & 0 deletions test/distributed_commands_requiring_clustering_test.rb
Expand Up @@ -23,6 +23,19 @@ def test_renamenx
assert_equal "s2", r.get("{qux}bar")
end

def test_lmove
target_version "6.2" do
r.rpush("{qux}foo", "s1")
r.rpush("{qux}foo", "s2")
r.rpush("{qux}bar", "s3")
r.rpush("{qux}bar", "s4")

assert_equal "s1", r.lmove("{qux}foo", "{qux}bar", "LEFT", "RIGHT")
assert_equal ["s2"], r.lrange("{qux}foo", 0, -1)
assert_equal ["s3", "s4", "s1"], r.lrange("{qux}bar", 0, -1)
end
end

def test_brpoplpush
r.rpush "{qux}foo", "s1"
r.rpush "{qux}foo", "s2"
Expand Down
31 changes: 31 additions & 0 deletions test/lint/blocking_commands.rb
Expand Up @@ -32,6 +32,10 @@ def mock(options = {}, &blk)

def build_mock_commands(options = {})
{
blmove: lambda do |*args|
sleep options[:delay] if options.key?(:delay)
to_protocol(args.last)
end,
blpop: lambda do |*args|
sleep options[:delay] if options.key?(:delay)
to_protocol([args.first, args.last])
Expand All @@ -55,6 +59,23 @@ def build_mock_commands(options = {})
}
end

def test_blmove
target_version "6.2" do
assert_equal 's1', r.blmove('{zap}foo', '{zap}bar', 'LEFT', 'RIGHT')
assert_equal ['s2'], r.lrange('{zap}foo', 0, -1)
assert_equal ['s1', 's2', 's1'], r.lrange('{zap}bar', 0, -1)
end
end

def test_blmove_timeout
target_version "6.2" do
mock do |r|
assert_equal '0', r.blmove('{zap}foo', '{zap}bar', 'LEFT', 'RIGHT')
assert_equal LOW_TIMEOUT.to_s, r.blmove('{zap}foo', '{zap}bar', 'LEFT', 'RIGHT', timeout: LOW_TIMEOUT)
end
end
end

def test_blpop
assert_equal ['{zap}foo', 's1'], r.blpop('{zap}foo')
assert_equal ['{zap}foo', 's2'], r.blpop(['{zap}foo'])
Expand Down Expand Up @@ -166,6 +187,16 @@ def test_bzpopmax
end

driver(:ruby, :hiredis) do
def test_blmove_socket_timeout
target_version "6.2" do
mock(delay: LOW_TIMEOUT * 5) do |r|
assert_raises(Redis::TimeoutError) do
r.blmove('{zap}foo', '{zap}bar', 'LEFT', 'RIGHT', timeout: LOW_TIMEOUT)
end
end
end
end

def test_blpop_socket_timeout
mock(delay: LOW_TIMEOUT * 5) do |r|
assert_raises(Redis::TimeoutError) do
Expand Down
27 changes: 27 additions & 0 deletions test/lint/lists.rb
Expand Up @@ -2,6 +2,33 @@

module Lint
module Lists
def test_lmove
target_version "6.2" do
r.lpush("foo", "s1")
r.lpush("foo", "s2") # foo = [s2, s1]
r.lpush("bar", "s3")
r.lpush("bar", "s4") # bar = [s4, s3]

assert_nil r.lmove("nonexistent", "foo", "LEFT", "LEFT")

assert_equal "s2", r.lmove("foo", "foo", "LEFT", "RIGHT") # foo = [s1, s2]
assert_equal "s1", r.lmove("foo", "foo", "LEFT", "LEFT") # foo = [s1, s2]

assert_equal "s1", r.lmove("foo", "bar", "LEFT", "RIGHT") # foo = [s2], bar = [s4, s3, s1]
assert_equal ["s2"], r.lrange("foo", 0, -1)
assert_equal ["s4", "s3", "s1"], r.lrange("bar", 0, -1)

assert_equal "s2", r.lmove("foo", "bar", "LEFT", "LEFT") # foo = [], bar = [s2, s4, s3, s1]
assert_nil r.lmove("foo", "bar", "LEFT", "LEFT") # foo = [], bar = [s2, s4, s3, s1]
assert_equal ["s2", "s4", "s3", "s1"], r.lrange("bar", 0, -1)

error = assert_raises(ArgumentError) do
r.lmove("foo", "bar", "LEFT", "MIDDLE")
end
assert_equal "where_destination must be 'LEFT' or 'RIGHT'", error.message
end
end

def test_lpush
r.lpush "foo", "s1"
r.lpush "foo", "s2"
Expand Down