Skip to content

Commit

Permalink
Merge pull request #1020 from supercaracal/fix-a-validation-bug-for-p…
Browse files Browse the repository at this point in the history
…ipelining-with-cluster-multiple-replicas

Fix a bug for cross-slot validation in pipelining
  • Loading branch information
byroot committed Jul 26, 2021
2 parents baf5706 + 691aef7 commit a200ee3
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 12 deletions.
20 changes: 8 additions & 12 deletions lib/redis/cluster.rb
Expand Up @@ -78,11 +78,13 @@ def call_loop(command, timeout = 0, &block)
end

def call_pipeline(pipeline)
node_keys, command_keys = extract_keys_in_pipeline(pipeline)
raise CrossSlotPipeliningError, command_keys if node_keys.size > 1
node_keys = pipeline.commands.map { |cmd| find_node_key(cmd, primary_only: true) }.compact.uniq
if node_keys.size > 1
raise(CrossSlotPipeliningError,
pipeline.commands.map { |cmd| @command.extract_first_key(cmd) }.reject(&:empty?).uniq)
end

node = find_node(node_keys.first)
try_send(node, :call_pipeline, pipeline)
try_send(find_node(node_keys.first), :call_pipeline, pipeline)
end

def call_with_timeout(command, timeout, &block)
Expand Down Expand Up @@ -253,14 +255,14 @@ def assign_node(command)
find_node(node_key)
end

def find_node_key(command)
def find_node_key(command, primary_only: false)
key = @command.extract_first_key(command)
return if key.empty?

slot = KeySlotConverter.convert(key)
return unless @slot.exists?(slot)

if @command.should_send_to_master?(command)
if @command.should_send_to_master?(command) || primary_only
@slot.find_node_key_of_master(slot)
else
@slot.find_node_key_of_slave(slot)
Expand All @@ -285,11 +287,5 @@ def update_cluster_info!(node_key = nil)
@node.map(&:disconnect)
@node, @slot = fetch_cluster_info!(@option)
end

def extract_keys_in_pipeline(pipeline)
node_keys = pipeline.commands.map { |cmd| find_node_key(cmd) }.compact.uniq
command_keys = pipeline.commands.map { |cmd| @command.extract_first_key(cmd) }.reject(&:empty?)
[node_keys, command_keys]
end
end
end
13 changes: 13 additions & 0 deletions test/cluster_client_pipelining_test.rb
Expand Up @@ -56,4 +56,17 @@ def test_pipelining_without_hash_tags
end
end
end

def test_pipelining_with_multiple_replicas
rc = build_another_client(replica: true)
rc.instance_variable_get(:@client).instance_variable_get(:@slot).instance_variable_get(:@map).each do |_, v|
v[:slaves] << v[:master] if v[:slaves].size < 2 # reproducing multiple replicas
end

rc.pipelined do |r|
10.times { r.get('key1') }
end

rc.close
end
end

0 comments on commit a200ee3

Please sign in to comment.