/
client.rb
656 lines (540 loc) · 17 KB
/
client.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
# frozen_string_literal: true
require_relative "errors"
require "socket"
require "cgi"
class Redis
class Client
# Defaults are also used for converting string keys to symbols.
#
# Baseline values for every recognised client option. `_parse_options` works
# on a copy of this hash: callable values are invoked, string-keyed options
# matching one of these keys are copied to the symbol key, and any option the
# caller left unset or nil is filled in from here.
DEFAULTS = {
# A lambda, so REDIS_URL is looked up when options are parsed.
url: -> { ENV["REDIS_URL"] },
scheme: "redis",
host: "127.0.0.1",
port: 6379,
path: nil,
read_timeout: nil,
write_timeout: nil,
connect_timeout: nil,
# Fallback for the connect/read/write timeouts when those are left nil.
timeout: 5.0,
username: nil,
password: nil,
db: 0,
driver: nil,
id: nil,
# An Integer of 5 or more is expanded into a { time:, intvl:, probes: } hash
# by `_parse_options`; smaller values are left untouched.
tcp_keepalive: 0,
reconnect_attempts: 1,
reconnect_delay: 0,
reconnect_delay_max: 0.5,
inherit_socket: false,
logger: nil,
sentinels: nil,
role: nil
}.freeze
attr_reader :options
# Connection scheme stored in the parsed options.
def scheme
@options[:scheme]
end
# Server host stored in the parsed options.
def host
@options[:host]
end
# Server port stored in the parsed options.
def port
@options[:port]
end
# Unix socket path stored in the parsed options (nil when none was given).
def path
@options[:path]
end
# Read timeout stored in the parsed options.
def read_timeout
@options[:read_timeout]
end
# Connect timeout stored in the parsed options.
def connect_timeout
@options[:connect_timeout]
end
# Returns the :read_timeout option. Note that this does not read the
# :timeout option itself.
def timeout
@options[:read_timeout]
end
# Username stored in the parsed options.
def username
@options[:username]
end
# Password stored in the parsed options.
def password
@options[:password]
end
# Database index stored in the parsed options.
def db
@options[:db]
end
# Stores a new database index; the value is coerced with #to_i.
def db=(db)
@options[:db] = db.to_i
end
# Driver stored in the parsed options.
def driver
@options[:driver]
end
# Value of the :inherit_socket option.
def inherit_socket?
@options[:inherit_socket]
end
attr_accessor :logger
attr_reader :connection
attr_reader :command_map
# Builds a client from raw options. No connection is opened here.
#
# @param options [Hash] user-supplied options, normalised by #_parse_options.
#   When no sentinels are configured and options[:connector] responds to
#   #new, it is instantiated with the parsed options and used instead of the
#   default Connector. The hash passed in is never modified.
def initialize(options = {})
  @options = _parse_options(options)
  @reconnect = true
  @logger = @options[:logger]
  @connection = nil
  @command_map = {}
  @pending_reads = 0

  # Read the key instead of deleting it: deleting would mutate the caller's
  # hash, and a second client built from the same hash would lose the
  # custom connector.
  custom_connector = options[:connector]

  @connector =
    if !@options[:sentinels].nil?
      Connector::Sentinel.new(@options)
    elsif custom_connector.respond_to?(:new)
      custom_connector.new(@options)
    else
      Connector.new(@options)
    end
