Skip to content

Commit

Permalink
Merge pull request #1263 from philippeboyd/feature/add-pipeline-raise…
Browse files Browse the repository at this point in the history
…_exception-flag

add flag to disable raising exceptions with pipelined
  • Loading branch information
byroot committed Apr 15, 2024
2 parents bce3f41 + 609b6bc commit b2f9c28
Show file tree
Hide file tree
Showing 8 changed files with 74 additions and 12 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -1,5 +1,7 @@
# Unreleased

- Add `exception` flag in `pipelined` allowing failed commands to be returned in the result array when set to `false`.

# 5.1.0

- `multi` now accept a `watch` keyword argument like `redis-client`. See #1236.
Expand Down
22 changes: 22 additions & 0 deletions README.md
Expand Up @@ -191,6 +191,28 @@ end
# => ["OK"]
```

### Exception management

The `exception` flag in the `#pipelined` is a feature that modifies the pipeline execution behavior. When set
to `false`, it doesn't raise an exception when a command error occurs. Instead, it allows the pipeline to execute all
commands, and any failed command will be available in the returned array. (Defaults to `true`)

```ruby
results = redis.pipelined(exception: false) do |pipeline|
pipeline.set('key1', 'value1')
pipeline.lpush('key1', 'something') # This will fail
pipeline.set('key2', 'value2')
end
# results => ["OK", #<RedisClient::WrongTypeError: WRONGTYPE Operation against a key holding the wrong kind of value>, "OK"]

results.each do |result|
if result.is_a?(Redis::CommandError)
# Do something with the failed result
end
end
```


### Executing commands atomically

You can use `MULTI/EXEC` to run a number of commands in an atomic
Expand Down
4 changes: 2 additions & 2 deletions cluster/lib/redis/cluster/client.rb
Expand Up @@ -90,8 +90,8 @@ def blocking_call_v(timeout, command, &block)
handle_errors { super(timeout, command, &block) }
end

def pipelined(&block)
handle_errors { super(&block) }
def pipelined(exception: true, &block)
handle_errors { super(exception: exception, &block) }
end

def multi(watch: nil, &block)
Expand Down
6 changes: 3 additions & 3 deletions lib/redis.rb
Expand Up @@ -99,10 +99,10 @@ def _client
@client
end

def pipelined
def pipelined(exception: true)
synchronize do |client|
client.pipelined do |raw_pipeline|
yield PipelinedConnection.new(raw_pipeline)
client.pipelined(exception: exception) do |raw_pipeline|
yield PipelinedConnection.new(raw_pipeline, exception: exception)
end
end
end
Expand Down
2 changes: 1 addition & 1 deletion lib/redis/client.rb
Expand Up @@ -105,7 +105,7 @@ def blocking_call_v(timeout, command, &block)
Client.translate_error!(error)
end

def pipelined
def pipelined(exception: true)
super
rescue ::RedisClient::Error => error
Client.translate_error!(error)
Expand Down
12 changes: 7 additions & 5 deletions lib/redis/pipeline.rb
Expand Up @@ -6,9 +6,10 @@ class Redis
class PipelinedConnection
attr_accessor :db

def initialize(pipeline, futures = [])
def initialize(pipeline, futures = [], exception: true)
@pipeline = pipeline
@futures = futures
@exception = exception
end

include Commands
Expand Down Expand Up @@ -37,7 +38,7 @@ def synchronize
end

def send_command(command, &block)
future = Future.new(command, block)
future = Future.new(command, block, @exception)
@pipeline.call_v(command) do |result|
future._set(result)
end
Expand All @@ -46,7 +47,7 @@ def send_command(command, &block)
end

def send_blocking_command(command, timeout, &block)
future = Future.new(command, block)
future = Future.new(command, block, @exception)
@pipeline.blocking_call_v(timeout, command) do |result|
future._set(result)
end
Expand Down Expand Up @@ -79,10 +80,11 @@ def initialize
class Future < BasicObject
FutureNotReady = ::Redis::FutureNotReady.new

def initialize(command, coerce)
def initialize(command, coerce, exception)
@command = command
@object = FutureNotReady
@coerce = coerce
@exception = exception
end

def inspect
Expand All @@ -95,7 +97,7 @@ def _set(object)
end

def value
::Kernel.raise(@object) if @object.is_a?(::StandardError)
::Kernel.raise(@object) if @exception && @object.is_a?(::StandardError)
@object
end

Expand Down
2 changes: 1 addition & 1 deletion redis.gemspec
Expand Up @@ -45,5 +45,5 @@ Gem::Specification.new do |s|

s.required_ruby_version = '>= 2.6.0'

s.add_runtime_dependency('redis-client', '>= 0.17.0')
s.add_runtime_dependency('redis-client', '>= 0.22.0')
end
36 changes: 36 additions & 0 deletions test/redis/pipelining_commands_test.rb
Expand Up @@ -98,6 +98,18 @@ def test_assignment_of_results_inside_the_block_with_errors
assert_raises(Redis::FutureNotReady) { @second.value }
end

def test_assignment_of_results_inside_the_block_without_raising_exception
r.pipelined(exception: false) do |p|
@first = p.doesnt_exist
@second = p.sadd?("foo", 1)
@third = p.sadd?("foo", 1)
end

assert_equal RedisClient::CommandError, @first.value.class
assert_equal true, @second.value
assert_equal false, @third.value
end

def test_assignment_of_results_inside_a_nested_block
r.pipelined do |p|
@first = p.sadd?("foo", 1)
Expand All @@ -111,6 +123,30 @@ def test_assignment_of_results_inside_a_nested_block
assert_equal false, @second.value
end

def test_nested_pipelining_returns_without_raising_exception
result = r.pipelined(exception: false) do |p1|
p1.doesnt_exist
p1.set("foo", "42")
p1.pipelined do |p2|
p2.doesnt_exist_again
p2.set("bar", "99")
end
end

assert result[0].is_a?(RedisClient::CommandError)
assert_equal ["doesnt_exist"], result[0].command

assert_equal "OK", result[1]

assert result[2].is_a?(RedisClient::CommandError)
assert_equal ["doesnt_exist_again"], result[2].command

assert_equal "OK", result[3]

assert_equal "42", r.get("foo")
assert_equal "99", r.get("bar")
end

def test_futures_raise_when_confused_with_something_else
r.pipelined do |p|
@result = p.sadd?("foo", 1)
Expand Down

0 comments on commit b2f9c28

Please sign in to comment.