/
node.rb
108 lines (81 loc) · 2.32 KB
/
node.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
# frozen_string_literal: true
require_relative '../errors'
class Redis
class Cluster
# Keep client list of node for Redis Cluster Client
class Node
include Enumerable
ReloadNeeded = Class.new(StandardError)
ROLE_SLAVE = 'slave'
def initialize(options, node_flags = {}, with_replica = false)
@with_replica = with_replica
@node_flags = node_flags
@clients = build_clients(options)
end
def each(&block)
@clients.values.each(&block)
end
def sample
@clients.values.sample
end
def find_by(node_key)
@clients.fetch(node_key)
rescue KeyError
raise ReloadNeeded
end
def call_all(command, &block)
try_map { |_, client| client.call(command, &block) }.values
end
def call_master(command, &block)
try_map do |node_key, client|
next if slave?(node_key)
client.call(command, &block)
end.values
end
def call_slave(command, &block)
return call_master(command, &block) if replica_disabled?
try_map do |node_key, client|
next if master?(node_key)
client.call(command, &block)
end.values
end
def process_all(commands, &block)
try_map { |_, client| client.process(commands, &block) }.values
end
private
def replica_disabled?
!@with_replica
end
def master?(node_key)
!slave?(node_key)
end
def slave?(node_key)
@node_flags[node_key] == ROLE_SLAVE
end
def build_clients(options)
clients = options.map do |node_key, option|
next if replica_disabled? && slave?(node_key)
option = option.merge(readonly: true) if slave?(node_key)
client = Client.new(option)
[node_key, client]
end
clients.compact.to_h
end
def try_map
errors = {}
results = {}
@clients.each do |node_key, client|
begin
reply = yield(node_key, client)
results[node_key] = reply unless reply.nil?
rescue CommandError => err
errors[node_key] = err
next
end
end
return results if errors.empty?
raise CommandErrorCollection, errors
end
end
end
end