diff --git a/CHANGELOG.md b/CHANGELOG.md index 014d3a4f..87af5c79 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,7 @@ # Unreleased +- Add `exception` flag in `pipelined` allowing failed commands to be returned in the result array when set to `false`. + # 5.1.0 - `multi` now accept a `watch` keyword argument like `redis-client`. See #1236. diff --git a/README.md b/README.md index 99f14110..d4ee0ab9 100644 --- a/README.md +++ b/README.md @@ -191,6 +191,28 @@ end # => ["OK"] ``` +### Exception management + +The `exception` flag in the `#pipelined` is a feature that modifies the pipeline execution behavior. When set +to `false`, it doesn't raise an exception when a command error occurs. Instead, it allows the pipeline to execute all +commands, and any failed command will be available in the returned array. (Defaults to `true`) + +```ruby +results = redis.pipelined(exception: false) do |pipeline| + pipeline.set('key1', 'value1') + pipeline.lpush('key1', 'something') # This will fail + pipeline.set('key2', 'value2') +end +# results => ["OK", #, "OK"] + +results.each do |result| + if result.is_a?(Redis::CommandError) + # Do something with the failed result + end +end +``` + + ### Executing commands atomically You can use `MULTI/EXEC` to run a number of commands in an atomic diff --git a/cluster/lib/redis/cluster/client.rb b/cluster/lib/redis/cluster/client.rb index a14845c2..a0e88992 100644 --- a/cluster/lib/redis/cluster/client.rb +++ b/cluster/lib/redis/cluster/client.rb @@ -90,8 +90,8 @@ def blocking_call_v(timeout, command, &block) handle_errors { super(timeout, command, &block) } end - def pipelined(&block) - handle_errors { super(&block) } + def pipelined(exception: true, &block) + handle_errors { super(exception: exception, &block) } end def multi(watch: nil, &block) diff --git a/lib/redis.rb b/lib/redis.rb index 286585ee..09d8fd95 100644 --- a/lib/redis.rb +++ b/lib/redis.rb @@ -99,10 +99,10 @@ def _client @client end - def pipelined + def pipelined(exception: true) synchronize do |client| - client.pipelined do |raw_pipeline| - yield PipelinedConnection.new(raw_pipeline) + client.pipelined(exception: exception) do |raw_pipeline| + yield PipelinedConnection.new(raw_pipeline, exception: exception) end end end diff --git a/lib/redis/client.rb b/lib/redis/client.rb index 38f3273a..d23ed912 100644 --- a/lib/redis/client.rb +++ b/lib/redis/client.rb @@ -105,7 +105,7 @@ def blocking_call_v(timeout, command, &block) Client.translate_error!(error) end - def pipelined + def pipelined(exception: true) super rescue ::RedisClient::Error => error Client.translate_error!(error) diff --git a/lib/redis/pipeline.rb b/lib/redis/pipeline.rb index 65e2cda2..0e415aa9 100644 --- a/lib/redis/pipeline.rb +++ b/lib/redis/pipeline.rb @@ -6,9 +6,10 @@ class Redis class PipelinedConnection attr_accessor :db - def initialize(pipeline, futures = []) + def initialize(pipeline, futures = [], exception: true) @pipeline = pipeline @futures = futures + @exception = exception end include Commands @@ -37,7 +38,7 @@ def synchronize end def send_command(command, &block) - future = Future.new(command, block) + future = Future.new(command, block, @exception) @pipeline.call_v(command) do |result| future._set(result) end @@ -46,7 +47,7 @@ def send_command(command, &block) end def send_blocking_command(command, timeout, &block) - future = Future.new(command, block) + future = Future.new(command, block, @exception) @pipeline.blocking_call_v(timeout, command) do |result| future._set(result) end @@ -79,10 +80,11 @@ def initialize class Future < BasicObject FutureNotReady = ::Redis::FutureNotReady.new - def initialize(command, coerce) + def initialize(command, coerce, exception) @command = command @object = FutureNotReady @coerce = coerce + @exception = exception end def inspect @@ -95,7 +97,7 @@ def _set(object) end def value - ::Kernel.raise(@object) if @object.is_a?(::StandardError) + ::Kernel.raise(@object) if @exception && @object.is_a?(::StandardError) @object end diff --git a/redis.gemspec b/redis.gemspec index e3b05873..90711d76 100644 --- a/redis.gemspec +++ b/redis.gemspec @@ -45,5 +45,5 @@ Gem::Specification.new do |s| s.required_ruby_version = '>= 2.6.0' - s.add_runtime_dependency('redis-client', '>= 0.17.0') + s.add_runtime_dependency('redis-client', '>= 0.22.0') end diff --git a/test/redis/pipelining_commands_test.rb b/test/redis/pipelining_commands_test.rb index 45663974..6265921a 100644 --- a/test/redis/pipelining_commands_test.rb +++ b/test/redis/pipelining_commands_test.rb @@ -98,6 +98,18 @@ def test_assignment_of_results_inside_the_block_with_errors assert_raises(Redis::FutureNotReady) { @second.value } end + def test_assignment_of_results_inside_the_block_without_raising_exception + r.pipelined(exception: false) do |p| + @first = p.doesnt_exist + @second = p.sadd?("foo", 1) + @third = p.sadd?("foo", 1) + end + + assert_equal RedisClient::CommandError, @first.value.class + assert_equal true, @second.value + assert_equal false, @third.value + end + def test_assignment_of_results_inside_a_nested_block r.pipelined do |p| @first = p.sadd?("foo", 1) @@ -111,6 +123,30 @@ def test_assignment_of_results_inside_a_nested_block assert_equal false, @second.value end + def test_nested_pipelining_returns_without_raising_exception + result = r.pipelined(exception: false) do |p1| + p1.doesnt_exist + p1.set("foo", "42") + p1.pipelined do |p2| + p2.doesnt_exist_again + p2.set("bar", "99") + end + end + + assert result[0].is_a?(RedisClient::CommandError) + assert_equal ["doesnt_exist"], result[0].command + + assert_equal "OK", result[1] + + assert result[2].is_a?(RedisClient::CommandError) + assert_equal ["doesnt_exist_again"], result[2].command + + assert_equal "OK", result[3] + + assert_equal "42", r.get("foo") + assert_equal "99", r.get("bar") + end + def test_futures_raise_when_confused_with_something_else r.pipelined do |p| @result = p.sadd?("foo", 1)