diff --git a/lib/redis/cluster.rb b/lib/redis/cluster.rb index 9e2e2ca07..a3ffdc7ae 100644 --- a/lib/redis/cluster.rb +++ b/lib/redis/cluster.rb @@ -78,11 +78,13 @@ def call_loop(command, timeout = 0, &block) end def call_pipeline(pipeline) - node_keys, command_keys = extract_keys_in_pipeline(pipeline) - raise CrossSlotPipeliningError, command_keys if node_keys.size > 1 + node_keys = pipeline.commands.map { |cmd| find_node_key(cmd, primary_only: true) }.compact.uniq + if node_keys.size > 1 + raise(CrossSlotPipeliningError, + pipeline.commands.map { |cmd| @command.extract_first_key(cmd) }.reject(&:empty?).uniq) + end - node = find_node(node_keys.first) - try_send(node, :call_pipeline, pipeline) + try_send(find_node(node_keys.first), :call_pipeline, pipeline) end def call_with_timeout(command, timeout, &block) @@ -253,14 +255,14 @@ def assign_node(command) find_node(node_key) end - def find_node_key(command) + def find_node_key(command, primary_only: false) key = @command.extract_first_key(command) return if key.empty? slot = KeySlotConverter.convert(key) return unless @slot.exists?(slot) - if @command.should_send_to_master?(command) + if @command.should_send_to_master?(command) || primary_only @slot.find_node_key_of_master(slot) else @slot.find_node_key_of_slave(slot) @@ -285,11 +287,5 @@ def update_cluster_info!(node_key = nil) @node.map(&:disconnect) @node, @slot = fetch_cluster_info!(@option) end - - def extract_keys_in_pipeline(pipeline) - node_keys = pipeline.commands.map { |cmd| find_node_key(cmd) }.compact.uniq - command_keys = pipeline.commands.map { |cmd| @command.extract_first_key(cmd) }.reject(&:empty?) - [node_keys, command_keys] - end end end diff --git a/test/cluster_client_pipelining_test.rb b/test/cluster_client_pipelining_test.rb index 56f2c693c..85a54576f 100644 --- a/test/cluster_client_pipelining_test.rb +++ b/test/cluster_client_pipelining_test.rb @@ -56,4 +56,17 @@ def test_pipelining_without_hash_tags end end end + + def test_pipelining_with_multiple_replicas + rc = build_another_client(replica: true) + rc.instance_variable_get(:@client).instance_variable_get(:@slot).instance_variable_get(:@map).each do |_, v| + v[:slaves] << v[:master] if v[:slaves].size < 2 # reproducing multiple replicas + end + + rc.pipelined do |r| + 10.times { r.get('key1') } + end + + rc.close + end end