/
service_discovery.rb
125 lines (104 loc) · 4.45 KB
/
service_discovery.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
#
# Fluentd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
require 'fluent/plugin_helper/timer'
require 'fluent/plugin_helper/service_discovery/manager'
module Fluent
module PluginHelper
module ServiceDiscovery
include Fluent::PluginHelper::Timer
# For the compatibility with older versions without `param_name: :service_discovery_configs`
attr_reader :service_discovery
def self.included(mod)
mod.include ServiceDiscoveryParams
end
def configure(conf)
super
# For the compatibility with older versions without `param_name: :service_discovery_configs`
@service_discovery = @service_discovery_configs
end
def start
unless @discovery_manager
log.warn('There is no discovery_manager. skip start them')
super
return
end
@discovery_manager.start
unless @discovery_manager.static_config?
timer_execute(@_plugin_helper_service_discovery_title, @_plugin_helper_service_discovery_interval) do
@discovery_manager.run_once
end
end
super
end
%i[after_start stop before_shutdown shutdown after_shutdown close terminate].each do |mth|
define_method(mth) do
@discovery_manager&.__send__(mth)
super()
end
end
private
# @param title [Symbol] the thread name. this value should be unique.
# @param static_default_service_directive [String] the directive name of each service when "static" service discovery is enabled in default
# @param load_balancer [Object] object which has two methods #rebalance and #select_service
# @param custom_build_method [Proc]
def service_discovery_configure(title, static_default_service_directive: nil, load_balancer: nil, custom_build_method: nil, interval: 3)
configs = @service_discovery_configs.map(&:corresponding_config_element)
if static_default_service_directive
configs.prepend Fluent::Config::Element.new(
'service_discovery',
'',
{'@type' => 'static'},
@config.elements(name: static_default_service_directive.to_s).map{|e| Fluent::Config::Element.new('service', e.arg, e.dup, e.elements, e.unused) }
)
end
service_discovery_create_manager(title, configurations: configs, load_balancer: load_balancer, custom_build_method: custom_build_method, interval: interval)
end
def service_discovery_select_service(&block)
@discovery_manager.select_service(&block)
end
def service_discovery_services
@discovery_manager.services
end
def service_discovery_rebalance
@discovery_manager.rebalance
end
# @param title [Symbol] the thread name. this value should be unique.
# @param configurations [Hash] hash which must has discivery_service type and its configuration like `{ type: :static, conf: <Fluent::Config::Element> }`
# @param load_balancer [Object] object which has two methods #rebalance and #select_service
# @param custom_build_method [Proc]
def service_discovery_create_manager(title, configurations:, load_balancer: nil, custom_build_method: nil, interval: 3)
@_plugin_helper_service_discovery_title = title
@_plugin_helper_service_discovery_interval = interval
@discovery_manager = Fluent::PluginHelper::ServiceDiscovery::Manager.new(
log: log,
load_balancer: load_balancer,
custom_build_method: custom_build_method,
)
@discovery_manager.configure(configurations, parent: self)
@discovery_manager
end
def discovery_manager
@discovery_manager
end
module ServiceDiscoveryParams
include Fluent::Configurable
config_section :service_discovery, multi: true, param_name: :service_discovery_configs do
config_param :@type, :string
end
end
end
end
end