diff --git a/lib/fluent/plugin/in_tail.rb b/lib/fluent/plugin/in_tail.rb index 3ad5943f82..da9bba81c9 100644 --- a/lib/fluent/plugin/in_tail.rb +++ b/lib/fluent/plugin/in_tail.rb @@ -24,6 +24,7 @@ require 'fluent/variable_store' require 'fluent/capability' require 'fluent/plugin/in_tail/position_file' +require 'fluent/plugin/in_tail/group_watch' if Fluent.windows? require_relative 'file_wrapper' @@ -33,6 +34,8 @@ module Fluent::Plugin class TailInput < Fluent::Plugin::Input + include GroupWatch + Fluent::Plugin.register_input('tail', self) helpers :timer, :event_loop, :parser, :compat_parameters @@ -406,6 +409,8 @@ def setup_watcher(target_info, pe) event_loop_attach(watcher) end + tw.group_watcher = add_path_to_group_watcher(target_info.path) + tw rescue => e if tw @@ -461,6 +466,8 @@ def start_watchers(targets_info) def stop_watchers(targets_info, immediate: false, unwatched: false, remove_watcher: true) targets_info.each_value { |target_info| + remove_path_from_group_watcher(target_info.path) + if remove_watcher tw = @tails.delete(target_info) else @@ -542,18 +549,19 @@ def detach_watcher(tw, ino, close_io = true) end end + def throttling_is_enabled?(tw) + return true if @read_bytes_limit_per_second > 0 + return true if tw.group_watcher && tw.group_watcher.limit >= 0 + false + end + def detach_watcher_after_rotate_wait(tw, ino) # Call event_loop_attach/event_loop_detach is high-cost for short-live object. # If this has a problem with large number of files, use @_event_loop directly instead of timer_execute. if @open_on_every_update # Detach now because it's already closed, waiting it doesn't make sense. detach_watcher(tw, ino) - elsif @read_bytes_limit_per_second < 0 - # throttling isn't enabled, just wait @rotate_wait - timer_execute(:in_tail_close_watcher, @rotate_wait, repeat: false) do - detach_watcher(tw, ino) - end - else + elsif throttling_is_enabled?(tw) # When the throttling feature is enabled, it might not reach EOF yet. 
# Should ensure to read all contents before closing it, with keeping throttling. start_time_to_wait = Fluent::Clock.now @@ -564,6 +572,11 @@ def detach_watcher_after_rotate_wait(tw, ino) detach_watcher(tw, ino) end end + else + # when the throttling feature isn't enabled, just wait @rotate_wait + timer_execute(:in_tail_close_watcher, @rotate_wait, repeat: false) do + detach_watcher(tw, ino) + end end end @@ -775,6 +788,7 @@ def initialize(target_info, pe, log, read_from_head, follow_inodes, update_watch attr_reader :line_buffer_timer_flusher attr_accessor :unwatched # This is used for removing position entry from PositionFile attr_reader :watchers + attr_accessor :group_watcher def tag @parsed_tag ||= @path.tr('/', '.').gsub(/\.+/, '.').gsub(/^\./, '') @@ -997,6 +1011,10 @@ def initialize(watcher, path:, read_lines_limit:, read_bytes_limit_per_second:, @log.info "following tail of #{@path}" end + def group_watcher + @watcher.group_watcher + end + def on_notify @notify_mutex.synchronize { handle_notify } end @@ -1054,6 +1072,7 @@ def should_shutdown_now? def handle_notify return if limit_bytes_per_second_reached? + return if group_watcher&.limit_lines_reached?(@path) with_io do |io| begin @@ -1063,17 +1082,26 @@ def handle_notify begin while true @start_reading_time ||= Fluent::Clock.now + group_watcher&.update_reading_time(@path) + data = io.readpartial(BYTES_TO_READ, @iobuf) @eof = false @number_bytes_read += data.bytesize @fifo << data + + n_lines_before_read = @lines.size @fifo.read_lines(@lines) + group_watcher&.update_lines_read(@path, @lines.size - n_lines_before_read) - if limit_bytes_per_second_reached? || should_shutdown_now? + group_watcher_limit = group_watcher&.limit_lines_reached?(@path) + @log.debug "Reading Limit exceeded #{@path} #{group_watcher.number_lines_read}" if group_watcher_limit + + if group_watcher_limit || limit_bytes_per_second_reached? || should_shutdown_now? # Just get out from tailing loop. 
read_more = false break end + if @lines.size >= @read_lines_limit # not to use too much memory in case the file is very large read_more = true diff --git a/lib/fluent/plugin/in_tail/group_watch.rb b/lib/fluent/plugin/in_tail/group_watch.rb new file mode 100644 index 0000000000..758e3a76a9 --- /dev/null +++ b/lib/fluent/plugin/in_tail/group_watch.rb @@ -0,0 +1,204 @@ +# +# Fluentd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +require 'fluent/plugin/input' + +module Fluent::Plugin + class TailInput < Fluent::Plugin::Input + module GroupWatchParams + include Fluent::Configurable + + DEFAULT_KEY = /.*/ + DEFAULT_LIMIT = -1 + REGEXP_JOIN = "_" + + config_section :group, param_name: :group, required: false, multi: false do + desc 'Regex for extracting group\'s metadata' + config_param :pattern, + :regexp, + default: /^\/var\/log\/containers\/(?<podname>[a-z0-9]([-a-z0-9]*[a-z0-9])?(\/[a-z0-9]([-a-z0-9]*[a-z0-9])?)*)_(?<namespace>[^_]+)_(?<container>.+)-(?<docker_id>[a-z0-9]{64})\.log$/ + + desc 'Period of time in which the group_line_limit is applied' + config_param :rate_period, :time, default: 5 + + config_section :rule, param_name: :rule, required: true, multi: true do + desc 'Key-value pairs for grouping' + config_param :match, :hash, value_type: :regexp, default: { namespace: [DEFAULT_KEY], podname: [DEFAULT_KEY] } + desc 'Maximum number of log lines allowed per group over a period of rate_period' + config_param :limit, :integer, default: DEFAULT_LIMIT + end + end + end + + module GroupWatch + def 
self.included(mod) + mod.include GroupWatchParams + end + + attr_reader :group_watchers, :default_group_key + + def initialize + super + @group_watchers = {} + @group_keys = nil + @default_group_key = nil + end + + def configure(conf) + super + + unless @group.nil? + ## Ensuring correct time period syntax + @group.rule.each { |rule| + raise "Metadata Group Limit >= DEFAULT_LIMIT" unless rule.limit >= GroupWatchParams::DEFAULT_LIMIT + } + + @group_keys = Regexp.compile(@group.pattern).named_captures.keys + @default_group_key = ([GroupWatchParams::DEFAULT_KEY] * @group_keys.length).join(GroupWatchParams::REGEXP_JOIN) + + ## Ensures that "specific" rules (with larger number of `rule.match` keys) + ## have a higher priority against "generic" rules (with less number of `rule.match` keys). + ## This will be helpful when a file satisfies more than one rule. + @group.rule.sort_by! { |rule| -rule.match.length() } + construct_groupwatchers + @group_watchers[@default_group_key] ||= GroupWatcher.new(@group.rate_period, GroupWatchParams::DEFAULT_LIMIT) + end + end + + def add_path_to_group_watcher(path) + return nil if @group.nil? + group_watcher = find_group_from_metadata(path) + group_watcher.add(path) unless group_watcher.include?(path) + group_watcher + end + + def remove_path_from_group_watcher(path) + return if @group.nil? 
+ group_watcher = find_group_from_metadata(path) + group_watcher.delete(path) + end + + def construct_group_key(named_captures) + match_rule = [] + @group_keys.each { |key| + match_rule.append(named_captures.fetch(key, GroupWatchParams::DEFAULT_KEY)) + } + match_rule = match_rule.join(GroupWatchParams::REGEXP_JOIN) + + match_rule + end + + def construct_groupwatchers + @group.rule.each { |rule| + match_rule = construct_group_key(rule.match) + @group_watchers[match_rule] ||= GroupWatcher.new(@group.rate_period, rule.limit) + } + end + + def find_group(metadata) + metadata_key = construct_group_key(metadata) + gw_key = @group_watchers.keys.find { |regexp| metadata_key.match?(regexp) && regexp != @default_group_key } + gw_key ||= @default_group_key + + @group_watchers[gw_key] + end + + def find_group_from_metadata(path) + begin + metadata = @group.pattern.match(path).named_captures + group_watcher = find_group(metadata) + rescue + log.warn "Cannot find group from metadata, Adding file in the default group" + group_watcher = @group_watchers[@default_group_key] + end + + group_watcher + end + end + + class GroupWatcher + attr_accessor :current_paths, :limit, :number_lines_read, :start_reading_time, :rate_period + + FileCounter = Struct.new( + :number_lines_read, + :start_reading_time, + ) + + def initialize(rate_period = 60, limit = -1) + @current_paths = {} + @rate_period = rate_period + @limit = limit + end + + def add(path) + @current_paths[path] = FileCounter.new(0, nil) + end + + def include?(path) + @current_paths.key?(path) + end + + def size + @current_paths.size + end + + def delete(path) + @current_paths.delete(path) + end + + def update_reading_time(path) + @current_paths[path].start_reading_time ||= Fluent::Clock.now + end + + def update_lines_read(path, value) + @current_paths[path].number_lines_read += value + end + + def reset_counter(path) + @current_paths[path].start_reading_time = nil + @current_paths[path].number_lines_read = 0 + end + + def 
time_spent_reading(path) + Fluent::Clock.now - @current_paths[path].start_reading_time + end + + def limit_time_period_reached?(path) + time_spent_reading(path) < @rate_period + end + + def limit_lines_reached?(path) + return true unless include?(path) + return true if @limit == 0 + + return false if @limit < 0 + return false if @current_paths[path].number_lines_read < @limit / size + + # update_reading_time(path) + if limit_time_period_reached?(path) # Exceeds limit + true + else # Does not exceed limit + reset_counter(path) + false + end + end + + def to_s + super + " current_paths: #{@current_paths} rate_period: #{@rate_period} limit: #{@limit}" + end + end + end +end diff --git a/test/plugin/in_tail/test_io_handler.rb b/test/plugin/in_tail/test_io_handler.rb index d9a18a1c94..9128d8ea46 100644 --- a/test/plugin/in_tail/test_io_handler.rb +++ b/test/plugin/in_tail/test_io_handler.rb @@ -21,12 +21,20 @@ class IntailIOHandlerTest < Test::Unit::TestCase @file.unlink rescue nil end + def create_target_info + Fluent::Plugin::TailInput::TargetInfo.new(@file.path, Fluent::FileWrapper.stat(@file.path).ino) + end + + def create_watcher + Fluent::Plugin::TailInput::TailWatcher.new(create_target_info, nil, nil, nil, nil, nil, nil, nil, nil) + end + test '#on_notify load file content and passed it to receive_lines method' do text = "this line is test\ntest line is test\n" @file.write(text) @file.close - watcher = 'watcher' + watcher = create_watcher update_pos = 0 @@ -61,7 +69,7 @@ class IntailIOHandlerTest < Test::Unit::TestCase update_pos = 0 - watcher = 'watcher' + watcher = create_watcher stub(watcher).pe do pe = 'position_file' stub(pe).read_pos { 0 } @@ -92,7 +100,7 @@ class IntailIOHandlerTest < Test::Unit::TestCase update_pos = 0 - watcher = 'watcher' + watcher = create_watcher stub(watcher).pe do pe = 'position_file' stub(pe).read_pos { 0 } @@ -118,7 +126,7 @@ class IntailIOHandlerTest < Test::Unit::TestCase update_pos = 0 - watcher = 'watcher' + watcher = 
create_watcher stub(watcher).pe do pe = 'position_file' stub(pe).read_pos { 0 } diff --git a/test/plugin/test_in_tail.rb b/test/plugin/test_in_tail.rb index c96e72f558..8972e8bba9 100644 --- a/test/plugin/test_in_tail.rb +++ b/test/plugin/test_in_tail.rb @@ -91,12 +91,12 @@ def create_target_info(path) TMP_DIR = File.dirname(__FILE__) + "/../tmp/tail#{ENV['TEST_ENV_NUMBER']}" - CONFIG = config_element("ROOT", "", { - "path" => "#{TMP_DIR}/tail.txt", + ROOT_CONFIG = config_element("ROOT", "", { "tag" => "t1", "rotate_wait" => "2s", "refresh_interval" => "1s" }) + CONFIG = ROOT_CONFIG + config_element("", "", { "path" => "#{TMP_DIR}/tail.txt" }) COMMON_CONFIG = CONFIG + config_element("", "", { "pos_file" => "#{TMP_DIR}/tail.pos" }) CONFIG_READ_FROM_HEAD = config_element("", "", { "read_from_head" => true }) CONFIG_DISABLE_WATCH_TIMER = config_element("", "", { "enable_watch_timer" => false }) @@ -144,6 +144,33 @@ def create_target_info(path) }) ]) + TAILING_GROUP_PATTERN = "/#{TMP_DIR}\/(?<podname>[a-z0-9]([-a-z0-9]*[a-z0-9])?(\/[a-z0-9]([-a-z0-9]*[a-z0-9])?)*)_(?<namespace>[^_]+)_(?<container>.+)-(?<docker_id>[a-z0-9]{6})\.log$/" + DEBUG_LOG_LEVEL = config_element("", "", { + "@log_level" => "debug" + }) + + def create_group_directive(pattern, rate_period, *rules) + config_element("", "", {}, [ + config_element("group", "", { + "pattern" => pattern, + "rate_period" => rate_period + }, rules) + ]) + end + + def create_rule_directive(match_named_captures, limit) + params = { + "limit" => limit, + "match" => match_named_captures, + } + + config_element("rule", "", params) + end + + def create_path_element(path) + config_element("source", "", { "path" => "#{TMP_DIR}/#{path}" }) + end + def create_driver(conf = SINGLE_LINE_CONFIG, use_common_conf = true) config = use_common_conf ? 
COMMON_CONFIG + conf : conf Fluent::Test::Driver::Input.new(Fluent::Plugin::TailInput).configure(config) @@ -263,6 +290,165 @@ def create_driver(conf = SINGLE_LINE_CONFIG, use_common_conf = true) end end + sub_test_case "configure group" do + test "<rule> required" do + conf = create_group_directive('.', '1m') + SINGLE_LINE_CONFIG + assert_raise(Fluent::ConfigError) do + create_driver(conf) + end + end + + test "valid configuration" do + rule1 = create_rule_directive({ + "namespace"=> "/namespace-a/", + "podname"=> "/podname-[b|c]/" + }, 100) + rule2 = create_rule_directive({ + "namespace"=> "/namespace-[d|e]/", + "podname"=> "/podname-f/", + }, 50) + rule3 = create_rule_directive({ + "podname"=> "/podname-g/", + }, -1) + rule4 = create_rule_directive({ + "namespace"=> "/namespace-h/", + }, 0) + + conf = create_group_directive(TAILING_GROUP_PATTERN, '1m', rule1, rule2, rule3, rule4) + SINGLE_LINE_CONFIG + assert_nothing_raised do + create_driver(conf) + end + end + + test "limit should be greater than DEFAULT_LIMIT (-1)" do + rule1 = create_rule_directive({ + "namespace"=> "/namespace-a/", + "podname"=> "/podname-[b|c]/", + }, -100) + rule2 = create_rule_directive({ + "namespace"=> "/namespace-[d|e]/", + "podname"=> "/podname-f/", + }, 50) + conf = create_group_directive(TAILING_GROUP_PATTERN, '1m', rule1, rule2) + SINGLE_LINE_CONFIG + assert_raise(RuntimeError) do + create_driver(conf) + end + end + end + + sub_test_case "group rules line limit resolution" do + test "valid" do + rule1 = create_rule_directive({ + "namespace"=> "/namespace-a/", + "podname"=> "/podname-[b|c]/", + }, 50) + rule2 = create_rule_directive({ + "podname"=> "/podname-[b|c]/", + }, 400) + rule3 = create_rule_directive({ + "namespace"=> "/namespace-a/", + }, 100) + + conf = create_group_directive(TAILING_GROUP_PATTERN, '1m', rule3, rule1, rule2) + SINGLE_LINE_CONFIG + assert_nothing_raised do + d = create_driver(conf) + instance = d.instance + + metadata = { + "namespace"=> "namespace-a", + 
"podname"=> "podname-b", + } + assert_equal(50, instance.find_group(metadata).limit) + + metadata = { + "namespace" => "namespace-a", + "podname" => "podname-c", + } + assert_equal(50, instance.find_group(metadata).limit) + + metadata = { + "namespace" => "namespace-a", + "podname" => "podname-d", + } + assert_equal(100, instance.find_group(metadata).limit) + + metadata = { + "namespace" => "namespace-f", + "podname" => "podname-b", + } + assert_equal(400, instance.find_group(metadata).limit) + + metadata = { + "podname" => "podname-c", + } + assert_equal(400, instance.find_group(metadata).limit) + + assert_equal(-1, instance.find_group({}).limit) + end + end + end + + sub_test_case "files should be placed in groups" do + test "invalid regex pattern places files in default group" do + rule1 = create_rule_directive({}, 100) ## limits default groups + conf = ROOT_CONFIG + DEBUG_LOG_LEVEL + create_group_directive(TAILING_GROUP_PATTERN, '1m', rule1) + create_path_element("test*.txt") + SINGLE_LINE_CONFIG + + d = create_driver(conf, false) + File.open("#{TMP_DIR}/test1.txt", 'w') + File.open("#{TMP_DIR}/test2.txt", 'w') + File.open("#{TMP_DIR}/test3.txt", 'w') + + d.run do + ## checking default group_watcher's paths + instance = d.instance + key = instance.default_group_key + + assert_equal(3, instance.log.logs.count{|a| a.match?("Cannot find group from metadata, Adding file in the default group\n")}) + assert_equal(3, instance.group_watchers[key].size) + assert_true(instance.group_watchers[key].include? File.join(TMP_DIR, 'test1.txt')) + assert_true(instance.group_watchers[key].include? File.join(TMP_DIR, 'test2.txt')) + assert_true(instance.group_watchers[key].include? 
File.join(TMP_DIR, 'test3.txt')) + end + end + + test "valid regex pattern places file in their respective groups" do + rule1 = create_rule_directive({ + "namespace"=> "/test-namespace1/", + "podname"=> "/test-podname1/", + }, 100) + rule2 = create_rule_directive({ + "namespace"=> "/test-namespace1/", + }, 200) + rule3 = create_rule_directive({ + "podname"=> "/test-podname2/", + }, 300) + rule4 = create_rule_directive({}, 400) + + path_element = create_path_element("test-podname*.log") + + conf = ROOT_CONFIG + create_group_directive(TAILING_GROUP_PATTERN, '1m', rule4, rule3, rule2, rule1) + path_element + SINGLE_LINE_CONFIG + d = create_driver(conf, false) + + file1 = File.join(TMP_DIR, "test-podname1_test-namespace1_test-container-15fabq.log") + file2 = File.join(TMP_DIR, "test-podname3_test-namespace1_test-container-15fabq.log") + file3 = File.join(TMP_DIR, "test-podname2_test-namespace2_test-container-15fabq.log") + file4 = File.join(TMP_DIR, "test-podname4_test-namespace3_test-container-15fabq.log") + + d.run do + File.open(file1, 'w') + File.open(file2, 'w') + File.open(file3, 'w') + File.open(file4, 'w') + + instance = d.instance + assert_equal(100, instance.find_group_from_metadata(file1).limit) + assert_equal(200, instance.find_group_from_metadata(file2).limit) + assert_equal(300, instance.find_group_from_metadata(file3).limit) + assert_equal(400, instance.find_group_from_metadata(file4).limit) + end + end + end + sub_test_case "singleline" do data(flat: SINGLE_LINE_CONFIG, parse: PARSE_SINGLE_LINE_CONFIG) @@ -2360,4 +2546,108 @@ def test_shutdown_timeout elapsed = Fluent::Clock.now - start_time assert_true(elapsed > 0.5 && elapsed < 2.5) end + + sub_test_case "throttling logs at in_tail level" do + data("file test1.log no_limit 5120 text: msg" => ["test1.log", 5120, "msg"], + "file test2.log no_limit 1024 text: test" => ["test2.log", 1024, "test"]) + def test_lines_collected_with_no_throttling(data) + file, num_lines, msg = data + + pattern = 
"/^#{TMP_DIR}\/(?<file>.+)\.log$/" + rule = create_rule_directive({ + "file" => "/test.*/", + }, -1) + group = create_group_directive(pattern, "1s", rule) + path_element = create_path_element(file) + + conf = ROOT_CONFIG + group + path_element + CONFIG_READ_FROM_HEAD + SINGLE_LINE_CONFIG + + File.open("#{TMP_DIR}/#{file}", 'wb') do |f| + num_lines.times do + f.puts "#{msg}\n" + end + end + + + d = create_driver(conf, false) + d.run(timeout: 3) do + start_time = Fluent::Clock.now + + assert_equal(num_lines, d.record_count) + assert_equal({ "message" => msg }, d.events[0][2]) + + prev_count = d.record_count + sleep(0.1) while d.emit_count < 1 + assert_true(Fluent::Clock.now - start_time < 2) + ## after waiting for 1 (+ jitter) secs, limit will reset + ## Plugin will start reading but it will encounter EOF Error + ## since no logs are left to be read + ## Hence, d.record_count = prev_count + tail_watcher_interval = 1.0 # hard coded value in in_tail + safety_ratio = 1.02 + jitter = tail_watcher_interval * safety_ratio + sleep(1.0 + jitter) + assert_equal(0, d.record_count - prev_count) + end + end + + test "lines collected with throttling" do + file = "podname1_namespace12_container-123456.log" + limit = 1000 + rate_period = 2 + num_lines = 3000 + msg = "a" * 8190 # Total size = 8190 bytes + 2 (\n) bytes + + rule = create_rule_directive({ + "namespace"=> "/namespace.+/", + "podname"=> "/podname.+/", + }, limit) + path_element = create_path_element(file) + conf = ROOT_CONFIG + create_group_directive(TAILING_GROUP_PATTERN, "#{rate_period}s", rule) + path_element + SINGLE_LINE_CONFIG + CONFIG_READ_FROM_HEAD + + d = create_driver(conf, false) + file_path = "#{TMP_DIR}/#{file}" + + File.open(file_path, 'wb') do |f| + num_lines.times do + f.puts msg + end + end + + d.run(timeout: 15) do + sleep_interval = 0.1 + tail_watcher_interval = 1.0 # hard coded value in in_tail + safety_ratio = 1.02 + lower_jitter = sleep_interval * safety_ratio + upper_jitter = (tail_watcher_interval + 
sleep_interval) * safety_ratio + lower_interval = rate_period - lower_jitter + upper_interval = rate_period + upper_jitter + + emit_count = 0 + prev_count = 0 + + while emit_count < 3 do + start_time = Fluent::Clock.now + sleep(sleep_interval) while d.emit_count <= emit_count + elapsed_seconds = Fluent::Clock.now - start_time + if emit_count > 0 + assert_true(elapsed_seconds > lower_interval && elapsed_seconds < upper_interval, + "elapsed_seconds #{elapsed_seconds} is out of allowed range:\n" + + " lower: #{lower_interval} [sec]\n" + + " upper: #{upper_interval} [sec]") + end + assert_equal(limit, d.record_count - prev_count) + emit_count = d.emit_count + prev_count = d.record_count + end + + ## When all the lines are read and rate_period seconds are over + ## limit will reset and since there are no more logs to be read, + ## number_lines_read will be 0 + sleep upper_interval + gw = d.instance.find_group_from_metadata(file_path) + assert_equal(0, gw.current_paths[file_path].number_lines_read) + end + end + end end