/
client.rb
127 lines (104 loc) · 3.73 KB
/
client.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
# frozen_string_literal: true
require 'redis-cluster-client'
require 'redis/cluster/transaction_adapter'
class Redis
  class Cluster
    # Cluster client built on RedisClient::Cluster.
    #
    # The command-style entry points defined here (#call_v, #blocking_call_v,
    # #pipelined, #multi, #watch) and the constructor run inside
    # #handle_errors, so any ::RedisClient::Error raised underneath is handed
    # to Client.translate_error!.
    class Client < RedisClient::Cluster
      # Error translation table: the entries of ::Redis::Client::ERROR_MAPPING
      # plus the cluster-specific error classes listed below.
      #
      # NOTE(review): left unfrozen, exactly as before. It is passed on as
      # `mapping:` to Redis::Client.translate_error! -- confirm that code never
      # writes to the hash before freezing it.
      ERROR_MAPPING = ::Redis::Client::ERROR_MAPPING.merge(
        RedisClient::Cluster::InitialSetupError => Redis::Cluster::InitialSetupError,
        RedisClient::Cluster::OrchestrationCommandNotSupported => Redis::Cluster::OrchestrationCommandNotSupported,
        RedisClient::Cluster::AmbiguousNodeError => Redis::Cluster::AmbiguousNodeError,
        RedisClient::Cluster::ErrorCollection => Redis::Cluster::CommandErrorCollection,
        RedisClient::Cluster::Transaction::ConsistencyError => Redis::Cluster::TransactionConsistencyError,
        RedisClient::Cluster::NodeMightBeDown => Redis::Cluster::NodeMightBeDown,
      )

      class << self
        # Calls the superclass `config` with `protocol: 2` as the default;
        # a `protocol:` supplied by the caller takes precedence.
        def config(**kwargs)
          options = { protocol: 2 }.merge(kwargs)
          super(**options)
        end

        # Calls the superclass `sentinel` with `protocol: 2` as the default;
        # a `protocol:` supplied by the caller takes precedence.
        def sentinel(**kwargs)
          options = { protocol: 2 }.merge(kwargs)
          super(**options)
        end

        # Re-raises +error+ as its mapped counterpart.
        #
        # Anything that is not a RedisClient::Cluster::ErrorCollection is
        # delegated to Redis::Client.translate_error!. For a collection:
        #
        # 1. If any per-node error is a RedisClient::AuthenticationError, the
        #    first such error is raised on its own using the class found in
        #    +mapping+ (exact-class lookup), keeping its message and backtrace.
        # 2. Otherwise every per-node error is rebuilt as its mapped class (or
        #    its own class when +mapping+ has no entry) with the same message
        #    and backtrace, and a Redis::Cluster::CommandErrorCollection
        #    carrying those errors is raised with the original collection's
        #    message and backtrace.
        def translate_error!(error, mapping: ERROR_MAPPING)
          unless error.is_a?(RedisClient::Cluster::ErrorCollection)
            return Redis::Client.translate_error!(error, mapping: mapping)
          end

          # An authentication failure on any node wins over the aggregate error.
          _, auth_error = error.errors.find { |_, err| err.is_a?(RedisClient::AuthenticationError) }
          raise mapping.fetch(auth_error.class), auth_error.message, auth_error.backtrace if auth_error

          translated = error.errors.each_with_object({}) do |(node_key, node_error), acc|
            target_class = mapping.fetch(node_error.class, node_error.class)
            copy = target_class.new(node_error.message)
            copy.set_backtrace(node_error.backtrace)
            acc[node_key] = copy
          end

          collection = Redis::Cluster::CommandErrorCollection.new(translated, error.message)
          collection.set_backtrace(error.backtrace)
          raise collection
        end
      end

      # Forwards every argument untouched to the superclass constructor,
      # translating any ::RedisClient::Error raised while it runs.
      def initialize(*)
        handle_errors { super }
      end
      # Where the running Ruby offers ruby2_keywords, flag #initialize so
      # keywords captured by the anonymous splat are forwarded as keywords.
      ruby2_keywords :initialize if respond_to?(:ruby2_keywords, true)

      # All router node keys joined by a single space.
      def id
        @router.node_keys.join(' ')
      end

      # The router's node keys, returned as-is.
      def server_url
        @router.node_keys
      end

      # Always reports true.
      def connected?
        true
      end

      # Simply yields to the given block.
      def disable_reconnection
        yield # TODO: do we need this, is it doable?
      end

      # The read timeout taken from the client configuration.
      def timeout
        config.read_timeout
      end

      # Always 0.
      def db
        0
      end

      # Undefine these methods on this class; the wrappers defined below are
      # the *_v variants.
      # NOTE(review): presumably so that only entry points wrapped in
      # #handle_errors stay reachable -- confirm.
      %i[call call_once call_once_v blocking_call].each do |method_name|
        undef_method(method_name)
      end

      # Superclass #call_v with error translation.
      def call_v(command, &block)
        handle_errors do
          super(command, &block)
        end
      end

      # Superclass #blocking_call_v with error translation. A positive
      # +timeout+ is extended by the client's own #timeout; nil, false, zero
      # and negative values are forwarded unchanged.
      def blocking_call_v(timeout, command, &block)
        extend_timeout = timeout && timeout > 0
        effective_timeout = extend_timeout ? timeout + self.timeout : timeout
        handle_errors do
          super(effective_timeout, command, &block)
        end
      end

      # Superclass #pipelined with error translation.
      def pipelined(exception: true, &block)
        handle_errors do
          super(exception: exception, &block)
        end
      end

      # Superclass #multi with error translation.
      def multi(watch: nil, &block)
        handle_errors do
          super(watch: watch, &block)
        end
      end

      # Runs an optimistic-locking session over +keys+.
      #
      # A block is mandatory; without one a
      # Redis::Cluster::TransactionConsistencyError is raised. Inside the
      # session a Redis::Cluster::TransactionAdapter is built, yielded to the
      # caller's block, and then executed.
      def watch(*keys)
        unless block_given?
          raise Redis::Cluster::TransactionConsistencyError,
                'A block is required if you use the cluster client.'
        end

        handle_errors do
          locking = RedisClient::Cluster::OptimisticLocking.new(@router)
          locking.watch(keys) do |node, slot, asking|
            transaction = Redis::Cluster::TransactionAdapter.new(
              self, @router, @command_builder, node: node, slot: slot, asking: asking
            )
            yield transaction
            transaction.execute
          end
        end
      end

      private

      # Yields, passing any ::RedisClient::Error raised by the block to
      # Redis::Cluster::Client.translate_error!.
      def handle_errors
        yield
      rescue ::RedisClient::Error => e
        Redis::Cluster::Client.translate_error!(e)
      end
    end
  end
end