Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We鈥檒l occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement #watch and #multi specially for cluster-client #1256

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
56 changes: 56 additions & 0 deletions cluster/lib/redis/cluster.rb
Expand Up @@ -96,6 +96,62 @@ def cluster(subcommand, *args)
send_command([:cluster, subcommand] + args, &block)
end

# Transactions need different implementations in cluster mode, using purpose-built
# primitives available in redis-cluster-client. These methods (watch and multii
# implement the same interface as the methods in ::Redis::Commands::Transactions.

def watch(*keys)
synchronize do |client|
# client is a ::Redis::Cluster::Client instance, which is a subclass of
# ::RedisClient::Cluster

if @active_watcher
# We're already within a #watch block, just add keys to the existing watch
@active_watcher.watch(keys)
else
unless block_given?
raise ArgumentError, "#{self.class.name} requires that the initial #watch call of a transaction " \
"passes a block"
end

client.watch(keys) do |watcher|
@active_watcher = watcher
yield self
ensure
@active_watcher = nil
end

end
end
end

def multi
synchronize do |client|
if @active_watcher
# If we're inside a #watch block, use that to execute the transaction
@active_watcher.multi do |tx|
yield MultiConnection.new(tx)
end
else
# Make a new transaction from whole cloth.
client.multi do |tx|
yield MultiConnection.new(tx)
end
end
end
end

def unwatch
synchronize do
if @active_watcher
@active_watcher.unwatch
else
# This will raise an AmbiguiousNodeError
super
end
end
end

private

def initialize_client(options)
Expand Down
4 changes: 4 additions & 0 deletions cluster/lib/redis/cluster/client.rb
Expand Up @@ -98,6 +98,10 @@ def multi(watch: nil, &block)
handle_errors { super(watch: watch, &block) }
end

def watch(keys, &block)
handle_errors { super(keys, &block) }
end

private

def handle_errors
Expand Down
69 changes: 69 additions & 0 deletions cluster/test/client_transactions_test.rb
Expand Up @@ -48,4 +48,73 @@ def test_cluster_client_does_not_support_transaction_by_multiple_keys
assert_nil(redis.get("key#{i}"))
end
end

def test_cluster_client_does_support_transaction_with_optimistic_locking
redis.mset('{key}1', '1', '{key}2', '2')

another = Fiber.new do
cli = build_another_client
cli.mset('{key}1', '3', '{key}2', '4')
cli.close
end

redis.watch('{key}1', '{key}2') do
another.resume
v1 = redis.get('{key}1')
v2 = redis.get('{key}2')
redis.multi do |tx|
tx.set('{key}1', v2)
tx.set('{key}2', v1)
end
end

assert_equal %w[3 4], redis.mget('{key}1', '{key}2')
end

def test_cluster_client_can_unwatch_transaction
redis.set('key1', 'initial_value')

another = Fiber.new do
cli = build_another_client
cli.set('key1', 'another_value')
end

redis.watch('key1') do
another.resume
redis.unwatch
end
# After calling unwatch, the same connection can be used to open a transaction which
# isn't conditional and so will commit
got = redis.multi do |tx|
tx.set('key1', 'final_value')
end

assert_equal ['OK'], got
assert_equal 'final_value', redis.get('key1')
end

def test_cluster_client_unwatches_on_exception
redis.set('key1', 'initial_value')

another = Fiber.new do
cli = build_another_client
cli.set('key1', 'another_value')
end

assert_raises(RuntimeError) do
redis.watch('key1') do
another.resume
raise 'bang'
end
end
# After catching the exception, the same connection can be used to open a transaction which
# isn't conditional and so will commit
# n.b. the actual behaviour which ensures this is actually in redis-cluster-client
got = redis.multi do |tx|
tx.set('key1', 'final_value')
end

assert_equal ['OK'], got
assert_equal 'final_value', redis.get('key1')
end
end
6 changes: 2 additions & 4 deletions cluster/test/commands_on_transactions_test.rb
Expand Up @@ -38,10 +38,8 @@ def test_unwatch
end

def test_watch
assert_raises(Redis::CommandError, "CROSSSLOT Keys in request don't hash to the same slot") do
redis.watch('key1', 'key2')
assert_raises(Redis::Cluster::TransactionConsistencyError) do
redis.watch('key1', 'key2') {}
end

assert_equal 'OK', redis.watch('{key}1', '{key}2')
end
end