/
pipeline.rb
129 lines (106 loc) · 2.71 KB
/
pipeline.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
# frozen_string_literal: true
require "delegate"
class Redis
# Command interface yielded to the block of a pipeline. Each command is queued
# on the wrapped pipeline object and answered immediately with a Future; the
# Future receives its reply later, from the callback registered on the pipeline.
class PipelinedConnection
  attr_accessor :db

  # @param pipeline [#call_v, #blocking_call_v] object the commands are queued on
  # @param futures [Array] list every future created here is appended to
  # @param exception [Boolean] handed to each Future this connection creates
  def initialize(pipeline, futures = [], exception: true)
    @pipeline = pipeline
    @futures = futures
    @exception = exception
  end

  include Commands

  # Nested pipelining reuses this very connection.
  def pipelined
    yield self
  end

  # Queues MULTI, yields a MultiConnection to the block, then queues EXEC.
  #
  # @yieldparam transaction [MultiConnection] connection sharing this
  #   pipeline, this futures list and this connection's exception setting
  # @return [MultiFuture] future that receives the EXEC reply
  def multi
    # Forward the exception setting so futures queued inside the transaction
    # follow the same policy as futures queued directly on this connection.
    transaction = MultiConnection.new(@pipeline, @futures, exception: @exception)
    send_command([:multi])
    # Sampled after MULTI was queued, so the slice taken below holds only the
    # futures that the block adds.
    size = @futures.size
    yield transaction
    multi_future = MultiFuture.new(@futures[size..-1])
    @pipeline.call_v([:exec]) do |result|
      multi_future._set(result)
    end
    @futures << multi_future
    multi_future
  end

  private

  # No locking is performed here; the block simply receives this connection.
  def synchronize
    yield self
  end

  # Queues +command+ on the pipeline and returns the Future for its reply.
  # The optional block is stored in the Future as its reply coercion.
  def send_command(command, &block)
    future = Future.new(command, block, @exception)
    @pipeline.call_v(command) do |result|
      future._set(result)
    end
    @futures << future
    future
  end

  # Same as #send_command, but queues through the pipeline's blocking entry
  # point and passes +timeout+ along with the command.
  def send_blocking_command(command, timeout, &block)
    future = Future.new(command, block, @exception)
    @pipeline.blocking_call_v(timeout, command) do |result|
      future._set(result)
    end
    @futures << future
    future
  end
end
# Connection yielded inside a MULTI block opened on a pipeline. It behaves
# like its parent class except for the two overrides below.
class MultiConnection < PipelinedConnection
  # Always raises: a transaction cannot be opened from within a transaction.
  #
  # NOTE(review): Redis::Error is not defined in the part of the code visible
  # here — confirm the constant exists, otherwise this raises NameError.
  def multi
    raise(Redis::Error, "Can't nest multi transaction")
  end

  private

  # Blocking commands inside transaction behave like non-blocking.
  # It shouldn't be done though.
  # https://redis.io/commands/blpop/#blpop-inside-a-multi--exec-transaction
  #
  # The timeout is therefore dropped and the command is queued as a regular one.
  def send_blocking_command(command, _timeout, &coerce)
    send_command(command, &coerce)
  end
end
# Error describing a future whose value has been requested before the
# pipeline that owns it has executed. It is built without arguments and always
# carries the same message.
class FutureNotReady < RuntimeError
  MESSAGE = "Value will be available once the pipeline executes."
  private_constant :MESSAGE

  def initialize
    super(MESSAGE)
  end
end
# Stand-in returned for a command queued on a pipeline. It holds a shared
# "not ready" sentinel until #_set stores the reply.
#
# The class derives from BasicObject, so Kernel helpers are not available on
# instances; that is why #value spells out ::Kernel.raise and why #inspect,
# #is_a? and #class are defined by hand.
class Future < BasicObject
  # Sentinel instance shared by every future that has no reply yet.
  FutureNotReady = ::Redis::FutureNotReady.new

  # @param command [Array] the queued command, kept for #inspect
  # @param coerce [#call, nil] optional transformation applied to the reply
  # @param exception [Boolean] when truthy, #value raises a held StandardError
  def initialize(command, coerce, exception)
    @command = command
    @coerce = coerce
    @exception = exception
    @object = FutureNotReady
  end

  def inspect
    "<Redis::Future #{@command.inspect}>"
  end

  # Stores the reply, passing it through the coercion when one was given.
  #
  # @return the stored value, exactly as #value reports it (so this may raise)
  def _set(object)
    @object =
      if @coerce
        @coerce.call(object)
      else
        object
      end
    value
  end

  # @return the stored object
  # @raise the stored object itself when it is a StandardError and the future
  #   was created with a truthy +exception+ flag
  def value
    failure = @exception && @object.is_a?(::StandardError)
    ::Kernel.raise(@object) if failure
    @object
  end

  # Ancestry check based on whatever #class reports.
  def is_a?(other)
    reported_class = self.class
    reported_class.ancestors.include?(other)
  end

  # Always reports Future.
  def class
    Future
  end
end
# Future for the EXEC that closes a MULTI block queued on a pipeline. It keeps
# the futures of the commands queued inside the block and hands each of them
# its own entry of the EXEC reply.
class MultiFuture < Future
  # @param futures [Array<Future>] futures of the commands queued inside the
  #   block, in queue order
  def initialize(futures)
    @futures = futures
    @command = [:exec]
    # Reach the sentinel through Future explicitly. A bare `FutureNotReady` is
    # resolved lexically from here, so the FutureNotReady *class* of the
    # enclosing namespace is found before the sentinel instance that Future
    # defines and that every other future starts out with.
    @object = Future::FutureNotReady
  end

  # Distributes the EXEC replies to the inner futures and records the result.
  #
  # The recorded value is the list of values produced by the inner futures
  # (that is, after each future's own coercion), so it agrees with what the
  # individual futures report instead of exposing the raw replies.
  #
  # @param replies [Array, nil] EXEC reply; a falsy reply is stored unchanged
  #   and the inner futures are left untouched (presumably the transaction
  #   did not run — confirm against the pipeline implementation)
  # @return [Array, nil] the value now held by this future
  # @raise whatever an inner future raises while its reply is being stored
  def _set(replies)
    @object =
      if replies
        @futures.each_with_index.map do |future, index|
          # Future#_set returns the future's resulting value.
          future._set(replies[index])
        end
      else
        replies
      end
  end
end
end