end
# Opens the connection and runs the post-connect handshake: AUTH (when a
# password is set), READONLY, SELECT, CLIENT SETNAME, then the connector's
# own check. Returns self.
def connect
  @pid = Process.pid

  # A connection that is only just being opened must not trigger reconnects.
  with_reconnect(false) do
    establish_connection

    if password && username
      begin
        call [:auth, username, password]
      rescue CommandError => err # Likely on Redis < 6
        # Only the arity error falls back to password-only AUTH.
        raise unless err.message.match?(/ERR wrong number of arguments for \'auth\' command/)

        call [:auth, password]
      end
    elsif password
      call [:auth, password]
    end

    call [:readonly] if @options[:readonly]
    call [:select, db] unless db == 0

    client_name = @options[:id]
    call [:client, :setname, client_name] if client_name

    @connector.check(self)
  end

  self
end
# Identifier of this client: the :id option when set, otherwise a
# redis:// URL built from #location and #db.
def id
  configured = @options[:id]
  return configured if configured

  "redis://#{location}/#{db}"
end
# Where the client connects: the socket path when one is set, otherwise
# "host:port".
def location
  socket_path = path
  return socket_path if socket_path

  "#{host}:#{port}"
end
# Sends a single command and reads its reply.
#
# A CommandError reply is raised. When a block is given, the reply is passed
# through it, except for the 'QUEUED' reply, which is returned untouched.
def call(command)
  response = process([command]) { read }
  raise response if response.is_a?(CommandError)

  return response unless block_given?
  return response if response == 'QUEUED'

  yield response
end
# Sends one command, then keeps reading replies under the given socket
# timeout, yielding each one to the caller's block. The loop stops at the
# first CommandError reply, which is raised once the timeout wrapper has
# unwound.
def call_loop(command, timeout = 0)
  failure = nil

  outcome = with_socket_timeout(timeout) do
    process([command]) do
      loop do
        reply = read

        unless reply.is_a?(CommandError)
          yield reply
          next
        end

        failure = reply
        break
      end
    end
  end

  # The read loop was left because of an error reply.
  raise failure if failure

  outcome
end
# Runs a pipeline and returns whatever pipeline.finish makes of the replies.
#
# Returns [] for a pipeline without futures. After a successful run the
# client's db is updated when the pipeline reports one. A ConnectionError is
# swallowed (nil is returned) only when the pipeline contained a shutdown.
def call_pipeline(pipeline)
  return [] if pipeline.futures.empty?

  with_reconnect(pipeline.with_reconnect?) do
    begin
      finished = pipeline.finish(call_pipelined(pipeline))
      self.db = pipeline.db if pipeline.db
      finished
    rescue ConnectionError => e
      # The pipeline is assumed to have been sent in one piece; when it held
      # a SHUTDOWN, the replies of the commands executed before it never
      # come back, so a lost connection is expected in that case.
      raise e unless pipeline.shutdown?

      return nil
    end
  end
end
# Writes every command of the pipeline, then reads one reply per entry of
# pipeline.timeouts and returns them in order. The first CommandError reply
# is raised after all replies have been read.
def call_pipelined(pipeline)
  return [] if pipeline.futures.empty?

  # #ensure_connected (reached through #process) reconnects on I/O errors.
  # Retrying after a reply has already been read would send the whole
  # pipeline again and re-run commands that already succeeded, so
  # reconnection is switched off as soon as the first reply is in and the
  # previous setting is restored on the way out.
  commands = pipeline.commands
  replies = Array.new(commands.size)
  saved_reconnect = @reconnect

  begin
    first_error = nil

    process(commands) do
      pipeline.timeouts.each_with_index do |timeout, index|
        reply = timeout ? with_socket_timeout(timeout) { read } : read
        replies[index] = reply
        @reconnect = false
        first_error ||= reply if reply.is_a?(CommandError)
      end
    end

    raise first_error if first_error
  ensure
    @reconnect = saved_reconnect
  end

  replies
end
# Runs #call with the socket timeout temporarily set to +timeout+.
#
# NOTE(review): a ConnectionError restarts the call with no upper bound on
# the number of attempts — confirm this is intended for all callers.
def call_with_timeout(command, timeout, &blk)
  begin
    with_socket_timeout(timeout) { call(command, &blk) }
  rescue ConnectionError
    retry
  end
end
# Same as #call_with_timeout, passing 0 as the timeout.
def call_without_timeout(command, &blk)
call_with_timeout(command, 0, &blk)
end
# Writes the given commands inside the logging and ensure_connected
# wrappers, then yields (when a block is given) so the caller can read the
# replies on the same connection.
#
# A command whose name has an entry in command_map is sent under the mapped
# name; the caller's array is left unmodified.
def process(commands)
  logging(commands) do
    ensure_connected do
      commands.each do |command|
        mapped_name = command_map[command.first]

        if mapped_name
          command = command.dup
          command[0] = mapped_name
        end

        write(command)
      end

      yield if block_given?
    end
  end
end
# True only when a connection object exists and reports itself connected.
# Always returns a strict boolean.
def connected?
  conn = connection
  return false unless conn

  conn.connected? ? true : false
end
# Closes the connection when there is a live one; otherwise does nothing
# and returns nil.
def disconnect
  return unless connected?

  connection.disconnect
end
alias close disconnect
# Drops the current connection (if any) and connects again.
# Returns the result of #connect.
def reconnect
disconnect
connect
end
# Runs the block and translates low-level I/O failures:
# a TimeoutError is re-raised as a new TimeoutError with a fixed message and
# the original backtrace; the listed Errno errors become a ConnectionError
# naming the errno class.
def io
  begin
    yield
  rescue TimeoutError => original
    # Swap the message but keep the original stack.
    wrapped = TimeoutError.new("Connection timed out")
    wrapped.set_backtrace(original.backtrace)
    raise wrapped
  rescue Errno::ECONNRESET, Errno::EPIPE, Errno::ECONNABORTED, Errno::EBADF, Errno::EINVAL => lost
    reason = lost.class.name.split("::").last
    raise ConnectionError, format("Connection lost (%s)", reason)
  end
end
# Reads one reply from the connection and, once the read has succeeded,
# decrements the pending-reads counter. Returns the reply.
def read
  io do
    connection.read.tap { @pending_reads -= 1 }
  end
end
# Writes one command to the connection and returns the result of the
# connection's #write.
def write(command)
io do
# Counted before the write is attempted, so the counter is already
# incremented if the write raises.
@pending_reads += 1
connection.write(command)
end
end
# Runs the block with the socket timeout temporarily set to +timeout+,
# connecting first if necessary, and returns the block's value.
#
# While the block runs, @options[:read_timeout] also holds +timeout+ so that
# a reconnection inside the block uses the same value. Afterwards both the
# option and the live connection's timeout are put back to the previous
# read timeout, even when the block raises.
def with_socket_timeout(timeout)
  connect unless connected?
  original = @options[:read_timeout]

  begin
    connection.timeout = timeout
    @options[:read_timeout] = timeout # for reconnection
    yield
  ensure
    # Restore the option first: #timeout reads @options[:read_timeout], so
    # doing it the other way round would re-apply the temporary value to the
    # connection instead of the original one.
    @options[:read_timeout] = original
    connection.timeout = self.timeout if connected?
  end
end
# Same as #with_socket_timeout, passing 0 as the timeout.
def without_socket_timeout(&blk)
with_socket_timeout(0, &blk)
end
# Runs the block with the reconnect flag set to +val+ and puts the previous
# flag back afterwards, even when the block raises. Returns the block's
# value.
def with_reconnect(val = true)
  previous = @reconnect
  @reconnect = val

  begin
    yield
  ensure
    @reconnect = previous
  end
end
# Same as #with_reconnect, passing false as the flag.
def without_reconnect(&blk)
with_reconnect(false, &blk)
end
protected
# Wraps the block with debug logging: one line per command before the block
# runs and one line with the elapsed time afterwards (also when the block
# raises). When there is no logger, or it is not at debug level, the block
# is simply run.
#
# @param commands [Array<Array>] commands as [name, *args]
# @return whatever the block returns
def logging(commands)
  return yield unless @logger&.debug?

  begin
    commands.each do |name, *args|
      logged_args = args.map do |a|
        if a.respond_to?(:inspect) then a.inspect
        elsif a.respond_to?(:to_s) then a.to_s
        else
          # handle poorly-behaved descendants of BasicObject
          klass = a.instance_exec { (class << self; self end).superclass }
          "\#<#{klass}:#{a.__id__}>"
        end
      end
      @logger.debug("[Redis] command=#{name.to_s.upcase} args=#{logged_args.join(' ')}")
    end

    # Monotonic clock: wall-clock time can jump (NTP, manual changes) and
    # would then report negative or inflated durations.
    started = Process.clock_gettime(Process::CLOCK_MONOTONIC)
    yield
  ensure
    # `started` is nil when logging a command failed before the block ran.
    if started
      elapsed_ms = (Process.clock_gettime(Process::CLOCK_MONOTONIC) - started) * 1000
      @logger.debug("[Redis] call_time=%0.2f ms" % elapsed_ms)
    end
  end
end
# Asks the connector where to connect, stores the resolved host (and port,
# when given) in the options, opens the connection through the configured
# driver and resets the pending-reads counter.
#
# The listed timeout, socket and Errno failures are re-raised as
# CannotConnectError.
def establish_connection
  begin
    target = @connector.resolve.dup
    @options[:host] = target[:host]
    @options[:port] = Integer(target[:port]) if target.include?(:port)

    @connection = @options[:driver].connect(@options)
    @pending_reads = 0
  rescue TimeoutError,
         SocketError,
         Errno::EADDRNOTAVAIL,
         Errno::ECONNREFUSED,
         Errno::EHOSTDOWN,
         Errno::EHOSTUNREACH,
         Errno::ENETUNREACH,
         Errno::ENOENT,
         Errno::ETIMEDOUT,
         Errno::EINVAL => cause
    raise CannotConnectError, "Error connecting to Redis on #{location} (#{cause.class})"
  end
end
# Yields with a usable connection, reconnecting and re-running the whole
# block when a BaseConnectionError is raised.
#
# A retry happens only while reconnection is enabled (@reconnect) and the
# number of attempts made so far does not exceed :reconnect_attempts;
# otherwise the error is re-raised. Every failure path disconnects first.
def ensure_connected
# Replies to an earlier request were never read; drop the connection so the
# `else` branch below opens a fresh one.
disconnect if @pending_reads > 0
attempts = 0
begin
attempts += 1
if connected?
# @pid is recorded by #connect; a different pid means the current process
# is not the one that opened this connection.
unless inherit_socket? || Process.pid == @pid
raise InheritedError,
"Tried to use a connection from a child process without reconnecting. " \
"You need to reconnect to Redis after forking " \
"or set :inherit_socket to true."
end
else
connect
end
yield
rescue BaseConnectionError
disconnect
if attempts <= @options[:reconnect_attempts] && @reconnect
# The delay doubles with every attempt and is capped at :reconnect_delay_max.
sleep_t = [(@options[:reconnect_delay] * 2**(attempts - 1)),
@options[:reconnect_delay_max]].min
Kernel.sleep(sleep_t)
retry
else
raise
end
rescue Exception
# Any other exception (this clause is not limited to StandardError):
# drop the connection, then re-raise unchanged.
disconnect
raise
end
end
# Normalises raw client options into the hash the rest of the client uses.
#
# Steps, in order: string keys that match a DEFAULTS key are copied to the
# symbol key; a URL (the :url option, else the :url default) overrides the
# defaults for scheme/host/port/credentials/db or the socket path; unset or
# nil options are filled from the defaults; timeouts, reconnect settings, db,
# driver and tcp_keepalive are coerced to their final types.
#
# @param options [Hash] raw options; not modified
# @return [Hash] parsed options flagged with :_parsed. A hash that already
#   carries :_parsed is returned as is.
# @raise [ArgumentError] when the URL scheme is not unix, redis or rediss
# @raise [RuntimeError] when a :tcp_keepalive hash has a non-Integer
#   :time, :intvl or :probes
def _parse_options(options)
  return options if options[:_parsed]

  defaults = DEFAULTS.dup
  options = options.dup

  defaults.keys.each do |key|
    # Fill in defaults if needed
    defaults[key] = defaults[key].call if defaults[key].respond_to?(:call)

    # Symbolize only keys that are needed
    options[key] = options[key.to_s] if options.key?(key.to_s)
  end

  url = options[:url]
  url = defaults[:url] if url.nil?

  # Override defaults from URL if given
  if url
    require "uri"

    uri = URI(url)

    case uri.scheme
    when "unix"
      defaults[:path] = uri.path
    when "redis", "rediss"
      defaults[:scheme] = uri.scheme
      # URI#hostname drops the square brackets around IPv6 literals
      # ("[::1]" -> "::1"); URI#host keeps them, which is not a usable
      # socket address.
      defaults[:host] = uri.hostname if uri.host
      defaults[:port] = uri.port if uri.port
      defaults[:username] = CGI.unescape(uri.user) if uri.user && !uri.user.empty?
      defaults[:password] = CGI.unescape(uri.password) if uri.password && !uri.password.empty?
      defaults[:db] = uri.path[1..-1].to_i if uri.path
      defaults[:role] = :master
    else
      raise ArgumentError, "invalid uri scheme '#{uri.scheme}'"
    end

    defaults[:ssl] = true if uri.scheme == "rediss"
  end

  # Use default when option is not specified or nil
  defaults.keys.each do |key|
    options[key] = defaults[key] if options[key].nil?
  end

  if options[:path]
    # Unix socket
    options[:scheme] = "unix"
    options.delete(:host)
    options.delete(:port)
  else
    # TCP socket
    options[:host] = options[:host].to_s
    options[:port] = options[:port].to_i
  end

  # :timeout is the fallback for each specific timeout that is still unset.
  if options.key?(:timeout)
    options[:connect_timeout] ||= options[:timeout]
    options[:read_timeout]    ||= options[:timeout]
    options[:write_timeout]   ||= options[:timeout]
  end

  options[:connect_timeout] = Float(options[:connect_timeout])
  options[:read_timeout]    = Float(options[:read_timeout])
  options[:write_timeout]   = Float(options[:write_timeout])

  options[:reconnect_attempts] = options[:reconnect_attempts].to_i
  options[:reconnect_delay] = options[:reconnect_delay].to_f
  options[:reconnect_delay_max] = options[:reconnect_delay_max].to_f

  options[:db] = options[:db].to_i
  options[:driver] = _parse_driver(options[:driver]) || Connection.drivers.last

  case options[:tcp_keepalive]
  when Hash
    %i[time intvl probes].each do |key|
      unless options[:tcp_keepalive][key].is_a?(Integer)
        raise "Expected the #{key.inspect} key in :tcp_keepalive to be an Integer"
      end
    end
  when Integer
    # Expand a plain number of seconds into time/interval/probe settings.
    if options[:tcp_keepalive] >= 60
      options[:tcp_keepalive] = { time: options[:tcp_keepalive] - 20, intvl: 10, probes: 2 }
    elsif options[:tcp_keepalive] >= 30
      options[:tcp_keepalive] = { time: options[:tcp_keepalive] - 10, intvl: 5, probes: 2 }
    elsif options[:tcp_keepalive] >= 5
      options[:tcp_keepalive] = { time: options[:tcp_keepalive] - 2, intvl: 2, probes: 1 }
    end
  end

  options[:_parsed] = true

  options
end
# Turns a driver name into a driver class.
#
# A Symbol or String names a file under connection/ (tried relative to this
# file first, then as "redis/connection/<name>"); the capitalised name is
# then looked up under Connection. Any other value is returned unchanged.
# Raises RuntimeError when neither file can be loaded.
def _parse_driver(driver)
  name = driver.is_a?(Symbol) ? driver.to_s : driver
  return name unless name.is_a?(String)

  begin
    require_relative "connection/#{name}"
  rescue LoadError, NameError
    begin
      require "redis/connection/#{name}"
    rescue LoadError, NameError => error
      raise "Cannot load driver #{name.inspect}: #{error.message}"
    end
  end

  Connection.const_get(name.capitalize)
end
class Connector
def initialize(options)
@options = options.dup
end
def resolve
@options
end
def check(client); end
class Sentinel < Connector
def initialize(options)
super(options)
@options[:db] = DEFAULTS.fetch(:db)
@sentinels = @options.delete(:sentinels).dup
@role = (@options[:role] || "master").to_s
@master = @options[:host]
end
def check(client)
# Check the instance is really of the role we are looking for.
# We can't assume the command is supported since it was introduced
# recently and this client should work with old stuff.
begin
role = client.call([:role])[0]
rescue Redis::CommandError
# Assume the test is passed if we can't get a reply from ROLE...
role = @role
end
if role != @role
client.disconnect
raise ConnectionError, "Instance role mismatch. Expected #{@role}, got #{role}."
end
end
def resolve
result = case @role
when "master"
resolve_master
when "slave"
resolve_slave
else
raise ArgumentError, "Unknown instance role #{@role}"
end
result || (raise ConnectionError, "Unable to fetch #{@role} via Sentinel.")
end
def sentinel_detect
@sentinels.each do |sentinel|
client = Client.new(@options.merge({
host: sentinel[:host] || sentinel["host"],
port: sentinel[:port] || sentinel["port"],
username: sentinel[:username] || sentinel["username"],
password: sentinel[:password] || sentinel["password"],
reconnect_attempts: 0
}))
begin
if result = yield(client)
# This sentinel responded. Make sure we ask it first next time.
@sentinels.delete(sentinel)
@sentinels.unshift(sentinel)
return result
end
rescue BaseConnectionError
ensure
client.disconnect
end
end
raise CannotConnectError, "No sentinels available."
end
def resolve_master
sentinel_detect do |client|
if reply = client.call(["sentinel", "get-master-addr-by-name", @master])
{ host: reply[0], port: reply[1] }
end
end
end
def resolve_slave
sentinel_detect do |client|
if reply = client.call(["sentinel", "slaves", @master])
slaves = reply.map { |s| s.each_slice(2).to_h }
slaves.each { |s| s['flags'] = s.fetch('flags').split(',') }
slaves.reject! { |s| s.fetch('flags').include?('s_down') }
if slaves.empty?
raise CannotConnectError, 'No slaves available.'
else
slave = slaves.sample
{
host: slave.fetch('ip'),
port: slave.fetch('port')
}
end
end
end
end
end
end
end
end