Skip to content

Commit

Permalink
Merge pull request #1270 from supercaracal/fix-cluster-cas-pattern
Browse files Browse the repository at this point in the history
Fix a cluster client interface for CAS operations to be more compatible with standalone client
  • Loading branch information
byroot committed Apr 29, 2024
2 parents a06456c + 30da9c4 commit c4e6dcc
Show file tree
Hide file tree
Showing 6 changed files with 107 additions and 43 deletions.
13 changes: 6 additions & 7 deletions cluster/README.md
Expand Up @@ -82,9 +82,11 @@ Client libraries can make them compatible up to a point, but a part of features
Especially, some cautions are needed to use the transaction feature with an optimistic locking.

```ruby
redis.watch("{my}key") do |client| # The client is an instance of the internal adapter
if redis.get("{my}key") == "some value" # We can't use the client passed by the block argument
client.multi do |tx| # The tx is the same instance of the internal adapter
# The client is an instance of the internal adapter for the optimistic locking
redis.watch("{my}key") do |client|
if client.get("{my}key") == "some value"
# The tx is an instance of the internal adapter for the transaction
client.multi do |tx|
tx.set("{my}key", "other value")
tx.incr("{my}counter")
end
Expand All @@ -95,8 +97,5 @@ end
```

In a cluster mode client, you need to pass a block if you call the watch method and you need to specify an argument to the block.
Also, you should use the block argument as a receiver to call the transaction feature methods in the block.
The commands called by methods of the receiver are added to the internal pipeline for the transaction and they are sent to the server lastly.
On the other hand, if you want to call other methods for commands, you can use the global instance of the client instead of the block argument.
It affects out of the transaction pipeline and the replies are returned soon.
Also, you should use the block argument as a receiver to call commands in the block.
Although the above restrictions are needed, this implementations is compatible with a standalone client.
13 changes: 7 additions & 6 deletions cluster/lib/redis/cluster.rb
Expand Up @@ -99,19 +99,20 @@ def cluster(subcommand, *args)
# Watch the given keys to determine execution of the MULTI/EXEC block.
#
# Using a block is required for a cluster client. It's different from a standalone client.
# And you should use the block argument as a receiver if you call transaction feature methods.
# On the other hand, you can use the global instance of the client if you call methods of other commands.
# And you should use the block argument as a receiver if you call commands.
#
# An `#unwatch` is automatically issued if an exception is raised within the
# block that is a subclass of StandardError and is not a ConnectionError.
#
# @param keys [String, Array<String>] one or more keys to watch
# @return [Array<Object>] replies of the transaction or an empty array
# @return [Object] returns the return value of the block
#
# @example A typical use case.
# redis.watch("{my}key") do |client| # The client is an instance of the internal adapter
# if redis.get("{my}key") == "some value" # We can't use the client passed by the block argument
# client.multi do |tx| # The tx is the same instance of the internal adapter
# # The client is an instance of the internal adapter for the optimistic locking
# redis.watch("{my}key") do |client|
# if client.get("{my}key") == "some value"
# # The tx is an instance of the internal adapter for the transaction
# client.multi do |tx|
# tx.set("{my}key", "other value")
# tx.incr("{my}counter")
# end
Expand Down
6 changes: 4 additions & 2 deletions cluster/lib/redis/cluster/client.rb
Expand Up @@ -119,8 +119,10 @@ def watch(*keys, &block)
transaction = Redis::Cluster::TransactionAdapter.new(
self, @router, @command_builder, node: c, slot: slot, asking: asking
)
yield transaction
transaction.execute

result = yield transaction
c.call('UNWATCH') unless transaction.lock_released?
result
end
end
end
Expand Down
71 changes: 62 additions & 9 deletions cluster/lib/redis/cluster/transaction_adapter.rb
Expand Up @@ -4,14 +4,69 @@

class Redis
class Cluster
class TransactionAdapter < RedisClient::Cluster::Transaction
class TransactionAdapter
class Internal < RedisClient::Cluster::Transaction
def initialize(client, router, command_builder, node: nil, slot: nil, asking: false)
@client = client
super(router, command_builder, node: node, slot: slot, asking: asking)
end

def multi
raise(Redis::Cluster::TransactionConsistencyError, "Can't nest multi transaction")
end

def exec
# no need to do anything
end

def discard
# no need to do anything
end

def watch(*_)
raise(Redis::Cluster::TransactionConsistencyError, "Can't use watch in a transaction")
end

def unwatch
# no need to do anything
end

private

def method_missing(name, *args, **kwargs, &block)
return call(name, *args, **kwargs, &block) if @client.respond_to?(name)

super
end

