Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add flag to disable raising exceptions with pipelined #1263

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -1,5 +1,7 @@
# Unreleased

- Add `exception` flag in `pipelined` allowing failed commands to be returned in the result array when set to `false`.

# 5.1.0

- `multi` now accept a `watch` keyword argument like `redis-client`. See #1236.
Expand Down
22 changes: 22 additions & 0 deletions README.md
Expand Up @@ -191,6 +191,28 @@ end
# => ["OK"]
```

### Exception management

The `exception` flag in the `#pipelined` is a feature that modifies the pipeline execution behavior. When set
to `false`, it doesn't raise an exception when a command error occurs. Instead, it allows the pipeline to execute all
commands, and any failed command will be available in the returned array. (Defaults to `true`)

```ruby
results = redis.pipelined(exception: false) do |pipeline|
pipeline.set('key1', 'value1')
pipeline.lpush('key1', 'something') # This will fail
pipeline.set('key2', 'value2')
end
# results => ["OK", #<RedisClient::WrongTypeError: WRONGTYPE Operation against a key holding the wrong kind of value>, "OK"]

results.each do |result|
if result.is_a?(Redis::CommandError)
# Do something with the failed result
end
end
```


### Executing commands atomically

You can use `MULTI/EXEC` to run a number of commands in an atomic
Expand Down
4 changes: 2 additions & 2 deletions cluster/lib/redis/cluster/client.rb
Expand Up @@ -90,8 +90,8 @@ def blocking_call_v(timeout, command, &block)
handle_errors { super(timeout, command, &block) }
end

def pipelined(&block)
handle_errors { super(&block) }
def pipelined(exception: true, &block)
handle_errors { super(exception: exception, &block) }
end

def multi(watch: nil, &block)
Expand Down
6 changes: 3 additions & 3 deletions lib/redis.rb
Expand Up @@ -99,10 +99,10 @@ def _client
@client
end

def pipelined
def pipelined(exception: true)
synchronize do |client|
client.pipelined do |raw_pipeline|
yield PipelinedConnection.new(raw_pipeline)
client.pipelined(exception: exception) do |raw_pipeline|
yield PipelinedConnection.new(raw_pipeline, exception: exception)
end
end
end
Expand Down
2 changes: 1 addition & 1 deletion lib/redis/client.rb
Expand Up @@ -105,7 +105,7 @@ def blocking_call_v(timeout, command, &block)
Client.translate_error!(error)
end

def pipelined
def pipelined(exception: true)
super
rescue ::RedisClient::Error => error
Client.translate_error!(error)
Expand Down
12 changes: 7 additions & 5 deletions lib/redis/pipeline.rb
Expand Up @@ -6,9 +6,10 @@ class Redis
class PipelinedConnection
attr_accessor :db

def initialize(pipeline, futures = [])
def initialize(pipeline, futures = [], exception: true)
@pipeline = pipeline
@futures = futures
@exception = exception
end

include Commands
Expand Down Expand Up @@ -37,7 +38,7 @@ def synchronize
end

def send_command(command, &block)
future = Future.new(command, block)
future = Future.new(command, block, @exception)
@pipeline.call_v(command) do |result|
future._set(result)
end
Expand All @@ -46,7 +47,7 @@ def send_command(command, &block)
end

def send_blocking_command(command, timeout, &block)
future = Future.new(command, block)
future = Future.new(command, block, @exception)
@pipeline.blocking_call_v(timeout, command) do |result|
future._set(result)
end
Expand Down Expand Up @@ -79,10 +80,11 @@ def initialize
class Future < BasicObject
FutureNotReady = ::Redis::FutureNotReady.new

def initialize(command, coerce)
def initialize(command, coerce, exception)
@command = command
@object = FutureNotReady
@coerce = coerce
@exception = exception
end

def inspect
Expand All @@ -95,7 +97,7 @@ def _set(object)
end

def value
::Kernel.raise(@object) if @object.is_a?(::StandardError)
::Kernel.raise(@object) if @exception && @object.is_a?(::StandardError)
@object
end

Expand Down
2 changes: 1 addition & 1 deletion redis.gemspec
Expand Up @@ -45,5 +45,5 @@ Gem::Specification.new do |s|

s.required_ruby_version = '>= 2.6.0'

s.add_runtime_dependency('redis-client', '>= 0.17.0')
s.add_runtime_dependency('redis-client', '>= 0.22.0')
end
36 changes: 36 additions & 0 deletions test/redis/pipelining_commands_test.rb
Expand Up @@ -98,6 +98,18 @@ def test_assignment_of_results_inside_the_block_with_errors
assert_raises(Redis::FutureNotReady) { @second.value }
end

def test_assignment_of_results_inside_the_block_without_raising_exception
r.pipelined(exception: false) do |p|
@first = p.doesnt_exist
@second = p.sadd?("foo", 1)
@third = p.sadd?("foo", 1)
end

assert_equal RedisClient::CommandError, @first.value.class
assert_equal true, @second.value
assert_equal false, @third.value
end

def test_assignment_of_results_inside_a_nested_block
r.pipelined do |p|
@first = p.sadd?("foo", 1)
Expand All @@ -111,6 +123,30 @@ def test_assignment_of_results_inside_a_nested_block
assert_equal false, @second.value
end

def test_nested_pipelining_returns_without_raising_exception
result = r.pipelined(exception: false) do |p1|
p1.doesnt_exist
p1.set("foo", "42")
p1.pipelined do |p2|
p2.doesnt_exist_again
p2.set("bar", "99")
end
end

assert result[0].is_a?(RedisClient::CommandError)
assert_equal ["doesnt_exist"], result[0].command

assert_equal "OK", result[1]

assert result[2].is_a?(RedisClient::CommandError)
assert_equal ["doesnt_exist_again"], result[2].command

assert_equal "OK", result[3]

assert_equal "42", r.get("foo")
assert_equal "99", r.get("bar")
end

def test_futures_raise_when_confused_with_something_else
r.pipelined do |p|
@result = p.sadd?("foo", 1)
Expand Down