From 13fc410810ef6d1fbf1f75a2dcae8263ca7816d9 Mon Sep 17 00:00:00 2001 From: Pranjal Gupta Date: Sun, 6 Feb 2022 01:56:00 +0530 Subject: [PATCH] in_tail: Add `group.match` parameter for specifying generic rules Signed-off-by: Pranjal Gupta --- lib/fluent/plugin/in_tail.rb | 107 ++++++++++--------- test/plugin/test_in_tail.rb | 192 +++++++++++++++++++++++++---------- 2 files changed, 189 insertions(+), 110 deletions(-) diff --git a/lib/fluent/plugin/in_tail.rb b/lib/fluent/plugin/in_tail.rb index cc19fa3c02..775b3ae32a 100644 --- a/lib/fluent/plugin/in_tail.rb +++ b/lib/fluent/plugin/in_tail.rb @@ -40,8 +40,9 @@ class TailInput < Fluent::Plugin::Input RESERVED_CHARS = ['/', '*', '%'].freeze MetricsInfo = Struct.new(:opened, :closed, :rotated) - DEFAULT_NAMESPACE = DEFAULT_APPNAME = /./ + DEFAULT_KEY = /.*/ DEFAULT_LIMIT = -1 + REGEXP_JOIN = "_" class WatcherSetupError < StandardError def initialize(msg) @@ -65,7 +66,8 @@ def initialize @startup = true # Map rules with GroupWatcher objects @group_watchers = {} - @sorted_path = nil + @group_keys = nil + @default_group_key = nil end desc 'The paths to read. Multiple paths can be specified, separated by comma.' @@ -129,22 +131,21 @@ def initialize config_section :group, param_name: :group, required: false, multi: false do desc 'Regex for extracting group\'s metadata' config_param :pattern, - :regexp, - default: /var\/log\/containers\/(?[a-z0-9]([-a-z0-9]*[a-z0-9])?(\/[a-z0-9]([-a-z0-9]*[a-z0-9])?)*)_(?[^_]+)_(?.+)-(?[a-z0-9]{64})\.log$/ + :regexp, + default: /^\/var\/log\/containers\/(?[a-z0-9]([-a-z0-9]*[a-z0-9])?(\/[a-z0-9]([-a-z0-9]*[a-z0-9])?)*)_(?[^_]+)_(?.+)-(?[a-z0-9]{64})\.log$/ + desc 'Period of time in which the group_line_limit is applied' config_param :rate_period, :time, default: 5 - config_section :rule, multi: true, required: true do - desc 'Namespace key' - config_param :namespace, :array, value_type: :string, default: [DEFAULT_NAMESPACE] - desc 'App name key' - config_param :appname, :array, value_type: :string, default: [DEFAULT_APPNAME] + config_section :rule, param_name: :rule, required: true, multi: true do + desc 'Key-value pairs for grouping' + config_param :match, :hash, value_type: :regexp, default: {namespace: [DEFAULT_KEY], appname: [DEFAULT_KEY]} desc 'Maximum number of log lines allowed per group over a period of rate_period' config_param :limit, :integer, default: DEFAULT_LIMIT end end - attr_reader :paths, :group_watchers + attr_reader :paths, :group_watchers, :default_group_key def configure(conf) @variable_store = Fluent::VariableStore.fetch_or_build(:in_tail) @@ -221,12 +222,22 @@ def configure(conf) end end - ## Ensuring correct time period syntax - @group.rule.each { |rule| - raise "Metadata Group Limit >= DEFAULT_LIMIT" unless rule.limit >= DEFAULT_LIMIT - } unless @group.nil? - - construct_groupwatchers unless @group.nil? + unless @group.nil? + ## Ensuring correct time period syntax + @group.rule.each { |rule| + raise "Metadata Group Limit >= DEFAULT_LIMIT" unless rule.limit >= DEFAULT_LIMIT + } + + @group_keys = Regexp.compile(@group.pattern).named_captures.keys + @default_group_key = ([DEFAULT_KEY] * @group_keys.length).join(REGEXP_JOIN) + + ## Ensures that "specific" rules (with larger number of `rule.match` keys) + ## have a higher priority against "generic" rules (with less number of `rule.match` keys). + ## This will be helpful when a file satisfies more than one rule. + @group.rule.sort_by!{ |rule| -rule.match.length() } + construct_groupwatchers + @group_watchers[@default_group_key] ||= GroupWatcher.new(@group.rate_period, DEFAULT_LIMIT) + end opened_file_metrics = metrics_create(namespace: "fluentd", subsystem: "input", name: "files_opened_total", help_text: "Total number of opened files") closed_file_metrics = metrics_create(namespace: "fluentd", subsystem: "input", name: "files_closed_total", help_text: "Total number of closed files") @@ -267,46 +278,38 @@ def parse_encoding_param(encoding_name) end end - def construct_groupwatchers - @group.rule.each { |rule| - num_groups = rule.namespace.size * rule.appname.size - - rule.namespace.each { |namespace| - namespace = /#{Regexp.quote(namespace)}/ unless namespace.eql?(DEFAULT_NAMESPACE) - @group_watchers[namespace] ||= {} - - rule.appname.each { |appname| - appname = /#{Regexp.quote(appname)}/ unless appname.eql?(DEFAULT_APPNAME) - @group_watchers[namespace][appname] = GroupWatcher.new(@group.rate_period, rule.limit/num_groups) - } - - @group_watchers[namespace][DEFAULT_APPNAME] ||= GroupWatcher.new(@group.rate_period) - } + def construct_group_key(named_captures) + match_rule = [] + @group_keys.each { |key| + match_rule.append(named_captures.fetch(key, DEFAULT_KEY)) } + match_rule = match_rule.join(REGEXP_JOIN) - if @group_watchers.dig(DEFAULT_NAMESPACE, DEFAULT_APPNAME).nil? - @group_watchers[DEFAULT_NAMESPACE] ||= {} - @group_watchers[DEFAULT_NAMESPACE][DEFAULT_APPNAME] = GroupWatcher.new(@group.rate_period) - end + match_rule end - def find_group(namespace, appname) - namespace_key = @group_watchers.keys.find { |regexp| namespace.match?(regexp) && regexp != DEFAULT_NAMESPACE } - namespace_key ||= DEFAULT_NAMESPACE - - appname_key = @group_watchers[namespace_key].keys.find { |regexp| appname.match?(regexp) && regexp != DEFAULT_APPNAME } - appname_key ||= DEFAULT_APPNAME + def construct_groupwatchers + @group.rule.each { |rule| + match_rule = construct_group_key(rule.match) + @group_watchers[match_rule] ||= GroupWatcher.new(@group.rate_period, rule.limit) + } + end - @group_watchers[namespace_key][appname_key] + def find_group(metadata) + metadata_key = construct_group_key(metadata) + gw_key = @group_watchers.keys.find{ |regexp| metadata_key.match?(regexp) && regexp != @default_group_key} + gw_key ||= @default_group_key + + @group_watchers[gw_key] end def find_group_from_metadata(path) begin - metadata = @group.pattern.match(path) - group_watcher = find_group(metadata['namespace'], metadata['appname']) + metadata = @group.pattern.match(path).named_captures + group_watcher = find_group(metadata) rescue => e - $log.warn "Cannot find group from metadata, Adding file in the default group" - group_watcher = @group_watchers[DEFAULT_NAMESPACE][DEFAULT_APPNAME] + log.warn "Cannot find group from metadata, Adding file in the default group" + group_watcher = @group_watchers[@default_group_key] end group_watcher @@ -636,7 +639,7 @@ def detach_watcher_after_rotate_wait(tw, ino) if @open_on_every_update # Detach now because it's already closed, waiting it doesn't make sense. detach_watcher(tw, ino) - elsif @read_bytes_limit_per_second < 0 || (!tw.group_watcher.nil? && tw.group_watcher.limit <= 0) + elsif @read_bytes_limit_per_second < 0 || tw.group_watcher&.limit <= 0 # throttling isn't enabled, just wait @rotate_wait timer_execute(:in_tail_close_watcher, @rotate_wait, repeat: false) do detach_watcher(tw, ino) @@ -913,7 +916,6 @@ def to_s end end - class TailWatcher def initialize(target_info, pe, log, read_from_head, follow_inodes, update_watcher, line_buffer_timer_flusher, io_handler_build, metrics) @path = target_info.path @@ -938,10 +940,6 @@ def initialize(target_info, pe, log, read_from_head, follow_inodes, update_watch attr_reader :watchers attr_accessor :group_watcher - def group_watcher=(group_watcher) - @group_watcher = group_watcher - end - def tag @parsed_tag ||= @path.tr('/', '.').gsub(/\.+/, '.').gsub(/^\./, '') end @@ -1224,7 +1222,7 @@ def should_shutdown_now? def handle_notify return if limit_bytes_per_second_reached? - return if !group_watcher.nil? && group_watcher.limit_lines_reached?(@path) + return if group_watcher&.limit_lines_reached?(@path) with_io do |io| begin @@ -1243,7 +1241,8 @@ def handle_notify @fifo.read_lines(@lines) group_watcher.update_lines_read(@path, @lines.size) unless group_watcher.nil? - if !group_watcher.nil? && group_watcher.limit_lines_reached?(@path) || should_shutdown_now? + if group_watcher&.limit_lines_reached?(@path) || should_shutdown_now? + @log.debug "Reading Limit exceeded #{@path} #{group_watcher.number_lines_read}" unless should_shutdown_now? read_more = false break end diff --git a/test/plugin/test_in_tail.rb b/test/plugin/test_in_tail.rb index 05a320ac60..a352f681b0 100644 --- a/test/plugin/test_in_tail.rb +++ b/test/plugin/test_in_tail.rb @@ -144,7 +144,10 @@ def create_target_info(path) }) ]) - PATTERN = "/#{TMP_DIR}\/(?[a-z0-9]([-a-z0-9]*[a-z0-9])?(\/[a-z0-9]([-a-z0-9]*[a-z0-9])?)*)_(?[^_]+)_(?.+)-(?[a-z0-9]{6})\.log$/" + PATTERN = "/#{TMP_DIR}\/(?[a-z0-9]([-a-z0-9]*[a-z0-9])?(\/[a-z0-9]([-a-z0-9]*[a-z0-9])?)*)_(?[^_]+)_(?.+)-(?[a-z0-9]{6})\.log$/" + DEBUG_LOG_LEVEL = config_element("", "", { + "@log_level" => "debug" + }) def create_group_directive(pattern, rate_period, *rules) config_element("", "", {}, [ @@ -155,12 +158,12 @@ def create_group_directive(pattern, rate_period, *rules) ]) end - def create_rule_directive(namespace = [], appname = [], limit) + def create_rule_directive(match_named_captures, limit) params = { "limit" => limit, + "match" => match_named_captures, } - params["namespace"] = namespace.join(', ') if namespace.size > 0 - params["appname"] = appname.join(', ') if appname.size > 0 + config_element("rule", "", params) end @@ -182,21 +185,37 @@ def create_driver(conf = SINGLE_LINE_CONFIG, use_common_conf = true) end test "valid configuration" do - rule1 = create_rule_directive(['namespace-a'], ['appname-b','appname-c'], 100) - rule2 = create_rule_directive(['namespace-d', 'appname-e'], ['f'], 50) - rule3 = create_rule_directive([], ['appname-g'], -1) - rule4 = create_rule_directive(['appname-h'], [], 0) - - conf = create_group_directive('.', '1m', rule1, rule2, rule3, rule4) + SINGLE_LINE_CONFIG + rule1 = create_rule_directive({ + "namespace"=> "/namespace-a/", + "podname"=> "/podname-[b|c]/" + }, 100) + rule2 = create_rule_directive({ + "namespace"=> "/namespace-[d|e]/", + "podname"=> "/podname-f/", + }, 50) + rule3 = create_rule_directive({ + "podname"=> "/podname-g/", + }, -1) + rule4 = create_rule_directive({ + "namespace"=> "/namespace-h/", + }, 0) + + conf = create_group_directive(PATTERN, '1m', rule1, rule2, rule3, rule4) + SINGLE_LINE_CONFIG assert_nothing_raised do d = create_driver(conf) end end - test "limit should be greater than DEFAULT_LIMIT (-1)" do - rule1 = create_rule_directive(['namespace-a'], ['appname-b','appname-c'], -100) - rule2 = create_rule_directive(['namespace-d', 'namespace-e'], ['appname-f'], 50) - conf = create_group_directive('.', '1m', rule1, rule2) + SINGLE_LINE_CONFIG + test "limit should be greater than DEFAULT_LIMIT (-1)" do + rule1 = create_rule_directive({ + "namespace"=> "/namespace-a/", + "podname"=> "/podname-[b|c]/", + }, -100) + rule2 = create_rule_directive({ + "namespace"=> "/namespace-[d|e]/", + "podname"=> "/podname-f/", + }, 50) + conf = create_group_directive(PATTERN, '1m', rule1, rule2) + SINGLE_LINE_CONFIG assert_raise(RuntimeError) do d = create_driver(conf) end @@ -318,20 +337,52 @@ def create_driver(conf = SINGLE_LINE_CONFIG, use_common_conf = true) sub_test_case "group rules line limit resolution" do test "valid" do - rule1 = create_rule_directive(['namespace-a'], ['appname-b', 'appname-c'], 50) - rule2 = create_rule_directive([], ['appname-b', 'appname-c'], 400) - rule3 = create_rule_directive(['namespace-a'], [], 100) + rule1 = create_rule_directive({ + "namespace"=> "/namespace-a/", + "podname"=> "/podname-[b|c]/", + }, 50) + rule2 = create_rule_directive({ + "podname"=> "/podname-[b|c]/", + }, 400) + rule3 = create_rule_directive({ + "namespace"=> "/namespace-a/", + }, 100) - conf = create_group_directive('.', '1m', rule1, rule2, rule3) + SINGLE_LINE_CONFIG + conf = create_group_directive(PATTERN, '1m', rule3, rule1, rule2) + SINGLE_LINE_CONFIG assert_nothing_raised do d = create_driver(conf) + instance = d.instance + + metadata = { + "namespace"=> "namespace-a", + "podname"=> "podname-b", + } + assert_equal 50, instance.find_group(metadata).limit + + metadata = { + "namespace" => "namespace-a", + "podname" => "podname-c", + } + assert_equal 50, instance.find_group(metadata).limit + + metadata = { + "namespace" => "namespace-a", + "podname" => "podname-d", + } + assert_equal 100, instance.find_group(metadata).limit + + metadata = { + "namespace" => "namespace-f", + "podname" => "podname-b", + } + assert_equal 400, instance.find_group(metadata).limit + + metadata = { + "podname" => "podname-c", + } + assert_equal 400, instance.find_group(metadata).limit - assert_equal 25, d.instance.group_watchers[/namespace\-a/][/appname\-b/].limit - assert_equal 25, d.instance.group_watchers[/namespace\-a/][/appname\-c/].limit - assert_equal 100, d.instance.group_watchers[/namespace\-a/][/./].limit - assert_equal 200, d.instance.group_watchers[/./][/appname\-b/].limit - assert_equal 200, d.instance.group_watchers[/./][/appname\-c/].limit - assert_equal -1, d.instance.group_watchers[/./][/./].limit + assert_equal -1, instance.find_group({}).limit end end @@ -339,8 +390,8 @@ def create_driver(conf = SINGLE_LINE_CONFIG, use_common_conf = true) sub_test_case "files should be placed in groups" do test "invalid regex pattern places files in default group" do - rule1 = create_rule_directive([], [], 100) ## limits default groups - conf = ROOT_CONFIG + create_group_directive('.', '1m', rule1) + create_path_element("test*.txt") + SINGLE_LINE_CONFIG + rule1 = create_rule_directive({}, 100) ## limits default groups + conf = ROOT_CONFIG + DEBUG_LOG_LEVEL + create_group_directive(PATTERN, '1m', rule1) + create_path_element("test*.txt") + SINGLE_LINE_CONFIG d = create_driver(conf, false) File.open("#{TMP_DIR}/test1.txt", 'w') @@ -349,34 +400,51 @@ def create_driver(conf = SINGLE_LINE_CONFIG, use_common_conf = true) d.run do ## checking default group_watcher's paths - assert_equal 3, d.instance.group_watchers[/./][/./].size - assert_true d.instance.group_watchers[/./][/./].include? File.join(TMP_DIR, 'test1.txt') - assert_true d.instance.group_watchers[/./][/./].include? File.join(TMP_DIR, 'test2.txt') - assert_true d.instance.group_watchers[/./][/./].include? File.join(TMP_DIR, 'test3.txt') + instance = d.instance + key = instance.default_group_key + + assert_equal 3, instance.log.logs.count{|a| a.match?("Cannot find group from metadata, Adding file in the default group\n")} + assert_equal 3, instance.group_watchers[key].size + assert_true instance.group_watchers[key].include? File.join(TMP_DIR, 'test1.txt') + assert_true instance.group_watchers[key].include? File.join(TMP_DIR, 'test2.txt') + assert_true instance.group_watchers[key].include? File.join(TMP_DIR, 'test3.txt') end end test "valid regex pattern places file in their respective groups" do - rule1 = create_rule_directive(['test-namespace1'], ['test-appname1'], 100) - rule2 = create_rule_directive(['test-namespace1'], [], 200) - rule3 = create_rule_directive([], ['test-appname2'], 100) - rule4 = create_rule_directive([], [], 100) - - path_element = create_path_element("test-appname*.log") - - conf = ROOT_CONFIG + create_group_directive(PATTERN, '1m', rule1, rule2, rule3, rule4) + path_element + SINGLE_LINE_CONFIG + rule1 = create_rule_directive({ + "namespace"=> "/test-namespace1/", + "podname"=> "/test-podname1/", + }, 100) + rule2 = create_rule_directive({ + "namespace"=> "/test-namespace1/", + }, 200) + rule3 = create_rule_directive({ + "podname"=> "/test-podname2/", + }, 300) + rule4 = create_rule_directive({}, 400) + + path_element = create_path_element("test-podname*.log") + + conf = ROOT_CONFIG + create_group_directive(PATTERN, '1m', rule4, rule3, rule2, rule1) + path_element + SINGLE_LINE_CONFIG d = create_driver(conf, false) - File.open("#{TMP_DIR}/test-appname1_test-namespace1_test-container-15fabq.log", 'w') - File.open("#{TMP_DIR}/test-appname3_test-namespace1_test-container-15fabq.log", 'w') - File.open("#{TMP_DIR}/test-appname2_test-namespace2_test-container-15fabq.log", 'w') - File.open("#{TMP_DIR}/test-appname4_test-namespace3_test-container-15fabq.log", 'w') + file1 = File.join(TMP_DIR, "test-podname1_test-namespace1_test-container-15fabq.log") + file2 = File.join(TMP_DIR, "test-podname3_test-namespace1_test-container-15fabq.log") + file3 = File.join(TMP_DIR, "test-podname2_test-namespace2_test-container-15fabq.log") + file4 = File.join(TMP_DIR, "test-podname4_test-namespace3_test-container-15fabq.log") d.run do - assert_true d.instance.group_watchers[/test\-namespace1/][/test\-appname1/].include? File.join(TMP_DIR, "test-appname1_test-namespace1_test-container-15fabq.log") - assert_true d.instance.group_watchers[/test\-namespace1/][/./].include? File.join(TMP_DIR, "test-appname3_test-namespace1_test-container-15fabq.log") - assert_true d.instance.group_watchers[/./][/test\-appname2/].include? File.join(TMP_DIR, "test-appname2_test-namespace2_test-container-15fabq.log") - assert_true d.instance.group_watchers[/./][/./].include? File.join(TMP_DIR, "test-appname4_test-namespace3_test-container-15fabq.log") + File.open(file1, 'w') + File.open(file2, 'w') + File.open(file3, 'w') + File.open(file4, 'w') + + instance = d.instance + assert_equal 100, instance.find_group_from_metadata(file1).limit + assert_equal 200, instance.find_group_from_metadata(file2).limit + assert_equal 300, instance.find_group_from_metadata(file3).limit + assert_equal 400, instance.find_group_from_metadata(file4).limit end end @@ -2482,14 +2550,20 @@ def test_shutdown_timeout sub_test_case "throttling logs at in_tail level" do - data("file test1.log no limit 5120 text: msg" => ["test1.log", 5120, "msg"], - "file test2.log no limit 1024 text: test" => ["test2.log", 1024, "test"]) + data("file test1.log no_limit 5120 text: msg" => ["test1.log", 5120, "msg"], + "file test2.log no_limit 1024 text: test" => ["test2.log", 1024, "test"]) def test_lines_collected_with_no_throttling(data) file, num_lines, msg = data - rule = create_rule_directive([], [], -1) + + pattern = "/^#{TMP_DIR}\/(?.+)\.log$/" + rule = create_rule_directive({ + "file" => "/test.*/", + }, -1) + group = create_group_directive(pattern, '10s', rule) path_element = create_path_element(file) - conf = ROOT_CONFIG + create_group_directive('.', '10s', rule) + path_element + CONFIG_READ_FROM_HEAD + SINGLE_LINE_CONFIG + conf = ROOT_CONFIG + group + path_element + CONFIG_READ_FROM_HEAD + SINGLE_LINE_CONFIG + File.open("#{TMP_DIR}/#{file}", 'wb') do |f| num_lines.times do f.puts "#{msg}\n" @@ -2517,19 +2591,23 @@ def test_lines_collected_with_no_throttling(data) end test "lines collected with throttling" do - file = "appname1_namespace12_container-123456.log" + file = "podname1_namespace12_container-123456.log" limit = 1000 rate_period = '10s' num_lines = 3000 msg = "a"*8190 # Total size = 8190 bytes + 2 (\n) bytes - rule = create_rule_directive(['namespace'], ['appname'], limit) + rule = create_rule_directive({ + "namespace"=> "/namespace.+/", + "podname"=> "/podname.+/", + }, limit) path_element = create_path_element(file) conf = ROOT_CONFIG + create_group_directive(PATTERN, rate_period, rule) + path_element + SINGLE_LINE_CONFIG + CONFIG_READ_FROM_HEAD d = create_driver(conf, false) - - File.open("#{TMP_DIR}/#{file}", 'wb') do |f| + file_path = "#{TMP_DIR}/#{file}" + + File.open(file_path, 'wb') do |f| num_lines.times do f.puts msg end @@ -2539,7 +2617,7 @@ def test_lines_collected_with_no_throttling(data) start_time = Fluent::Clock.now prev_count = 0 - 3.times do + (num_lines/limit).times do assert_true Fluent::Clock.now - start_time < 10 ## Check record_count after 10s to check lines reads assert_equal limit, d.record_count - prev_count @@ -2553,7 +2631,9 @@ def test_lines_collected_with_no_throttling(data) ## When all the lines are read and rate_period seconds are over ## limit will reset and since there are no more logs to be read, ## number_lines_read will be 0 - assert_equal 0, d.instance.group_watchers[/namespace/][/appname/].current_paths["#{TMP_DIR}/#{file}"].number_lines_read + + gw = d.instance.find_group_from_metadata(file_path) + assert_equal 0, gw.current_paths[file_path].number_lines_read end end end