forked from redis/redis-rb
/
synchrony.rb
142 lines (117 loc) · 3.21 KB
/
synchrony.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
# frozen_string_literal: true
require_relative "command_helper"
require_relative "registry"
require_relative "../errors"
require "em-synchrony"
require "hiredis/reader"
class Redis
module Connection
# EventMachine connection used by the synchrony driver.
#
# Incoming bytes are fed to a Hiredis reader; every complete reply it yields
# is delivered to the deferrable created by the most recent #read call.
# The connection is itself a Deferrable: it succeeds in #connection_completed
# and fails in #unbind when no read is pending.
class RedisClient < EventMachine::Connection
  include EventMachine::Deferrable

  # Read timeout in seconds. nil or a non-positive value means #read waits
  # without a timeout.
  attr_accessor :timeout

  # Resets per-connection state: no pending request, not connected, and a
  # fresh protocol reader.
  def post_init
    @req = nil
    @connected = false
    @reader = ::Hiredis::Reader.new
  end

  # Marks the connection as established and succeeds the connection-level
  # deferrable, which runs any callbacks queued on it (see #send).
  def connection_completed
    @connected = true
    succeed
  end

  # @return [Boolean] whether the connection is currently established
  def connected?
    @connected
  end

  # Feeds raw bytes to the reader and dispatches every complete reply.
  #
  # @param data [String] bytes received from the socket
  def receive_data(data)
    @reader.feed(data)

    loop do
      begin
        reply = @reader.gets
      rescue RuntimeError => err
        # @req is nil until the first #read; skip delivery rather than
        # raising NoMethodError from inside the reactor callback.
        @req&.fail([:error, ProtocolError.new(err.message)])
        break
      end

      # The reader returns false when no complete reply is buffered.
      break if reply == false

      # Error replies come back from the reader as RuntimeError instances;
      # convert them to CommandError before handing them to the caller.
      reply = CommandError.new(reply.message) if reply.is_a?(RuntimeError)

      # Re-read @req on every iteration (do not cache it in a local):
      # delivering a reply may lead to another #read, which replaces @req
      # before the next reply in this same chunk is dispatched.
      @req&.succeed([:reply, reply])
    end
  end

  # Suspends the calling fiber until a reply, an error, or a timeout arrives.
  #
  # @return [Array, Symbol] [:reply, value], [:error, exception] or :timeout
  def read
    @req = EventMachine::DefaultDeferrable.new
    # Only arm the timer for a positive timeout; a nil timeout (never
    # assigned) previously raised NoMethodError on the comparison.
    @req.timeout(@timeout, :timeout) if @timeout && @timeout > 0
    EventMachine::Synchrony.sync @req
  end

  # Queues data for writing. The write is registered as a callback on the
  # connection-level deferrable, so it is tied to #connection_completed
  # having succeeded it.
  #
  # NOTE: this shadows Object#send for instances of this class; use
  # __send__ for dynamic dispatch on them.
  #
  # @param data [String] serialized command
  def send(data)
    callback { send_data data }
  end

  # Marks the connection as closed. A pending read is failed with
  # Errno::ECONNRESET; otherwise the connection-level deferrable is failed.
  def unbind
    @connected = false
    if @req
      @req.fail [:error, Errno::ECONNRESET]
      @req = nil
    else
      # Deferrable#fail, not Kernel#fail.
      fail
    end
  end
end
# Connection driver that wraps a RedisClient EventMachine connection and
# exposes a blocking-style connect/write/read interface on top of fibers.
class Synchrony
  include Redis::Connection::CommandHelper

  # Opens a connection and suspends the current fiber until it is
  # established or refused. Because it calls Fiber.yield, it has to run
  # inside a fiber other than the root one.
  #
  # @param config [Hash] reads :scheme, :path, :ssl, :host, :port,
  #   :connect_timeout and :read_timeout
  # @return [Synchrony] driver wrapping the established connection
  # @raise [Errno::ECONNREFUSED] when the connection cannot be established
  # @raise [NotImplementedError] when SSL is requested
  def self.connect(config)
    if config[:scheme] == "unix"
      begin
        conn = EventMachine.connect_unix_domain(config[:path], RedisClient)
      rescue RuntimeError => e
        # Translate this specific RuntimeError message into
        # Errno::ECONNREFUSED; anything else is re-raised untouched.
        raise Errno::ECONNREFUSED if e.message == "no connection"

        raise
      end
    elsif config[:scheme] == "rediss" || config[:ssl]
      raise NotImplementedError, "SSL not supported by synchrony driver"
    else
      conn = EventMachine.connect(config[:host], config[:port], RedisClient) do |c|
        # Never use a connect timeout below 0.1 seconds.
        c.pending_connect_timeout = [config[:connect_timeout], 0.1].max
      end
    end

    # Park this fiber until the connection deferrable succeeds or fails.
    fiber = Fiber.current
    conn.callback { fiber.resume }
    conn.errback { fiber.resume :refused }

    raise Errno::ECONNREFUSED if Fiber.yield == :refused

    instance = new(conn)
    instance.timeout = config[:read_timeout]
    instance
  end

  # @param connection [RedisClient] an established connection
  def initialize(connection)
    @connection = connection
  end

  # @return [Boolean, nil] truthy only while a connection object is held
  #   and it reports itself as connected
  def connected?
    @connection && @connection.connected?
  end

  # Sets the read timeout (seconds) on the underlying connection.
  def timeout=(timeout)
    @connection.timeout = timeout
  end

  # Closes the underlying connection and drops the reference to it.
  # Safe to call more than once: a second call finds no connection and is
  # a no-op instead of raising NoMethodError on nil.
  def disconnect
    @connection&.close_connection
    @connection = nil
  end

  # Serializes the command with build_command and queues it for sending.
  #
  # @param command [Array] command name and arguments
  def write(command)
    @connection.send(build_command(command))
  end

  # Waits for the next result from the connection and unwraps it.
  #
  # @return [Object] the reply payload
  # @raise [TimeoutError] when the read timed out
  # @raise [Exception] the error delivered by the connection
  # @raise [RuntimeError] on an unrecognized result type
  def read
    # A timeout arrives as the bare symbol :timeout, which destructures to
    # type == :timeout and payload == nil.
    type, payload = @connection.read

    case type
    when :reply then payload
    when :error then raise payload
    when :timeout then raise TimeoutError
    else raise "Unknown type #{type.inspect}"
    end
  end
end
end
end
# Register this driver by appending the Synchrony class to
# Redis::Connection.drivers (the list is defined outside this file,
# presumably by the "registry" require above — confirm there).
Redis::Connection.drivers << Redis::Connection::Synchrony