def respond_to_missing?(name, include_private = false)
return true if @client.respond_to?(name)

super
end
end

def initialize(client, router, command_builder, node: nil, slot: nil, asking: false)
@client = client
super(router, command_builder, node: node, slot: slot, asking: asking)
@router = router
@command_builder = command_builder
@node = node
@slot = slot
@asking = asking
@lock_released = false
end

def lock_released?
@lock_released
end

def multi
yield self
@lock_released = true
transaction = Redis::Cluster::TransactionAdapter::Internal.new(
@client, @router, @command_builder, node: @node, slot: @slot, asking: @asking
)
yield transaction
transaction.execute
end

def exec
Expand All @@ -23,20 +78,18 @@ def discard
end

def watch(*_)
raise(
Redis::Cluster::TransactionConsistencyError,
'You should pass all the keys to a watch method if you use the cluster client.'
)
raise(Redis::Cluster::TransactionConsistencyError, "Can't nest watch command if you use the cluster client")
end

def unwatch
# no need to do anything
@lock_released = true
@node.call('UNWATCH')
end

private

def method_missing(name, *args, **kwargs, &block)
return call(name, *args, **kwargs, &block) if @client.respond_to?(name)
return @client.public_send(name, *args, **kwargs, &block) if @client.respond_to?(name)

super
end
Expand Down
17 changes: 10 additions & 7 deletions cluster/test/client_transactions_test.rb
Expand Up @@ -59,12 +59,15 @@ def test_cluster_client_does_support_transaction_with_optimistic_locking
Fiber.yield
end

redis.watch('{key}1', '{key}2') do |tx|
redis.watch('{key}1', '{key}2') do |client|
another.resume
v1 = redis.get('{key}1')
v2 = redis.get('{key}2')
tx.set('{key}1', v2)
tx.set('{key}2', v1)
v1 = client.get('{key}1')
v2 = client.get('{key}2')

client.multi do |tx|
tx.set('{key}1', v2)
tx.set('{key}2', v1)
end
end

assert_equal %w[3 4], redis.mget('{key}1', '{key}2')
Expand All @@ -74,7 +77,7 @@ def test_cluster_client_can_be_used_compatible_with_standalone_client
redis.set('{my}key', 'value')
redis.set('{my}counter', '0')
redis.watch('{my}key', '{my}counter') do |client|
if redis.get('{my}key') == 'value'
if client.get('{my}key') == 'value'
client.multi do |tx|
tx.set('{my}key', 'updated value')
tx.incr('{my}counter')
Expand All @@ -96,7 +99,7 @@ def test_cluster_client_can_be_used_compatible_with_standalone_client

redis.watch('{my}key', '{my}counter') do |client|
another.resume
if redis.get('{my}key') == 'value'
if client.get('{my}key') == 'value'
client.multi do |tx|
tx.set('{my}key', 'latest value')
tx.incr('{my}counter')
Expand Down
30 changes: 18 additions & 12 deletions cluster/test/commands_on_transactions_test.rb
Expand Up @@ -45,30 +45,36 @@ def test_watch
end

assert_raises(Redis::Cluster::TransactionConsistencyError) do
redis.watch('{key}1', '{key}2') do |tx|
tx.watch('{key}3')
redis.watch('{key}1', '{key}2') do |cli|
cli.watch('{key}3')
end
end

assert_raises(Redis::Cluster::TransactionConsistencyError) do
redis.watch('key1', 'key2') do |tx|
tx.set('key1', '1')
tx.set('key2', '2')
redis.watch('key1', 'key2') do |cli|
cli.multi do |tx|
tx.set('key1', '1')
tx.set('key2', '2')
end
end
end

assert_raises(Redis::Cluster::TransactionConsistencyError) do
redis.watch('{hey}1', '{hey}2') do |tx|
tx.set('{key}1', '1')
tx.set('{key}2', '2')
redis.watch('{hey}1', '{hey}2') do |cli|
cli.multi do |tx|
tx.set('{key}1', '1')
tx.set('{key}2', '2')
end
end
end

assert_empty(redis.watch('{key}1', '{key}2') { |_| })
assert_equal('hello', redis.watch('{key}1', '{key}2') { |_| 'hello' })

redis.watch('{key}1', '{key}2') do |tx|
tx.set('{key}1', '1')
tx.set('{key}2', '2')
redis.watch('{key}1', '{key}2') do |cli|
cli.multi do |tx|
tx.set('{key}1', '1')
tx.set('{key}2', '2')
end
end

assert_equal %w[1 2], redis.mget('{key}1', '{key}2')
Expand Down

0 comments on commit c4e6dcc

Please sign in to comment.