From e3a4ff6ffd65e40670c6044716353e0bfb0b30a0 Mon Sep 17 00:00:00 2001 From: Pranjal Gupta Date: Mon, 18 Oct 2021 12:49:50 +0530 Subject: [PATCH 01/23] in_tail_with_throttle: Add ThrottleInput Plugin Signed-off-by: Pranjal Gupta --- lib/fluent/plugin/in_tail_with_throttle.rb | 296 +++++++++++++++++++++ 1 file changed, 296 insertions(+) create mode 100644 lib/fluent/plugin/in_tail_with_throttle.rb diff --git a/lib/fluent/plugin/in_tail_with_throttle.rb b/lib/fluent/plugin/in_tail_with_throttle.rb new file mode 100644 index 0000000000..b297916222 --- /dev/null +++ b/lib/fluent/plugin/in_tail_with_throttle.rb @@ -0,0 +1,296 @@ +# +# Fluentd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +require 'fluent/plugin/in_tail' + +module Fluent::Plugin + class ThrottleInput < Fluent::Plugin::TailInput + Fluent::Plugin.register_input('tail_with_throttle', self) + + DEFAULT_NAMESPACE = DEFAULT_APPNAME = /./ + DEFAULT_LIMIT = -1 + + attr_reader :group_watchers + + def initialize + super + # Map rules with GroupWatcher objects + @group_watchers = {} + @sorted_path = nil + end + + config_section :group, param_name: :group, required: true, multi: false do + desc 'Regex for extracting group\'s metadata' + config_param :pattern, + :regexp, + default: /var\/log\/containers\/(?[a-z0-9]([-a-z0-9]*[a-z0-9])?(\/[a-z0-9]([-a-z0-9]*[a-z0-9])?)*)_(?[^_]+)_(?.+)-(?[a-z0-9]{64})\.log$/ + desc 'Period of time in which the group_line_limit is applied' + config_param :rate_period, :time, default: 5 + + config_section :rule, multi: true, required: true do + desc 'Namespace key' + config_param :namespace, :array, value_type: :string, default: [DEFAULT_NAMESPACE] + desc 'App name key' + config_param :appname, :array, value_type: :string, default: [DEFAULT_APPNAME] + desc 'Maximum number of log lines allowed per group over a period of rate_period' + config_param :limit, :integer, default: DEFAULT_LIMIT + end + end + + def configure(conf) + super + ## Ensuring correct time period syntax + @group.rule.each { |rule| + raise "Metadata Group Limit >= DEFAULT_LIMIT" unless rule.limit >= DEFAULT_LIMIT + } + + construct_groupwatchers + end + + def construct_groupwatchers + @group.rule.each { |rule| + num_groups = rule.namespace.size * rule.appname.size + + rule.namespace.each { |namespace| + namespace = /#{Regexp.quote(namespace)}/ unless namespace.eql?(DEFAULT_NAMESPACE) + @group_watchers[namespace] ||= {} + + rule.appname.each { |appname| + appname = /#{Regexp.quote(appname)}/ unless appname.eql?(DEFAULT_APPNAME) + @group_watchers[namespace][appname] = GroupWatcher.new(@group.rate_period, rule.limit/num_groups) + } + + @group_watchers[namespace][DEFAULT_APPNAME] ||= GroupWatcher.new(@group.rate_period) + } + } + + if @group_watchers.dig(DEFAULT_NAMESPACE, DEFAULT_APPNAME).nil? + @group_watchers[DEFAULT_NAMESPACE] ||= {} + @group_watchers[DEFAULT_NAMESPACE][DEFAULT_APPNAME] = GroupWatcher.new(@group.rate_period) + end + end + + def find_group_from_metadata(path) + def find_group(namespace, appname) + namespace_key = @group_watchers.keys.find { |regexp| namespace.match?(regexp) && regexp != DEFAULT_NAMESPACE } + namespace_key ||= DEFAULT_NAMESPACE + + appname_key = @group_watchers[namespace_key].keys.find { |regexp| appname.match?(regexp) && regexp != DEFAULT_APPNAME } + appname_key ||= DEFAULT_APPNAME + + @group_watchers[namespace_key][appname_key] + end + + begin + metadata = @group.pattern.match(path) + group_watcher = find_group(metadata['namespace'], metadata['appname']) + rescue => e + $log.warn "Cannot find group from metadata, Adding file in the default group" + group_watcher = @group_watchers[DEFAULT_NAMESPACE][DEFAULT_APPNAME] + end + + group_watcher + end + + def stop_watchers(targets_info, immediate: false, unwatched: false, remove_watcher: true) + targets_info.each_value { |target_info| + group_watcher = find_group_from_metadata(target_info.path) + group_watcher.delete(target_info.path) + } + super + end + + def setup_watcher(target_info, pe) + tw = super + group_watcher = find_group_from_metadata(target_info.path) + group_watcher.add(tw.path) unless group_watcher.include?(tw.path) + tw.group_watcher = group_watcher + + tw + rescue => e + raise e + end + + def detach_watcher_after_rotate_wait(tw, ino) + # Call event_loop_attach/event_loop_detach is high-cost for short-live object. + # If this has a problem with large number of files, use @_event_loop directly instead of timer_execute. + if @open_on_every_update + # Detach now because it's already closed, waiting it doesn't make sense. + detach_watcher(tw, ino) + elsif !tw.group_watcher.nil? && tw.group_watcher.limit <= 0 + # throttling isn't enabled, just wait @rotate_wait + timer_execute(:in_tail_close_watcher, @rotate_wait, repeat: false) do + detach_watcher(tw, ino) + end + else + # When the throttling feature is enabled, it might not reach EOF yet. + # Should ensure to read all contents before closing it, with keeping throttling. + start_time_to_wait = Fluent::Clock.now + timer = timer_execute(:in_tail_close_watcher, 1, repeat: true) do + elapsed = Fluent::Clock.now - start_time_to_wait + if tw.eof? && elapsed >= @rotate_wait + timer.detach + detach_watcher(tw, ino) + end + end + end + end + + class GroupWatcher + attr_accessor :current_paths, :limit, :number_lines_read, :start_reading_time, :rate_period + + FileCounter = Struct.new( + :number_lines_read, + :start_reading_time, + ) + + def initialize(rate_period = 60, limit = -1) + @current_paths = {} + @rate_period = rate_period + @limit = limit + end + + def add(path) + @current_paths[path] = FileCounter.new(0, nil) + end + + def include?(path) + @current_paths.key?(path) + end + + def size + @current_paths.size + end + + def delete(path) + @current_paths.delete(path) + end + + def update_reading_time(path) + @current_paths[path].start_reading_time ||= Fluent::Clock.now + end + + def update_lines_read(path, value) + @current_paths[path].number_lines_read += value + end + + def reset_counter(path) + @current_paths[path].start_reading_time = nil + @current_paths[path].number_lines_read = 0 + end + + def time_spent_reading(path) + Fluent::Clock.now - @current_paths[path].start_reading_time + end + + def limit_time_period_reached?(path) + time_spent_reading(path) < @rate_period + end + + def limit_lines_reached?(path) + return true unless include?(path) + return true if @limit == 0 + + return false if @limit < 0 + return false if @current_paths[path].number_lines_read < @limit / size + + # update_reading_time(path) + if limit_time_period_reached?(path) # Exceeds limit + true + else # Does not exceed limit + reset_counter(path) + false + end + end + + def to_s + super + " current_paths: #{@current_paths} rate_period: #{@rate_period} limit: #{@limit}" + end + end + + class Fluent::Plugin::TailInput::TailWatcher + attr_accessor :group_watcher + + def group_watcher=(group_watcher) + @group_watcher = group_watcher + end + + + class Fluent::Plugin::TailInput::TailWatcher::IOHandler + alias_method :orig_handle_notify, :handle_notify + + def group_watcher + @watcher.group_watcher + end + + def handle_notify + if group_watcher.nil? + orig_handle_notify + else + rate_limit_handle_notify + end + end + + def rate_limit_handle_notify + return if group_watcher.limit_lines_reached?(@path) + + with_io do |io| + begin + read_more = false + + if !io.nil? && @lines.empty? + begin + while true + group_watcher.update_reading_time(@path) + data = io.readpartial(BYTES_TO_READ, @iobuf) + @eof = false + @fifo << data + group_watcher.update_lines_read(@path, -@lines.size) + @fifo.read_lines(@lines) + group_watcher.update_lines_read(@path, @lines.size) + + if group_watcher.limit_lines_reached?(@path) || should_shutdown_now? + # Just get out from tailing loop. + @log.info "Read limit exceeded #{@path}" if !should_shutdown_now? + read_more = false + break + elsif @lines.size >= @read_lines_limit + # not to use too much memory in case the file is very large + read_more = true + break + end + end + rescue EOFError + @eof = true + end + end + @log.debug "Lines read: #{@path} #{group_watcher.current_paths[@path].number_lines_read}" + + unless @lines.empty? + if @receive_lines.call(@lines, @watcher) + @watcher.pe.update_pos(io.pos - @fifo.bytesize) + @lines.clear + else + read_more = false + end + end + end while read_more + end + end + end + end + end +end \ No newline at end of file From 0fed6bd2a7198d1eb88ab46617b98e5fbf469f2b Mon Sep 17 00:00:00 2001 From: Pranjal Gupta Date: Mon, 18 Oct 2021 12:50:16 +0530 Subject: [PATCH 02/23] in_tail_with_throttle: Add configuration and rate limiting tests Signed-off-by: Pranjal Gupta --- test/plugin/test_in_tail_with_throttle.rb | 233 ++++++++++++++++++++++ 1 file changed, 233 insertions(+) create mode 100644 test/plugin/test_in_tail_with_throttle.rb diff --git a/test/plugin/test_in_tail_with_throttle.rb b/test/plugin/test_in_tail_with_throttle.rb new file mode 100644 index 0000000000..3f809410ee --- /dev/null +++ b/test/plugin/test_in_tail_with_throttle.rb @@ -0,0 +1,233 @@ +require_relative '../helper' +require_relative './test_in_tail' +require 'fluent/test' +require 'fluent/test/helpers' +require 'fluent/test/driver/input' +require 'fluent/plugin/in_tail_with_throttle' + +class ThrottleInputTest < TailInputTest + + def create_group_directive(pattern, rate_period, *rules) + config_element("", "", {}, [ + config_element("group", "", { + "pattern" => pattern, + "rate_period" => rate_period + }, rules) + ]) + end + + def create_rule_directive(namespace = [], appname = [], limit) + params = { + "limit" => limit, + } + params["namespace"] = namespace.join(', ') if namespace.size > 0 + params["appname"] = appname.join(', ') if appname.size > 0 + config_element("rule", "", params) + end + + def create_path_element(path) + config_element("source", "", { "path" => "#{TMP_DIR}/#{path}" }) + end + + def create_driver(conf, add_path = true) + conf = add_path ? conf + create_path_element("tail.txt") : conf + Fluent::Test::Driver::Input.new(Fluent::Plugin::ThrottleInput).configure(conf) + end + + CONFG = config_element("source", "", { + "@type" => "tail_with_throttle", + "tag" => "t1", + "pos_file" => "#{TMP_DIR}/tail.pos", + "refresh_interval" => "1s", + "rotate_wait" => "2s", + }, [ + config_element("parse", "", { + "@type" => "/(?.*)/" }) + ] + ) + PATTERN = "/#{TMP_DIR}\/(?[a-z0-9]([-a-z0-9]*[a-z0-9])?(\/[a-z0-9]([-a-z0-9]*[a-z0-9])?)*)_(?[^_]+)_(?.+)-(?[a-z0-9]{6})\.log$/" + READ_FROM_HEAD = config_element("source", "", { "read_from_head" => "true" }) + + sub_test_case "#configuration" do + + test " required" do + assert_raise(Fluent::ConfigError) do + d = create_driver(CONFG) + end + end + + test " required" do + conf = CONFG + create_group_directive('.', '1m') + assert_raise(Fluent::ConfigError) do + d = create_driver(conf) + end + end + + test "valid configuration" do + rule1 = create_rule_directive(['namespace-a'], ['appname-b','appname-c'], 100) + rule2 = create_rule_directive(['namespace-d', 'appname-e'], ['f'], 50) + rule3 = create_rule_directive([], ['appname-g'], -1) + rule4 = create_rule_directive(['appname-h'], [], 0) + + conf = CONFG + create_group_directive('.', '1m', rule1, rule2, rule3, rule4) + assert_nothing_raised do + d = create_driver(conf) + end + end + + test "limit should be greater than DEFAULT_LIMIT (-1)" do + rule1 = create_rule_directive(['namespace-a'], ['appname-b','appname-c'], -100) + rule2 = create_rule_directive(['namespace-d', 'namespace-e'], ['appname-f'], 50) + conf = CONFG + create_group_directive('.', '1m', rule1, rule2) + assert_raise(RuntimeError) do + d = create_driver(conf) + end + end + + end + + sub_test_case "group rules line limit resolution" do + + test "valid" do + rule1 = create_rule_directive(['namespace-a'], ['appname-b', 'appname-c'], 50) + rule2 = create_rule_directive([], ['appname-b', 'appname-c'], 400) + rule3 = create_rule_directive(['namespace-a'], [], 100) + + conf = CONFG + create_group_directive('.', '1m', rule1, rule2, rule3) + assert_nothing_raised do + d = create_driver(conf) + + assert_equal 25, d.instance.group_watchers[/namespace\-a/][/appname\-b/].limit + assert_equal 25, d.instance.group_watchers[/namespace\-a/][/appname\-c/].limit + assert_equal 100, d.instance.group_watchers[/namespace\-a/][/./].limit + assert_equal 200, d.instance.group_watchers[/./][/appname\-b/].limit + assert_equal 200, d.instance.group_watchers[/./][/appname\-c/].limit + assert_equal -1, d.instance.group_watchers[/./][/./].limit + end + end + + end + + sub_test_case "files should be placed in groups" do + test "invalid regex pattern places files in default group" do + rule1 = create_rule_directive([], [], 100) ## limits default groups + conf = CONFG + create_group_directive('.', '1m', rule1) + create_path_element("test*.txt") + + d = create_driver(conf, false) + File.open("#{TMP_DIR}/test1.txt", 'w') + File.open("#{TMP_DIR}/test2.txt", 'w') + File.open("#{TMP_DIR}/test3.txt", 'w') + + d.run do + ## checking default group_watcher's paths + assert_equal 3, d.instance.group_watchers[/./][/./].size + assert_true d.instance.group_watchers[/./][/./].include? File.join(TMP_DIR, 'test1.txt') + assert_true d.instance.group_watchers[/./][/./].include? File.join(TMP_DIR, 'test2.txt') + assert_true d.instance.group_watchers[/./][/./].include? File.join(TMP_DIR, 'test3.txt') + end + end + + test "valid regex pattern places file in their respective groups" do + rule1 = create_rule_directive(['test-namespace1'], ['test-appname1'], 100) + rule2 = create_rule_directive(['test-namespace1'], [], 200) + rule3 = create_rule_directive([], ['test-appname2'], 100) + rule4 = create_rule_directive([], [], 100) + + path_element = create_path_element("test-appname*.log") + + conf = CONFG + create_group_directive(PATTERN, '1m', rule1, rule2, rule3, rule4) + path_element + d = create_driver(conf, false) + + File.open("#{TMP_DIR}/test-appname1_test-namespace1_test-container-15fabq.log", 'w') + File.open("#{TMP_DIR}/test-appname3_test-namespace1_test-container-15fabq.log", 'w') + File.open("#{TMP_DIR}/test-appname2_test-namespace2_test-container-15fabq.log", 'w') + File.open("#{TMP_DIR}/test-appname4_test-namespace3_test-container-15fabq.log", 'w') + + d.run do + assert_true d.instance.group_watchers[/test\-namespace1/][/test\-appname1/].include? File.join(TMP_DIR, "test-appname1_test-namespace1_test-container-15fabq.log") + assert_true d.instance.group_watchers[/test\-namespace1/][/./].include? File.join(TMP_DIR, "test-appname3_test-namespace1_test-container-15fabq.log") + assert_true d.instance.group_watchers[/./][/test\-appname2/].include? File.join(TMP_DIR, "test-appname2_test-namespace2_test-container-15fabq.log") + assert_true d.instance.group_watchers[/./][/./].include? File.join(TMP_DIR, "test-appname4_test-namespace3_test-container-15fabq.log") + end + end + + end + + sub_test_case "throttling logs at in_tail level" do + + data("file test1.log no limit 5120 text: msg" => ["test1.log", 5120, "msg"], + "file test2.log no limit 1024 text: test" => ["test2.log", 1024, "test"]) + def test_lines_collected_with_no_throttling(data) + file, num_lines, msg = data + rule = create_rule_directive([], [], -1) + path_element = create_path_element(file) + + conf = CONFG + create_group_directive('.', '10s', rule) + path_element + READ_FROM_HEAD + File.open("#{TMP_DIR}/#{file}", 'wb') do |f| + num_lines.times do + f.puts "#{msg}\n" + end + end + + + d = create_driver(conf, false) + d.run do + start_time = Time.now + + assert_true Time.now - start_time < 10 + assert_equal num_lines, d.record_count + assert_equal({ "message" => msg }, d.events[0][2]) + + prev_count = d.record_count + ## waiting for atleast 12 seconds to avoid any sync errors between plugin and test driver + sleep(1) until Time.now - start_time > 12 + ## after waiting for 10 secs, limit will reset + ## Plugin will start reading but it will encounter EOF Error + ## since no logs are left to be read + ## Hence, d.record_count = prev_count + assert_equal 0, d.record_count - prev_count + end + end + + test "lines collected with throttling" do + file = "appname1_namespace12_container-123456.log" + limit = 1000 + rate_period = '10s' + num_lines = 3000 + msg = "a"*8190 # Total size = 8190 bytes + 2 (\n) bytes + + rule = create_rule_directive(['namespace'], ['appname'], limit) + path_element = create_path_element(file) + conf = CONFG + create_group_directive(PATTERN, rate_period, rule) + path_element + READ_FROM_HEAD + + d = create_driver(conf, false) + + File.open("#{TMP_DIR}/#{file}", 'wb') do |f| + num_lines.times do + f.puts msg + end + end + + d.run do + start_time = Time.now + prev_count = 0 + + 3.times do + assert_true Time.now - start_time < 10 + ## Check record_count after 10s to check lines reads + assert_equal limit, d.record_count - prev_count + prev_count = d.record_count + ## sleep until rate_period seconds are over so that + ## Plugin can read lines again + sleep(1) until Time.now - start_time > 12 + ## waiting for atleast 12 seconds to avoid any sync errors between plugin and test driver + start_time = Time.now + end + ## When all the lines are read and rate_period seconds are over + ## limit will reset and since there are no more logs to be read, + ## number_lines_read will be 0 + assert_equal 0, d.instance.group_watchers[/namespace/][/appname/].current_paths["#{TMP_DIR}/#{file}"].number_lines_read + end + end + end +end From 10603da42f10fa62aa24e01bcfe59b70658ba6ed Mon Sep 17 00:00:00 2001 From: Pranjal Gupta Date: Wed, 20 Oct 2021 18:25:03 +0530 Subject: [PATCH 03/23] in_tail_with_throttle: Fix `test_io_handler` to use Tailwatcher rather than string Signed-off-by: Pranjal Gupta --- test/plugin/in_tail/test_io_handler.rb | 16 +++-- test/plugin/test_in_tail_with_throttle.rb | 81 ++++++++++++++++++++++- 2 files changed, 90 insertions(+), 7 deletions(-) diff --git a/test/plugin/in_tail/test_io_handler.rb b/test/plugin/in_tail/test_io_handler.rb index d9a18a1c94..9128d8ea46 100644 --- a/test/plugin/in_tail/test_io_handler.rb +++ b/test/plugin/in_tail/test_io_handler.rb @@ -21,12 +21,20 @@ class IntailIOHandlerTest < Test::Unit::TestCase @file.unlink rescue nil end + def create_target_info + Fluent::Plugin::TailInput::TargetInfo.new(@file.path, Fluent::FileWrapper.stat(@file.path).ino) + end + + def create_watcher + Fluent::Plugin::TailInput::TailWatcher.new(create_target_info, nil, nil, nil, nil, nil, nil, nil, nil) + end + test '#on_notify load file content and passed it to receive_lines method' do text = "this line is test\ntest line is test\n" @file.write(text) @file.close - watcher = 'watcher' + watcher = create_watcher update_pos = 0 @@ -61,7 +69,7 @@ class IntailIOHandlerTest < Test::Unit::TestCase update_pos = 0 - watcher = 'watcher' + watcher = create_watcher stub(watcher).pe do pe = 'position_file' stub(pe).read_pos { 0 } @@ -92,7 +100,7 @@ class IntailIOHandlerTest < Test::Unit::TestCase update_pos = 0 - watcher = 'watcher' + watcher = create_watcher stub(watcher).pe do pe = 'position_file' stub(pe).read_pos { 0 } @@ -118,7 +126,7 @@ class IntailIOHandlerTest < Test::Unit::TestCase update_pos = 0 - watcher = 'watcher' + watcher = create_watcher stub(watcher).pe do pe = 'position_file' stub(pe).read_pos { 0 } diff --git a/test/plugin/test_in_tail_with_throttle.rb b/test/plugin/test_in_tail_with_throttle.rb index 3f809410ee..e2420a17dd 100644 --- a/test/plugin/test_in_tail_with_throttle.rb +++ b/test/plugin/test_in_tail_with_throttle.rb @@ -1,12 +1,87 @@ require_relative '../helper' -require_relative './test_in_tail' +# require_relative './test_in_tail' require 'fluent/test' require 'fluent/test/helpers' require 'fluent/test/driver/input' require 'fluent/plugin/in_tail_with_throttle' -class ThrottleInputTest < TailInputTest - +class ThrottleInputTest < Test::Unit::TestCase + + def setup + Fluent::Test.setup + cleanup_directory(TMP_DIR) + end + + def teardown + super + cleanup_directory(TMP_DIR) + Fluent::Engine.stop + end + + def cleanup_directory(path) + unless Dir.exist?(path) + FileUtils.mkdir_p(path) + return + end + + if Fluent.windows? + Dir.glob("*", base: path).each do |name| + begin + cleanup_file(File.join(path, name)) + rescue + # expect test driver block release already owned file handle. + end + end + else + begin + FileUtils.rm_f(path, secure:true) + rescue ArgumentError + FileUtils.rm_f(path) # For Ruby 2.6 or before. + end + if File.exist?(path) + FileUtils.remove_entry_secure(path, true) + end + end + FileUtils.mkdir_p(path) + end + + def cleanup_file(path) + if Fluent.windows? + # On Windows, when the file or directory is removed and created + # frequently, there is a case that creating file or directory will + # fail. This situation is caused by pending file or directory + # deletion which is mentioned on win32 API document [1] + # As a workaround, execute rename and remove method. + # + # [1] https://docs.microsoft.com/en-us/windows/win32/api/fileapi/nf-fileapi-createfilea#files + # + file = File.join(Dir.tmpdir, SecureRandom.hex(10)) + begin + FileUtils.mv(path, file) + FileUtils.rm_rf(file, secure: true) + rescue ArgumentError + FileUtils.rm_rf(file) # For Ruby 2.6 or before. + end + if File.exist?(file) + # ensure files are closed for Windows, on which deleted files + # are still visible from filesystem + GC.start(full_mark: true, immediate_mark: true, immediate_sweep: true) + FileUtils.remove_entry_secure(file, true) + end + else + begin + FileUtils.rm_f(path, secure: true) + rescue ArgumentError + FileUtils.rm_f(path) # For Ruby 2.6 or before. + end + if File.exist?(path) + FileUtils.remove_entry_secure(path, true) + end + end + end + + TMP_DIR = File.dirname(__FILE__) + "/../tmp/tail_with_throttle#{ENV['TEST_ENV_NUMBER']}" + def create_group_directive(pattern, rate_period, *rules) config_element("", "", {}, [ config_element("group", "", { From 87e7426f220843d7cba1dfeeec2556856749c795 Mon Sep 17 00:00:00 2001 From: Pranjal Gupta Date: Fri, 22 Oct 2021 15:19:40 +0530 Subject: [PATCH 04/23] in_tail_with_throttle: Remove nested method and modify indentation Signed-off-by: Pranjal Gupta --- lib/fluent/plugin/in_tail_with_throttle.rb | 22 +++++++++++----------- test/plugin/test_in_tail_with_throttle.rb | 2 +- 2 files changed, 12 insertions(+), 12 deletions(-) diff --git a/lib/fluent/plugin/in_tail_with_throttle.rb b/lib/fluent/plugin/in_tail_with_throttle.rb index b297916222..4772cfd287 100644 --- a/lib/fluent/plugin/in_tail_with_throttle.rb +++ b/lib/fluent/plugin/in_tail_with_throttle.rb @@ -83,17 +83,17 @@ def construct_groupwatchers end end - def find_group_from_metadata(path) - def find_group(namespace, appname) - namespace_key = @group_watchers.keys.find { |regexp| namespace.match?(regexp) && regexp != DEFAULT_NAMESPACE } - namespace_key ||= DEFAULT_NAMESPACE + def find_group(namespace, appname) + namespace_key = @group_watchers.keys.find { |regexp| namespace.match?(regexp) && regexp != DEFAULT_NAMESPACE } + namespace_key ||= DEFAULT_NAMESPACE - appname_key = @group_watchers[namespace_key].keys.find { |regexp| appname.match?(regexp) && regexp != DEFAULT_APPNAME } - appname_key ||= DEFAULT_APPNAME + appname_key = @group_watchers[namespace_key].keys.find { |regexp| appname.match?(regexp) && regexp != DEFAULT_APPNAME } + appname_key ||= DEFAULT_APPNAME - @group_watchers[namespace_key][appname_key] - end - + @group_watchers[namespace_key][appname_key] + end + + def find_group_from_metadata(path) begin metadata = @group.pattern.match(path) group_watcher = find_group(metadata['namespace'], metadata['appname']) @@ -120,8 +120,8 @@ def setup_watcher(target_info, pe) tw.group_watcher = group_watcher tw - rescue => e - raise e + rescue => e + raise e end def detach_watcher_after_rotate_wait(tw, ino) diff --git a/test/plugin/test_in_tail_with_throttle.rb b/test/plugin/test_in_tail_with_throttle.rb index e2420a17dd..daaa4c9161 100644 --- a/test/plugin/test_in_tail_with_throttle.rb +++ b/test/plugin/test_in_tail_with_throttle.rb @@ -1,5 +1,5 @@ require_relative '../helper' -# require_relative './test_in_tail' + require 'fluent/test' require 'fluent/test/helpers' require 'fluent/test/driver/input' From c76ffbdd07474f53f9403b7f0c510d752bdac590 Mon Sep 17 00:00:00 2001 From: Pranjal Gupta Date: Thu, 6 Jan 2022 20:46:41 +0530 Subject: [PATCH 05/23] in_tail: Add group based log collection from `in_tail_with_throttle` Signed-off-by: Pranjal Gupta --- lib/fluent/plugin/in_tail.rb | 182 +++++++++++- lib/fluent/plugin/in_tail_with_throttle.rb | 296 -------------------- test/plugin/test_in_tail.rb | 201 +++++++++++++- test/plugin/test_in_tail_with_throttle.rb | 308 --------------------- 4 files changed, 379 insertions(+), 608 deletions(-) delete mode 100644 lib/fluent/plugin/in_tail_with_throttle.rb delete mode 100644 test/plugin/test_in_tail_with_throttle.rb diff --git a/lib/fluent/plugin/in_tail.rb b/lib/fluent/plugin/in_tail.rb index 3ad5943f82..cc19fa3c02 100644 --- a/lib/fluent/plugin/in_tail.rb +++ b/lib/fluent/plugin/in_tail.rb @@ -40,6 +40,9 @@ class TailInput < Fluent::Plugin::Input RESERVED_CHARS = ['/', '*', '%'].freeze MetricsInfo = Struct.new(:opened, :closed, :rotated) + DEFAULT_NAMESPACE = DEFAULT_APPNAME = /./ + DEFAULT_LIMIT = -1 + class WatcherSetupError < StandardError def initialize(msg) @message = msg @@ -60,6 +63,9 @@ def initialize @shutdown_start_time = nil @metrics = nil @startup = true + # Map rules with GroupWatcher objects + @group_watchers = {} + @sorted_path = nil end desc 'The paths to read. Multiple paths can be specified, separated by comma.' @@ -120,7 +126,25 @@ def initialize config_argument :usage, :string, default: 'in_tail_parser' end - attr_reader :paths + config_section :group, param_name: :group, required: false, multi: false do + desc 'Regex for extracting group\'s metadata' + config_param :pattern, + :regexp, + default: /var\/log\/containers\/(?[a-z0-9]([-a-z0-9]*[a-z0-9])?(\/[a-z0-9]([-a-z0-9]*[a-z0-9])?)*)_(?[^_]+)_(?.+)-(?[a-z0-9]{64})\.log$/ + desc 'Period of time in which the group_line_limit is applied' + config_param :rate_period, :time, default: 5 + + config_section :rule, multi: true, required: true do + desc 'Namespace key' + config_param :namespace, :array, value_type: :string, default: [DEFAULT_NAMESPACE] + desc 'App name key' + config_param :appname, :array, value_type: :string, default: [DEFAULT_APPNAME] + desc 'Maximum number of log lines allowed per group over a period of rate_period' + config_param :limit, :integer, default: DEFAULT_LIMIT + end + end + + attr_reader :paths, :group_watchers def configure(conf) @variable_store = Fluent::VariableStore.fetch_or_build(:in_tail) @@ -196,6 +220,14 @@ def configure(conf) @read_bytes_limit_per_second = min_bytes end end + + ## Ensuring correct time period syntax + @group.rule.each { |rule| + raise "Metadata Group Limit >= DEFAULT_LIMIT" unless rule.limit >= DEFAULT_LIMIT + } unless @group.nil? + + construct_groupwatchers unless @group.nil? + opened_file_metrics = metrics_create(namespace: "fluentd", subsystem: "input", name: "files_opened_total", help_text: "Total number of opened files") closed_file_metrics = metrics_create(namespace: "fluentd", subsystem: "input", name: "files_closed_total", help_text: "Total number of closed files") rotated_file_metrics = metrics_create(namespace: "fluentd", subsystem: "input", name: "files_rotated_total", help_text: "Total number of rotated files") @@ -235,6 +267,51 @@ def parse_encoding_param(encoding_name) end end + def construct_groupwatchers + @group.rule.each { |rule| + num_groups = rule.namespace.size * rule.appname.size + + rule.namespace.each { |namespace| + namespace = /#{Regexp.quote(namespace)}/ unless namespace.eql?(DEFAULT_NAMESPACE) + @group_watchers[namespace] ||= {} + + rule.appname.each { |appname| + appname = /#{Regexp.quote(appname)}/ unless appname.eql?(DEFAULT_APPNAME) + @group_watchers[namespace][appname] = GroupWatcher.new(@group.rate_period, rule.limit/num_groups) + } + + @group_watchers[namespace][DEFAULT_APPNAME] ||= GroupWatcher.new(@group.rate_period) + } + } + + if @group_watchers.dig(DEFAULT_NAMESPACE, DEFAULT_APPNAME).nil? + @group_watchers[DEFAULT_NAMESPACE] ||= {} + @group_watchers[DEFAULT_NAMESPACE][DEFAULT_APPNAME] = GroupWatcher.new(@group.rate_period) + end + end + + def find_group(namespace, appname) + namespace_key = @group_watchers.keys.find { |regexp| namespace.match?(regexp) && regexp != DEFAULT_NAMESPACE } + namespace_key ||= DEFAULT_NAMESPACE + + appname_key = @group_watchers[namespace_key].keys.find { |regexp| appname.match?(regexp) && regexp != DEFAULT_APPNAME } + appname_key ||= DEFAULT_APPNAME + + @group_watchers[namespace_key][appname_key] + end + + def find_group_from_metadata(path) + begin + metadata = @group.pattern.match(path) + group_watcher = find_group(metadata['namespace'], metadata['appname']) + rescue => e + $log.warn "Cannot find group from metadata, Adding file in the default group" + group_watcher = @group_watchers[DEFAULT_NAMESPACE][DEFAULT_APPNAME] + end + + group_watcher + end + def start super @@ -406,6 +483,12 @@ def setup_watcher(target_info, pe) event_loop_attach(watcher) end + unless @group.nil? + group_watcher = find_group_from_metadata(target_info.path) + group_watcher.add(tw.path) unless group_watcher.include?(tw.path) + tw.group_watcher = group_watcher + end + tw rescue => e if tw @@ -461,6 +544,11 @@ def start_watchers(targets_info) def stop_watchers(targets_info, immediate: false, unwatched: false, remove_watcher: true) targets_info.each_value { |target_info| + unless @group.nil? + group_watcher = find_group_from_metadata(target_info.path) + group_watcher.delete(target_info.path) + end + if remove_watcher tw = @tails.delete(target_info) else @@ -548,7 +636,7 @@ def detach_watcher_after_rotate_wait(tw, ino) if @open_on_every_update # Detach now because it's already closed, waiting it doesn't make sense. detach_watcher(tw, ino) - elsif @read_bytes_limit_per_second < 0 + elsif @read_bytes_limit_per_second < 0 || (!tw.group_watcher.nil? && tw.group_watcher.limit <= 0) # throttling isn't enabled, just wait @rotate_wait timer_execute(:in_tail_close_watcher, @rotate_wait, repeat: false) do detach_watcher(tw, ino) @@ -753,6 +841,79 @@ def on_timer end end + class GroupWatcher + attr_accessor :current_paths, :limit, :number_lines_read, :start_reading_time, :rate_period + + FileCounter = Struct.new( + :number_lines_read, + :start_reading_time, + ) + + def initialize(rate_period = 60, limit = -1) + @current_paths = {} + @rate_period = rate_period + @limit = limit + end + + def add(path) + @current_paths[path] = FileCounter.new(0, nil) + end + + def include?(path) + @current_paths.key?(path) + end + + def size + @current_paths.size + end + + def delete(path) + @current_paths.delete(path) + end + + def update_reading_time(path) + @current_paths[path].start_reading_time ||= Fluent::Clock.now + end + + def update_lines_read(path, value) + @current_paths[path].number_lines_read += value + end + + def reset_counter(path) + @current_paths[path].start_reading_time = nil + @current_paths[path].number_lines_read = 0 + end + + def time_spent_reading(path) + Fluent::Clock.now - @current_paths[path].start_reading_time + end + + def limit_time_period_reached?(path) + time_spent_reading(path) < @rate_period + end + + def limit_lines_reached?(path) + return true unless include?(path) + return true if @limit == 0 + + return false if @limit < 0 + return false if @current_paths[path].number_lines_read < @limit / size + + # update_reading_time(path) + if limit_time_period_reached?(path) # Exceeds limit + true + else # Does not exceed limit + reset_counter(path) + false + end + end + + def to_s + super + " current_paths: #{@current_paths} rate_period: #{@rate_period} limit: #{@limit}" + end + end + + class TailWatcher def initialize(target_info, pe, log, read_from_head, follow_inodes, update_watcher, line_buffer_timer_flusher, io_handler_build, metrics) @path = target_info.path @@ -775,6 +936,11 @@ def initialize(target_info, pe, log, read_from_head, follow_inodes, update_watch attr_reader :line_buffer_timer_flusher attr_accessor :unwatched # This is used for removing position entry from PositionFile attr_reader :watchers + attr_accessor :group_watcher + + def group_watcher=(group_watcher) + @group_watcher = group_watcher + end def tag @parsed_tag ||= @path.tr('/', '.').gsub(/\.+/, '.').gsub(/^\./, '') @@ -997,6 +1163,10 @@ def initialize(watcher, path:, read_lines_limit:, read_bytes_limit_per_second:, @log.info "following tail of #{@path}" end + def group_watcher + @watcher.group_watcher + end + def on_notify @notify_mutex.synchronize { handle_notify } end @@ -1054,6 +1224,7 @@ def should_shutdown_now? def handle_notify return if limit_bytes_per_second_reached? + return if !group_watcher.nil? && group_watcher.limit_lines_reached?(@path) with_io do |io| begin @@ -1063,12 +1234,19 @@ def handle_notify begin while true @start_reading_time ||= Fluent::Clock.now + group_watcher.update_reading_time(@path) unless group_watcher.nil? data = io.readpartial(BYTES_TO_READ, @iobuf) @eof = false @number_bytes_read += data.bytesize @fifo << data + group_watcher.update_lines_read(@path, -@lines.size) unless group_watcher.nil? @fifo.read_lines(@lines) + group_watcher.update_lines_read(@path, @lines.size) unless group_watcher.nil? + if !group_watcher.nil? && group_watcher.limit_lines_reached?(@path) || should_shutdown_now? + read_more = false + break + end if limit_bytes_per_second_reached? || should_shutdown_now? # Just get out from tailing loop. read_more = false diff --git a/lib/fluent/plugin/in_tail_with_throttle.rb b/lib/fluent/plugin/in_tail_with_throttle.rb deleted file mode 100644 index 4772cfd287..0000000000 --- a/lib/fluent/plugin/in_tail_with_throttle.rb +++ /dev/null @@ -1,296 +0,0 @@ -# -# Fluentd -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -require 'fluent/plugin/in_tail' - -module Fluent::Plugin - class ThrottleInput < Fluent::Plugin::TailInput - Fluent::Plugin.register_input('tail_with_throttle', self) - - DEFAULT_NAMESPACE = DEFAULT_APPNAME = /./ - DEFAULT_LIMIT = -1 - - attr_reader :group_watchers - - def initialize - super - # Map rules with GroupWatcher objects - @group_watchers = {} - @sorted_path = nil - end - - config_section :group, param_name: :group, required: true, multi: false do - desc 'Regex for extracting group\'s metadata' - config_param :pattern, - :regexp, - default: /var\/log\/containers\/(?[a-z0-9]([-a-z0-9]*[a-z0-9])?(\/[a-z0-9]([-a-z0-9]*[a-z0-9])?)*)_(?[^_]+)_(?.+)-(?[a-z0-9]{64})\.log$/ - desc 'Period of time in which the group_line_limit is applied' - config_param :rate_period, :time, default: 5 - - config_section :rule, multi: true, required: true do - desc 'Namespace key' - config_param :namespace, :array, value_type: :string, default: [DEFAULT_NAMESPACE] - desc 'App name key' - config_param :appname, :array, value_type: :string, default: [DEFAULT_APPNAME] - desc 'Maximum number of log lines allowed per group over a period of rate_period' - config_param :limit, :integer, default: DEFAULT_LIMIT - end - end - - def configure(conf) - super - ## Ensuring correct time period syntax - @group.rule.each { |rule| - raise "Metadata Group Limit >= DEFAULT_LIMIT" unless rule.limit >= DEFAULT_LIMIT - } - - construct_groupwatchers - end - - def construct_groupwatchers - @group.rule.each { |rule| - num_groups = rule.namespace.size * rule.appname.size - - rule.namespace.each { |namespace| - namespace = /#{Regexp.quote(namespace)}/ unless namespace.eql?(DEFAULT_NAMESPACE) - @group_watchers[namespace] ||= {} - - rule.appname.each { |appname| - appname = /#{Regexp.quote(appname)}/ unless appname.eql?(DEFAULT_APPNAME) - @group_watchers[namespace][appname] = GroupWatcher.new(@group.rate_period, rule.limit/num_groups) - } - - @group_watchers[namespace][DEFAULT_APPNAME] ||= GroupWatcher.new(@group.rate_period) - } - } - - if @group_watchers.dig(DEFAULT_NAMESPACE, DEFAULT_APPNAME).nil? - @group_watchers[DEFAULT_NAMESPACE] ||= {} - @group_watchers[DEFAULT_NAMESPACE][DEFAULT_APPNAME] = GroupWatcher.new(@group.rate_period) - end - end - - def find_group(namespace, appname) - namespace_key = @group_watchers.keys.find { |regexp| namespace.match?(regexp) && regexp != DEFAULT_NAMESPACE } - namespace_key ||= DEFAULT_NAMESPACE - - appname_key = @group_watchers[namespace_key].keys.find { |regexp| appname.match?(regexp) && regexp != DEFAULT_APPNAME } - appname_key ||= DEFAULT_APPNAME - - @group_watchers[namespace_key][appname_key] - end - - def find_group_from_metadata(path) - begin - metadata = @group.pattern.match(path) - group_watcher = find_group(metadata['namespace'], metadata['appname']) - rescue => e - $log.warn "Cannot find group from metadata, Adding file in the default group" - group_watcher = @group_watchers[DEFAULT_NAMESPACE][DEFAULT_APPNAME] - end - - group_watcher - end - - def stop_watchers(targets_info, immediate: false, unwatched: false, remove_watcher: true) - targets_info.each_value { |target_info| - group_watcher = find_group_from_metadata(target_info.path) - group_watcher.delete(target_info.path) - } - super - end - - def setup_watcher(target_info, pe) - tw = super - group_watcher = find_group_from_metadata(target_info.path) - group_watcher.add(tw.path) unless group_watcher.include?(tw.path) - tw.group_watcher = group_watcher - - tw - rescue => e - raise e - end - - def detach_watcher_after_rotate_wait(tw, ino) - # Call event_loop_attach/event_loop_detach is high-cost for short-live object. - # If this has a problem with large number of files, use @_event_loop directly instead of timer_execute. - if @open_on_every_update - # Detach now because it's already closed, waiting it doesn't make sense. - detach_watcher(tw, ino) - elsif !tw.group_watcher.nil? && tw.group_watcher.limit <= 0 - # throttling isn't enabled, just wait @rotate_wait - timer_execute(:in_tail_close_watcher, @rotate_wait, repeat: false) do - detach_watcher(tw, ino) - end - else - # When the throttling feature is enabled, it might not reach EOF yet. - # Should ensure to read all contents before closing it, with keeping throttling. - start_time_to_wait = Fluent::Clock.now - timer = timer_execute(:in_tail_close_watcher, 1, repeat: true) do - elapsed = Fluent::Clock.now - start_time_to_wait - if tw.eof? && elapsed >= @rotate_wait - timer.detach - detach_watcher(tw, ino) - end - end - end - end - - class GroupWatcher - attr_accessor :current_paths, :limit, :number_lines_read, :start_reading_time, :rate_period - - FileCounter = Struct.new( - :number_lines_read, - :start_reading_time, - ) - - def initialize(rate_period = 60, limit = -1) - @current_paths = {} - @rate_period = rate_period - @limit = limit - end - - def add(path) - @current_paths[path] = FileCounter.new(0, nil) - end - - def include?(path) - @current_paths.key?(path) - end - - def size - @current_paths.size - end - - def delete(path) - @current_paths.delete(path) - end - - def update_reading_time(path) - @current_paths[path].start_reading_time ||= Fluent::Clock.now - end - - def update_lines_read(path, value) - @current_paths[path].number_lines_read += value - end - - def reset_counter(path) - @current_paths[path].start_reading_time = nil - @current_paths[path].number_lines_read = 0 - end - - def time_spent_reading(path) - Fluent::Clock.now - @current_paths[path].start_reading_time - end - - def limit_time_period_reached?(path) - time_spent_reading(path) < @rate_period - end - - def limit_lines_reached?(path) - return true unless include?(path) - return true if @limit == 0 - - return false if @limit < 0 - return false if @current_paths[path].number_lines_read < @limit / size - - # update_reading_time(path) - if limit_time_period_reached?(path) # Exceeds limit - true - else # Does not exceed limit - reset_counter(path) - false - end - end - - def to_s - super + " current_paths: #{@current_paths} rate_period: #{@rate_period} limit: #{@limit}" - end - end - - class Fluent::Plugin::TailInput::TailWatcher - attr_accessor :group_watcher - - def group_watcher=(group_watcher) - @group_watcher = group_watcher - end - - - class Fluent::Plugin::TailInput::TailWatcher::IOHandler - alias_method :orig_handle_notify, :handle_notify - - def group_watcher - @watcher.group_watcher - end - - def handle_notify - if group_watcher.nil? - orig_handle_notify - else - rate_limit_handle_notify - end - end - - def rate_limit_handle_notify - return if group_watcher.limit_lines_reached?(@path) - - with_io do |io| - begin - read_more = false - - if !io.nil? && @lines.empty? - begin - while true - group_watcher.update_reading_time(@path) - data = io.readpartial(BYTES_TO_READ, @iobuf) - @eof = false - @fifo << data - group_watcher.update_lines_read(@path, -@lines.size) - @fifo.read_lines(@lines) - group_watcher.update_lines_read(@path, @lines.size) - - if group_watcher.limit_lines_reached?(@path) || should_shutdown_now? - # Just get out from tailing loop. - @log.info "Read limit exceeded #{@path}" if !should_shutdown_now? - read_more = false - break - elsif @lines.size >= @read_lines_limit - # not to use too much memory in case the file is very large - read_more = true - break - end - end - rescue EOFError - @eof = true - end - end - @log.debug "Lines read: #{@path} #{group_watcher.current_paths[@path].number_lines_read}" - - unless @lines.empty? - if @receive_lines.call(@lines, @watcher) - @watcher.pe.update_pos(io.pos - @fifo.bytesize) - @lines.clear - else - read_more = false - end - end - end while read_more - end - end - end - end - end -end \ No newline at end of file diff --git a/test/plugin/test_in_tail.rb b/test/plugin/test_in_tail.rb index c96e72f558..05a320ac60 100644 --- a/test/plugin/test_in_tail.rb +++ b/test/plugin/test_in_tail.rb @@ -91,12 +91,12 @@ def create_target_info(path) TMP_DIR = File.dirname(__FILE__) + "/../tmp/tail#{ENV['TEST_ENV_NUMBER']}" - CONFIG = config_element("ROOT", "", { - "path" => "#{TMP_DIR}/tail.txt", + ROOT_CONFIG = config_element("ROOT", "", { "tag" => "t1", "rotate_wait" => "2s", "refresh_interval" => "1s" }) + CONFIG = ROOT_CONFIG + config_element("", "", { "path" => "#{TMP_DIR}/tail.txt" }) COMMON_CONFIG = CONFIG + config_element("", "", { "pos_file" => "#{TMP_DIR}/tail.pos" }) CONFIG_READ_FROM_HEAD = config_element("", "", { "read_from_head" => true }) CONFIG_DISABLE_WATCH_TIMER = config_element("", "", { "enable_watch_timer" => false }) @@ -143,6 +143,30 @@ def create_target_info(path) "format_firstline" => "/^[s]/" }) ]) + + PATTERN = "/#{TMP_DIR}\/(?[a-z0-9]([-a-z0-9]*[a-z0-9])?(\/[a-z0-9]([-a-z0-9]*[a-z0-9])?)*)_(?[^_]+)_(?.+)-(?[a-z0-9]{6})\.log$/" + + def create_group_directive(pattern, rate_period, *rules) + config_element("", "", {}, [ + config_element("group", "", { + "pattern" => pattern, + "rate_period" => rate_period + }, rules) + ]) + end + + def create_rule_directive(namespace = [], appname = [], limit) + params = { + "limit" => limit, + } + params["namespace"] = namespace.join(', ') if namespace.size > 0 + params["appname"] = appname.join(', ') if appname.size > 0 + config_element("rule", "", params) + end + + def create_path_element(path) + config_element("source", "", { "path" => "#{TMP_DIR}/#{path}" }) + end def create_driver(conf = SINGLE_LINE_CONFIG, use_common_conf = true) config = use_common_conf ? COMMON_CONFIG + conf : conf @@ -150,6 +174,34 @@ def create_driver(conf = SINGLE_LINE_CONFIG, use_common_conf = true) end sub_test_case "configure" do + test " required" do + conf = create_group_directive('.', '1m') + SINGLE_LINE_CONFIG + assert_raise(Fluent::ConfigError) do + d = create_driver(conf) + end + end + + test "valid configuration" do + rule1 = create_rule_directive(['namespace-a'], ['appname-b','appname-c'], 100) + rule2 = create_rule_directive(['namespace-d', 'appname-e'], ['f'], 50) + rule3 = create_rule_directive([], ['appname-g'], -1) + rule4 = create_rule_directive(['appname-h'], [], 0) + + conf = create_group_directive('.', '1m', rule1, rule2, rule3, rule4) + SINGLE_LINE_CONFIG + assert_nothing_raised do + d = create_driver(conf) + end + end + + test "limit should be greater than DEFAULT_LIMIT (-1)" do + rule1 = create_rule_directive(['namespace-a'], ['appname-b','appname-c'], -100) + rule2 = create_rule_directive(['namespace-d', 'namespace-e'], ['appname-f'], 50) + conf = create_group_directive('.', '1m', rule1, rule2) + SINGLE_LINE_CONFIG + assert_raise(RuntimeError) do + d = create_driver(conf) + end + end + test "plain single line" do d = create_driver assert_equal ["#{TMP_DIR}/tail.txt"], d.instance.paths @@ -263,6 +315,73 @@ def create_driver(conf = SINGLE_LINE_CONFIG, use_common_conf = true) end end + sub_test_case "group rules line limit resolution" do + + test "valid" do + rule1 = create_rule_directive(['namespace-a'], ['appname-b', 'appname-c'], 50) + rule2 = create_rule_directive([], ['appname-b', 'appname-c'], 400) + rule3 = create_rule_directive(['namespace-a'], [], 100) + + conf = create_group_directive('.', '1m', rule1, rule2, rule3) + SINGLE_LINE_CONFIG + assert_nothing_raised do + d = create_driver(conf) + + assert_equal 25, d.instance.group_watchers[/namespace\-a/][/appname\-b/].limit + assert_equal 25, d.instance.group_watchers[/namespace\-a/][/appname\-c/].limit + assert_equal 100, d.instance.group_watchers[/namespace\-a/][/./].limit + assert_equal 200, d.instance.group_watchers[/./][/appname\-b/].limit + assert_equal 200, d.instance.group_watchers[/./][/appname\-c/].limit + assert_equal -1, d.instance.group_watchers[/./][/./].limit + end + end + + end + + sub_test_case "files should be placed in groups" do + test "invalid regex pattern places files in default group" do + rule1 = create_rule_directive([], [], 100) ## limits default groups + conf = ROOT_CONFIG + create_group_directive('.', '1m', rule1) + create_path_element("test*.txt") + SINGLE_LINE_CONFIG + + d = create_driver(conf, false) + File.open("#{TMP_DIR}/test1.txt", 'w') + File.open("#{TMP_DIR}/test2.txt", 'w') + File.open("#{TMP_DIR}/test3.txt", 'w') + + d.run do + ## checking default group_watcher's paths + assert_equal 3, d.instance.group_watchers[/./][/./].size + assert_true d.instance.group_watchers[/./][/./].include? File.join(TMP_DIR, 'test1.txt') + assert_true d.instance.group_watchers[/./][/./].include? File.join(TMP_DIR, 'test2.txt') + assert_true d.instance.group_watchers[/./][/./].include? File.join(TMP_DIR, 'test3.txt') + end + end + + test "valid regex pattern places file in their respective groups" do + rule1 = create_rule_directive(['test-namespace1'], ['test-appname1'], 100) + rule2 = create_rule_directive(['test-namespace1'], [], 200) + rule3 = create_rule_directive([], ['test-appname2'], 100) + rule4 = create_rule_directive([], [], 100) + + path_element = create_path_element("test-appname*.log") + + conf = ROOT_CONFIG + create_group_directive(PATTERN, '1m', rule1, rule2, rule3, rule4) + path_element + SINGLE_LINE_CONFIG + d = create_driver(conf, false) + + File.open("#{TMP_DIR}/test-appname1_test-namespace1_test-container-15fabq.log", 'w') + File.open("#{TMP_DIR}/test-appname3_test-namespace1_test-container-15fabq.log", 'w') + File.open("#{TMP_DIR}/test-appname2_test-namespace2_test-container-15fabq.log", 'w') + File.open("#{TMP_DIR}/test-appname4_test-namespace3_test-container-15fabq.log", 'w') + + d.run do + assert_true d.instance.group_watchers[/test\-namespace1/][/test\-appname1/].include? File.join(TMP_DIR, "test-appname1_test-namespace1_test-container-15fabq.log") + assert_true d.instance.group_watchers[/test\-namespace1/][/./].include? File.join(TMP_DIR, "test-appname3_test-namespace1_test-container-15fabq.log") + assert_true d.instance.group_watchers[/./][/test\-appname2/].include? File.join(TMP_DIR, "test-appname2_test-namespace2_test-container-15fabq.log") + assert_true d.instance.group_watchers[/./][/./].include? File.join(TMP_DIR, "test-appname4_test-namespace3_test-container-15fabq.log") + end + end + + end + sub_test_case "singleline" do data(flat: SINGLE_LINE_CONFIG, parse: PARSE_SINGLE_LINE_CONFIG) @@ -2360,4 +2479,82 @@ def test_shutdown_timeout elapsed = Fluent::Clock.now - start_time assert_true(elapsed > 0.5 && elapsed < 2.5) end + + sub_test_case "throttling logs at in_tail level" do + + data("file test1.log no limit 5120 text: msg" => ["test1.log", 5120, "msg"], + "file test2.log no limit 1024 text: test" => ["test2.log", 1024, "test"]) + def test_lines_collected_with_no_throttling(data) + file, num_lines, msg = data + rule = create_rule_directive([], [], -1) + path_element = create_path_element(file) + + conf = ROOT_CONFIG + create_group_directive('.', '10s', rule) + path_element + CONFIG_READ_FROM_HEAD + SINGLE_LINE_CONFIG + File.open("#{TMP_DIR}/#{file}", 'wb') do |f| + num_lines.times do + f.puts "#{msg}\n" + end + end + + + d = create_driver(conf, false) + d.run do + start_time = Fluent::Clock.now + + assert_true Fluent::Clock.now - start_time < 10 + assert_equal num_lines, d.record_count + assert_equal({ "message" => msg }, d.events[0][2]) + + prev_count = d.record_count + ## waiting for atleast 12 seconds to avoid any sync errors between plugin and test driver + sleep(1) until Fluent::Clock.now - start_time > 12 + ## after waiting for 10 secs, limit will reset + ## Plugin will start reading but it will encounter EOF Error + ## since no logs are left to be read + ## Hence, d.record_count = prev_count + assert_equal 0, d.record_count - prev_count + end + end + + test "lines collected with throttling" do + file = "appname1_namespace12_container-123456.log" + limit = 1000 + rate_period = '10s' + num_lines = 3000 + msg = "a"*8190 # Total size = 8190 bytes + 2 (\n) bytes + + rule = create_rule_directive(['namespace'], ['appname'], limit) + path_element = create_path_element(file) + conf = ROOT_CONFIG + create_group_directive(PATTERN, rate_period, rule) + path_element + SINGLE_LINE_CONFIG + CONFIG_READ_FROM_HEAD + + d = create_driver(conf, false) + + File.open("#{TMP_DIR}/#{file}", 'wb') do |f| + num_lines.times do + f.puts msg + end + end + + d.run do + start_time = Fluent::Clock.now + prev_count = 0 + + 3.times do + assert_true Fluent::Clock.now - start_time < 10 + ## Check record_count after 10s to check lines reads + assert_equal limit, d.record_count - prev_count + prev_count = d.record_count + ## sleep until rate_period seconds are over so that + ## Plugin can read lines again + sleep(1) until Fluent::Clock.now - start_time > 12 + ## waiting for atleast 12 seconds to avoid any sync errors between plugin and test driver + start_time = Fluent::Clock.now + end + ## When all the lines are read and rate_period seconds are over + ## limit will reset and since there are no more logs to be read, + ## number_lines_read will be 0 + assert_equal 0, d.instance.group_watchers[/namespace/][/appname/].current_paths["#{TMP_DIR}/#{file}"].number_lines_read + end + end + end end diff --git a/test/plugin/test_in_tail_with_throttle.rb b/test/plugin/test_in_tail_with_throttle.rb deleted file mode 100644 index daaa4c9161..0000000000 --- a/test/plugin/test_in_tail_with_throttle.rb +++ /dev/null @@ -1,308 +0,0 @@ -require_relative '../helper' - -require 'fluent/test' -require 'fluent/test/helpers' -require 'fluent/test/driver/input' -require 'fluent/plugin/in_tail_with_throttle' - -class ThrottleInputTest < Test::Unit::TestCase - - def setup - Fluent::Test.setup - cleanup_directory(TMP_DIR) - end - - def teardown - super - cleanup_directory(TMP_DIR) - Fluent::Engine.stop - end - - def cleanup_directory(path) - unless Dir.exist?(path) - FileUtils.mkdir_p(path) - return - end - - if Fluent.windows? - Dir.glob("*", base: path).each do |name| - begin - cleanup_file(File.join(path, name)) - rescue - # expect test driver block release already owned file handle. - end - end - else - begin - FileUtils.rm_f(path, secure:true) - rescue ArgumentError - FileUtils.rm_f(path) # For Ruby 2.6 or before. - end - if File.exist?(path) - FileUtils.remove_entry_secure(path, true) - end - end - FileUtils.mkdir_p(path) - end - - def cleanup_file(path) - if Fluent.windows? - # On Windows, when the file or directory is removed and created - # frequently, there is a case that creating file or directory will - # fail. This situation is caused by pending file or directory - # deletion which is mentioned on win32 API document [1] - # As a workaround, execute rename and remove method. - # - # [1] https://docs.microsoft.com/en-us/windows/win32/api/fileapi/nf-fileapi-createfilea#files - # - file = File.join(Dir.tmpdir, SecureRandom.hex(10)) - begin - FileUtils.mv(path, file) - FileUtils.rm_rf(file, secure: true) - rescue ArgumentError - FileUtils.rm_rf(file) # For Ruby 2.6 or before. - end - if File.exist?(file) - # ensure files are closed for Windows, on which deleted files - # are still visible from filesystem - GC.start(full_mark: true, immediate_mark: true, immediate_sweep: true) - FileUtils.remove_entry_secure(file, true) - end - else - begin - FileUtils.rm_f(path, secure: true) - rescue ArgumentError - FileUtils.rm_f(path) # For Ruby 2.6 or before. - end - if File.exist?(path) - FileUtils.remove_entry_secure(path, true) - end - end - end - - TMP_DIR = File.dirname(__FILE__) + "/../tmp/tail_with_throttle#{ENV['TEST_ENV_NUMBER']}" - - def create_group_directive(pattern, rate_period, *rules) - config_element("", "", {}, [ - config_element("group", "", { - "pattern" => pattern, - "rate_period" => rate_period - }, rules) - ]) - end - - def create_rule_directive(namespace = [], appname = [], limit) - params = { - "limit" => limit, - } - params["namespace"] = namespace.join(', ') if namespace.size > 0 - params["appname"] = appname.join(', ') if appname.size > 0 - config_element("rule", "", params) - end - - def create_path_element(path) - config_element("source", "", { "path" => "#{TMP_DIR}/#{path}" }) - end - - def create_driver(conf, add_path = true) - conf = add_path ? conf + create_path_element("tail.txt") : conf - Fluent::Test::Driver::Input.new(Fluent::Plugin::ThrottleInput).configure(conf) - end - - CONFG = config_element("source", "", { - "@type" => "tail_with_throttle", - "tag" => "t1", - "pos_file" => "#{TMP_DIR}/tail.pos", - "refresh_interval" => "1s", - "rotate_wait" => "2s", - }, [ - config_element("parse", "", { - "@type" => "/(?.*)/" }) - ] - ) - PATTERN = "/#{TMP_DIR}\/(?[a-z0-9]([-a-z0-9]*[a-z0-9])?(\/[a-z0-9]([-a-z0-9]*[a-z0-9])?)*)_(?[^_]+)_(?.+)-(?[a-z0-9]{6})\.log$/" - READ_FROM_HEAD = config_element("source", "", { "read_from_head" => "true" }) - - sub_test_case "#configuration" do - - test " required" do - assert_raise(Fluent::ConfigError) do - d = create_driver(CONFG) - end - end - - test " required" do - conf = CONFG + create_group_directive('.', '1m') - assert_raise(Fluent::ConfigError) do - d = create_driver(conf) - end - end - - test "valid configuration" do - rule1 = create_rule_directive(['namespace-a'], ['appname-b','appname-c'], 100) - rule2 = create_rule_directive(['namespace-d', 'appname-e'], ['f'], 50) - rule3 = create_rule_directive([], ['appname-g'], -1) - rule4 = create_rule_directive(['appname-h'], [], 0) - - conf = CONFG + create_group_directive('.', '1m', rule1, rule2, rule3, rule4) - assert_nothing_raised do - d = create_driver(conf) - end - end - - test "limit should be greater than DEFAULT_LIMIT (-1)" do - rule1 = create_rule_directive(['namespace-a'], ['appname-b','appname-c'], -100) - rule2 = create_rule_directive(['namespace-d', 'namespace-e'], ['appname-f'], 50) - conf = CONFG + create_group_directive('.', '1m', rule1, rule2) - assert_raise(RuntimeError) do - d = create_driver(conf) - end - end - - end - - sub_test_case "group rules line limit resolution" do - - test "valid" do - rule1 = create_rule_directive(['namespace-a'], ['appname-b', 'appname-c'], 50) - rule2 = create_rule_directive([], ['appname-b', 'appname-c'], 400) - rule3 = create_rule_directive(['namespace-a'], [], 100) - - conf = CONFG + create_group_directive('.', '1m', rule1, rule2, rule3) - assert_nothing_raised do - d = create_driver(conf) - - assert_equal 25, d.instance.group_watchers[/namespace\-a/][/appname\-b/].limit - assert_equal 25, d.instance.group_watchers[/namespace\-a/][/appname\-c/].limit - assert_equal 100, d.instance.group_watchers[/namespace\-a/][/./].limit - assert_equal 200, d.instance.group_watchers[/./][/appname\-b/].limit - assert_equal 200, d.instance.group_watchers[/./][/appname\-c/].limit - assert_equal -1, d.instance.group_watchers[/./][/./].limit - end - end - - end - - sub_test_case "files should be placed in groups" do - test "invalid regex pattern places files in default group" do - rule1 = create_rule_directive([], [], 100) ## limits default groups - conf = CONFG + create_group_directive('.', '1m', rule1) + create_path_element("test*.txt") - - d = create_driver(conf, false) - File.open("#{TMP_DIR}/test1.txt", 'w') - File.open("#{TMP_DIR}/test2.txt", 'w') - File.open("#{TMP_DIR}/test3.txt", 'w') - - d.run do - ## checking default group_watcher's paths - assert_equal 3, d.instance.group_watchers[/./][/./].size - assert_true d.instance.group_watchers[/./][/./].include? File.join(TMP_DIR, 'test1.txt') - assert_true d.instance.group_watchers[/./][/./].include? File.join(TMP_DIR, 'test2.txt') - assert_true d.instance.group_watchers[/./][/./].include? File.join(TMP_DIR, 'test3.txt') - end - end - - test "valid regex pattern places file in their respective groups" do - rule1 = create_rule_directive(['test-namespace1'], ['test-appname1'], 100) - rule2 = create_rule_directive(['test-namespace1'], [], 200) - rule3 = create_rule_directive([], ['test-appname2'], 100) - rule4 = create_rule_directive([], [], 100) - - path_element = create_path_element("test-appname*.log") - - conf = CONFG + create_group_directive(PATTERN, '1m', rule1, rule2, rule3, rule4) + path_element - d = create_driver(conf, false) - - File.open("#{TMP_DIR}/test-appname1_test-namespace1_test-container-15fabq.log", 'w') - File.open("#{TMP_DIR}/test-appname3_test-namespace1_test-container-15fabq.log", 'w') - File.open("#{TMP_DIR}/test-appname2_test-namespace2_test-container-15fabq.log", 'w') - File.open("#{TMP_DIR}/test-appname4_test-namespace3_test-container-15fabq.log", 'w') - - d.run do - assert_true d.instance.group_watchers[/test\-namespace1/][/test\-appname1/].include? File.join(TMP_DIR, "test-appname1_test-namespace1_test-container-15fabq.log") - assert_true d.instance.group_watchers[/test\-namespace1/][/./].include? File.join(TMP_DIR, "test-appname3_test-namespace1_test-container-15fabq.log") - assert_true d.instance.group_watchers[/./][/test\-appname2/].include? File.join(TMP_DIR, "test-appname2_test-namespace2_test-container-15fabq.log") - assert_true d.instance.group_watchers[/./][/./].include? File.join(TMP_DIR, "test-appname4_test-namespace3_test-container-15fabq.log") - end - end - - end - - sub_test_case "throttling logs at in_tail level" do - - data("file test1.log no limit 5120 text: msg" => ["test1.log", 5120, "msg"], - "file test2.log no limit 1024 text: test" => ["test2.log", 1024, "test"]) - def test_lines_collected_with_no_throttling(data) - file, num_lines, msg = data - rule = create_rule_directive([], [], -1) - path_element = create_path_element(file) - - conf = CONFG + create_group_directive('.', '10s', rule) + path_element + READ_FROM_HEAD - File.open("#{TMP_DIR}/#{file}", 'wb') do |f| - num_lines.times do - f.puts "#{msg}\n" - end - end - - - d = create_driver(conf, false) - d.run do - start_time = Time.now - - assert_true Time.now - start_time < 10 - assert_equal num_lines, d.record_count - assert_equal({ "message" => msg }, d.events[0][2]) - - prev_count = d.record_count - ## waiting for atleast 12 seconds to avoid any sync errors between plugin and test driver - sleep(1) until Time.now - start_time > 12 - ## after waiting for 10 secs, limit will reset - ## Plugin will start reading but it will encounter EOF Error - ## since no logs are left to be read - ## Hence, d.record_count = prev_count - assert_equal 0, d.record_count - prev_count - end - end - - test "lines collected with throttling" do - file = "appname1_namespace12_container-123456.log" - limit = 1000 - rate_period = '10s' - num_lines = 3000 - msg = "a"*8190 # Total size = 8190 bytes + 2 (\n) bytes - - rule = create_rule_directive(['namespace'], ['appname'], limit) - path_element = create_path_element(file) - conf = CONFG + create_group_directive(PATTERN, rate_period, rule) + path_element + READ_FROM_HEAD - - d = create_driver(conf, false) - - File.open("#{TMP_DIR}/#{file}", 'wb') do |f| - num_lines.times do - f.puts msg - end - end - - d.run do - start_time = Time.now - prev_count = 0 - - 3.times do - assert_true Time.now - start_time < 10 - ## Check record_count after 10s to check lines reads - assert_equal limit, d.record_count - prev_count - prev_count = d.record_count - ## sleep until rate_period seconds are over so that - ## Plugin can read lines again - sleep(1) until Time.now - start_time > 12 - ## waiting for atleast 12 seconds to avoid any sync errors between plugin and test driver - start_time = Time.now - end - ## When all the lines are read and rate_period seconds are over - ## limit will reset and since there are no more logs to be read, - ## number_lines_read will be 0 - assert_equal 0, d.instance.group_watchers[/namespace/][/appname/].current_paths["#{TMP_DIR}/#{file}"].number_lines_read - end - end - end -end From 41d348f3ce2bacf9e5b50316ebe7a4802d64262d Mon Sep 17 00:00:00 2001 From: Pranjal Gupta Date: Sun, 6 Feb 2022 01:56:00 +0530 Subject: [PATCH 06/23] in_tail: Add `group.match` parameter for specifying generic rules Signed-off-by: Pranjal Gupta --- lib/fluent/plugin/in_tail.rb | 107 ++++++++++--------- test/plugin/test_in_tail.rb | 192 +++++++++++++++++++++++++---------- 2 files changed, 189 insertions(+), 110 deletions(-) diff --git a/lib/fluent/plugin/in_tail.rb b/lib/fluent/plugin/in_tail.rb index cc19fa3c02..775b3ae32a 100644 --- a/lib/fluent/plugin/in_tail.rb +++ b/lib/fluent/plugin/in_tail.rb @@ -40,8 +40,9 @@ class TailInput < Fluent::Plugin::Input RESERVED_CHARS = ['/', '*', '%'].freeze MetricsInfo = Struct.new(:opened, :closed, :rotated) - DEFAULT_NAMESPACE = DEFAULT_APPNAME = /./ + DEFAULT_KEY = /.*/ DEFAULT_LIMIT = -1 + REGEXP_JOIN = "_" class WatcherSetupError < StandardError def initialize(msg) @@ -65,7 +66,8 @@ def initialize @startup = true # Map rules with GroupWatcher objects @group_watchers = {} - @sorted_path = nil + @group_keys = nil + @default_group_key = nil end desc 'The paths to read. Multiple paths can be specified, separated by comma.' @@ -129,22 +131,21 @@ def initialize config_section :group, param_name: :group, required: false, multi: false do desc 'Regex for extracting group\'s metadata' config_param :pattern, - :regexp, - default: /var\/log\/containers\/(?[a-z0-9]([-a-z0-9]*[a-z0-9])?(\/[a-z0-9]([-a-z0-9]*[a-z0-9])?)*)_(?[^_]+)_(?.+)-(?[a-z0-9]{64})\.log$/ + :regexp, + default: /^\/var\/log\/containers\/(?[a-z0-9]([-a-z0-9]*[a-z0-9])?(\/[a-z0-9]([-a-z0-9]*[a-z0-9])?)*)_(?[^_]+)_(?.+)-(?[a-z0-9]{64})\.log$/ + desc 'Period of time in which the group_line_limit is applied' config_param :rate_period, :time, default: 5 - config_section :rule, multi: true, required: true do - desc 'Namespace key' - config_param :namespace, :array, value_type: :string, default: [DEFAULT_NAMESPACE] - desc 'App name key' - config_param :appname, :array, value_type: :string, default: [DEFAULT_APPNAME] + config_section :rule, param_name: :rule, required: true, multi: true do + desc 'Key-value pairs for grouping' + config_param :match, :hash, value_type: :regexp, default: {namespace: [DEFAULT_KEY], appname: [DEFAULT_KEY]} desc 'Maximum number of log lines allowed per group over a period of rate_period' config_param :limit, :integer, default: DEFAULT_LIMIT end end - attr_reader :paths, :group_watchers + attr_reader :paths, :group_watchers, :default_group_key def configure(conf) @variable_store = Fluent::VariableStore.fetch_or_build(:in_tail) @@ -221,12 +222,22 @@ def configure(conf) end end - ## Ensuring correct time period syntax - @group.rule.each { |rule| - raise "Metadata Group Limit >= DEFAULT_LIMIT" unless rule.limit >= DEFAULT_LIMIT - } unless @group.nil? - - construct_groupwatchers unless @group.nil? + unless @group.nil? + ## Ensuring correct time period syntax + @group.rule.each { |rule| + raise "Metadata Group Limit >= DEFAULT_LIMIT" unless rule.limit >= DEFAULT_LIMIT + } + + @group_keys = Regexp.compile(@group.pattern).named_captures.keys + @default_group_key = ([DEFAULT_KEY] * @group_keys.length).join(REGEXP_JOIN) + + ## Ensures that "specific" rules (with larger number of `rule.match` keys) + ## have a higher priority against "generic" rules (with less number of `rule.match` keys). + ## This will be helpful when a file satisfies more than one rule. + @group.rule.sort_by!{ |rule| -rule.match.length() } + construct_groupwatchers + @group_watchers[@default_group_key] ||= GroupWatcher.new(@group.rate_period, DEFAULT_LIMIT) + end opened_file_metrics = metrics_create(namespace: "fluentd", subsystem: "input", name: "files_opened_total", help_text: "Total number of opened files") closed_file_metrics = metrics_create(namespace: "fluentd", subsystem: "input", name: "files_closed_total", help_text: "Total number of closed files") @@ -267,46 +278,38 @@ def parse_encoding_param(encoding_name) end end - def construct_groupwatchers - @group.rule.each { |rule| - num_groups = rule.namespace.size * rule.appname.size - - rule.namespace.each { |namespace| - namespace = /#{Regexp.quote(namespace)}/ unless namespace.eql?(DEFAULT_NAMESPACE) - @group_watchers[namespace] ||= {} - - rule.appname.each { |appname| - appname = /#{Regexp.quote(appname)}/ unless appname.eql?(DEFAULT_APPNAME) - @group_watchers[namespace][appname] = GroupWatcher.new(@group.rate_period, rule.limit/num_groups) - } - - @group_watchers[namespace][DEFAULT_APPNAME] ||= GroupWatcher.new(@group.rate_period) - } + def construct_group_key(named_captures) + match_rule = [] + @group_keys.each { |key| + match_rule.append(named_captures.fetch(key, DEFAULT_KEY)) } + match_rule = match_rule.join(REGEXP_JOIN) - if @group_watchers.dig(DEFAULT_NAMESPACE, DEFAULT_APPNAME).nil? - @group_watchers[DEFAULT_NAMESPACE] ||= {} - @group_watchers[DEFAULT_NAMESPACE][DEFAULT_APPNAME] = GroupWatcher.new(@group.rate_period) - end + match_rule end - def find_group(namespace, appname) - namespace_key = @group_watchers.keys.find { |regexp| namespace.match?(regexp) && regexp != DEFAULT_NAMESPACE } - namespace_key ||= DEFAULT_NAMESPACE - - appname_key = @group_watchers[namespace_key].keys.find { |regexp| appname.match?(regexp) && regexp != DEFAULT_APPNAME } - appname_key ||= DEFAULT_APPNAME + def construct_groupwatchers + @group.rule.each { |rule| + match_rule = construct_group_key(rule.match) + @group_watchers[match_rule] ||= GroupWatcher.new(@group.rate_period, rule.limit) + } + end - @group_watchers[namespace_key][appname_key] + def find_group(metadata) + metadata_key = construct_group_key(metadata) + gw_key = @group_watchers.keys.find{ |regexp| metadata_key.match?(regexp) && regexp != @default_group_key} + gw_key ||= @default_group_key + + @group_watchers[gw_key] end def find_group_from_metadata(path) begin - metadata = @group.pattern.match(path) - group_watcher = find_group(metadata['namespace'], metadata['appname']) + metadata = @group.pattern.match(path).named_captures + group_watcher = find_group(metadata) rescue => e - $log.warn "Cannot find group from metadata, Adding file in the default group" - group_watcher = @group_watchers[DEFAULT_NAMESPACE][DEFAULT_APPNAME] + log.warn "Cannot find group from metadata, Adding file in the default group" + group_watcher = @group_watchers[@default_group_key] end group_watcher @@ -636,7 +639,7 @@ def detach_watcher_after_rotate_wait(tw, ino) if @open_on_every_update # Detach now because it's already closed, waiting it doesn't make sense. detach_watcher(tw, ino) - elsif @read_bytes_limit_per_second < 0 || (!tw.group_watcher.nil? && tw.group_watcher.limit <= 0) + elsif @read_bytes_limit_per_second < 0 || tw.group_watcher&.limit <= 0 # throttling isn't enabled, just wait @rotate_wait timer_execute(:in_tail_close_watcher, @rotate_wait, repeat: false) do detach_watcher(tw, ino) @@ -913,7 +916,6 @@ def to_s end end - class TailWatcher def initialize(target_info, pe, log, read_from_head, follow_inodes, update_watcher, line_buffer_timer_flusher, io_handler_build, metrics) @path = target_info.path @@ -938,10 +940,6 @@ def initialize(target_info, pe, log, read_from_head, follow_inodes, update_watch attr_reader :watchers attr_accessor :group_watcher - def group_watcher=(group_watcher) - @group_watcher = group_watcher - end - def tag @parsed_tag ||= @path.tr('/', '.').gsub(/\.+/, '.').gsub(/^\./, '') end @@ -1224,7 +1222,7 @@ def should_shutdown_now? def handle_notify return if limit_bytes_per_second_reached? - return if !group_watcher.nil? && group_watcher.limit_lines_reached?(@path) + return if group_watcher&.limit_lines_reached?(@path) with_io do |io| begin @@ -1243,7 +1241,8 @@ def handle_notify @fifo.read_lines(@lines) group_watcher.update_lines_read(@path, @lines.size) unless group_watcher.nil? - if !group_watcher.nil? && group_watcher.limit_lines_reached?(@path) || should_shutdown_now? + if group_watcher&.limit_lines_reached?(@path) || should_shutdown_now? + @log.debug "Reading Limit exceeded #{@path} #{group_watcher.number_lines_read}" unless should_shutdown_now? read_more = false break end diff --git a/test/plugin/test_in_tail.rb b/test/plugin/test_in_tail.rb index 05a320ac60..a352f681b0 100644 --- a/test/plugin/test_in_tail.rb +++ b/test/plugin/test_in_tail.rb @@ -144,7 +144,10 @@ def create_target_info(path) }) ]) - PATTERN = "/#{TMP_DIR}\/(?[a-z0-9]([-a-z0-9]*[a-z0-9])?(\/[a-z0-9]([-a-z0-9]*[a-z0-9])?)*)_(?[^_]+)_(?.+)-(?[a-z0-9]{6})\.log$/" + PATTERN = "/#{TMP_DIR}\/(?[a-z0-9]([-a-z0-9]*[a-z0-9])?(\/[a-z0-9]([-a-z0-9]*[a-z0-9])?)*)_(?[^_]+)_(?.+)-(?[a-z0-9]{6})\.log$/" + DEBUG_LOG_LEVEL = config_element("", "", { + "@log_level" => "debug" + }) def create_group_directive(pattern, rate_period, *rules) config_element("", "", {}, [ @@ -155,12 +158,12 @@ def create_group_directive(pattern, rate_period, *rules) ]) end - def create_rule_directive(namespace = [], appname = [], limit) + def create_rule_directive(match_named_captures, limit) params = { "limit" => limit, + "match" => match_named_captures, } - params["namespace"] = namespace.join(', ') if namespace.size > 0 - params["appname"] = appname.join(', ') if appname.size > 0 + config_element("rule", "", params) end @@ -182,21 +185,37 @@ def create_driver(conf = SINGLE_LINE_CONFIG, use_common_conf = true) end test "valid configuration" do - rule1 = create_rule_directive(['namespace-a'], ['appname-b','appname-c'], 100) - rule2 = create_rule_directive(['namespace-d', 'appname-e'], ['f'], 50) - rule3 = create_rule_directive([], ['appname-g'], -1) - rule4 = create_rule_directive(['appname-h'], [], 0) - - conf = create_group_directive('.', '1m', rule1, rule2, rule3, rule4) + SINGLE_LINE_CONFIG + rule1 = create_rule_directive({ + "namespace"=> "/namespace-a/", + "podname"=> "/podname-[b|c]/" + }, 100) + rule2 = create_rule_directive({ + "namespace"=> "/namespace-[d|e]/", + "podname"=> "/podname-f/", + }, 50) + rule3 = create_rule_directive({ + "podname"=> "/podname-g/", + }, -1) + rule4 = create_rule_directive({ + "namespace"=> "/namespace-h/", + }, 0) + + conf = create_group_directive(PATTERN, '1m', rule1, rule2, rule3, rule4) + SINGLE_LINE_CONFIG assert_nothing_raised do d = create_driver(conf) end end - test "limit should be greater than DEFAULT_LIMIT (-1)" do - rule1 = create_rule_directive(['namespace-a'], ['appname-b','appname-c'], -100) - rule2 = create_rule_directive(['namespace-d', 'namespace-e'], ['appname-f'], 50) - conf = create_group_directive('.', '1m', rule1, rule2) + SINGLE_LINE_CONFIG + test "limit should be greater than DEFAULT_LIMIT (-1)" do + rule1 = create_rule_directive({ + "namespace"=> "/namespace-a/", + "podname"=> "/podname-[b|c]/", + }, -100) + rule2 = create_rule_directive({ + "namespace"=> "/namespace-[d|e]/", + "podname"=> "/podname-f/", + }, 50) + conf = create_group_directive(PATTERN, '1m', rule1, rule2) + SINGLE_LINE_CONFIG assert_raise(RuntimeError) do d = create_driver(conf) end @@ -318,20 +337,52 @@ def create_driver(conf = SINGLE_LINE_CONFIG, use_common_conf = true) sub_test_case "group rules line limit resolution" do test "valid" do - rule1 = create_rule_directive(['namespace-a'], ['appname-b', 'appname-c'], 50) - rule2 = create_rule_directive([], ['appname-b', 'appname-c'], 400) - rule3 = create_rule_directive(['namespace-a'], [], 100) + rule1 = create_rule_directive({ + "namespace"=> "/namespace-a/", + "podname"=> "/podname-[b|c]/", + }, 50) + rule2 = create_rule_directive({ + "podname"=> "/podname-[b|c]/", + }, 400) + rule3 = create_rule_directive({ + "namespace"=> "/namespace-a/", + }, 100) - conf = create_group_directive('.', '1m', rule1, rule2, rule3) + SINGLE_LINE_CONFIG + conf = create_group_directive(PATTERN, '1m', rule3, rule1, rule2) + SINGLE_LINE_CONFIG assert_nothing_raised do d = create_driver(conf) + instance = d.instance + + metadata = { + "namespace"=> "namespace-a", + "podname"=> "podname-b", + } + assert_equal 50, instance.find_group(metadata).limit + + metadata = { + "namespace" => "namespace-a", + "podname" => "podname-c", + } + assert_equal 50, instance.find_group(metadata).limit + + metadata = { + "namespace" => "namespace-a", + "podname" => "podname-d", + } + assert_equal 100, instance.find_group(metadata).limit + + metadata = { + "namespace" => "namespace-f", + "podname" => "podname-b", + } + assert_equal 400, instance.find_group(metadata).limit + + metadata = { + "podname" => "podname-c", + } + assert_equal 400, instance.find_group(metadata).limit - assert_equal 25, d.instance.group_watchers[/namespace\-a/][/appname\-b/].limit - assert_equal 25, d.instance.group_watchers[/namespace\-a/][/appname\-c/].limit - assert_equal 100, d.instance.group_watchers[/namespace\-a/][/./].limit - assert_equal 200, d.instance.group_watchers[/./][/appname\-b/].limit - assert_equal 200, d.instance.group_watchers[/./][/appname\-c/].limit - assert_equal -1, d.instance.group_watchers[/./][/./].limit + assert_equal -1, instance.find_group({}).limit end end @@ -339,8 +390,8 @@ def create_driver(conf = SINGLE_LINE_CONFIG, use_common_conf = true) sub_test_case "files should be placed in groups" do test "invalid regex pattern places files in default group" do - rule1 = create_rule_directive([], [], 100) ## limits default groups - conf = ROOT_CONFIG + create_group_directive('.', '1m', rule1) + create_path_element("test*.txt") + SINGLE_LINE_CONFIG + rule1 = create_rule_directive({}, 100) ## limits default groups + conf = ROOT_CONFIG + DEBUG_LOG_LEVEL + create_group_directive(PATTERN, '1m', rule1) + create_path_element("test*.txt") + SINGLE_LINE_CONFIG d = create_driver(conf, false) File.open("#{TMP_DIR}/test1.txt", 'w') @@ -349,34 +400,51 @@ def create_driver(conf = SINGLE_LINE_CONFIG, use_common_conf = true) d.run do ## checking default group_watcher's paths - assert_equal 3, d.instance.group_watchers[/./][/./].size - assert_true d.instance.group_watchers[/./][/./].include? File.join(TMP_DIR, 'test1.txt') - assert_true d.instance.group_watchers[/./][/./].include? File.join(TMP_DIR, 'test2.txt') - assert_true d.instance.group_watchers[/./][/./].include? File.join(TMP_DIR, 'test3.txt') + instance = d.instance + key = instance.default_group_key + + assert_equal 3, instance.log.logs.count{|a| a.match?("Cannot find group from metadata, Adding file in the default group\n")} + assert_equal 3, instance.group_watchers[key].size + assert_true instance.group_watchers[key].include? File.join(TMP_DIR, 'test1.txt') + assert_true instance.group_watchers[key].include? File.join(TMP_DIR, 'test2.txt') + assert_true instance.group_watchers[key].include? File.join(TMP_DIR, 'test3.txt') end end test "valid regex pattern places file in their respective groups" do - rule1 = create_rule_directive(['test-namespace1'], ['test-appname1'], 100) - rule2 = create_rule_directive(['test-namespace1'], [], 200) - rule3 = create_rule_directive([], ['test-appname2'], 100) - rule4 = create_rule_directive([], [], 100) - - path_element = create_path_element("test-appname*.log") - - conf = ROOT_CONFIG + create_group_directive(PATTERN, '1m', rule1, rule2, rule3, rule4) + path_element + SINGLE_LINE_CONFIG + rule1 = create_rule_directive({ + "namespace"=> "/test-namespace1/", + "podname"=> "/test-podname1/", + }, 100) + rule2 = create_rule_directive({ + "namespace"=> "/test-namespace1/", + }, 200) + rule3 = create_rule_directive({ + "podname"=> "/test-podname2/", + }, 300) + rule4 = create_rule_directive({}, 400) + + path_element = create_path_element("test-podname*.log") + + conf = ROOT_CONFIG + create_group_directive(PATTERN, '1m', rule4, rule3, rule2, rule1) + path_element + SINGLE_LINE_CONFIG d = create_driver(conf, false) - File.open("#{TMP_DIR}/test-appname1_test-namespace1_test-container-15fabq.log", 'w') - File.open("#{TMP_DIR}/test-appname3_test-namespace1_test-container-15fabq.log", 'w') - File.open("#{TMP_DIR}/test-appname2_test-namespace2_test-container-15fabq.log", 'w') - File.open("#{TMP_DIR}/test-appname4_test-namespace3_test-container-15fabq.log", 'w') + file1 = File.join(TMP_DIR, "test-podname1_test-namespace1_test-container-15fabq.log") + file2 = File.join(TMP_DIR, "test-podname3_test-namespace1_test-container-15fabq.log") + file3 = File.join(TMP_DIR, "test-podname2_test-namespace2_test-container-15fabq.log") + file4 = File.join(TMP_DIR, "test-podname4_test-namespace3_test-container-15fabq.log") d.run do - assert_true d.instance.group_watchers[/test\-namespace1/][/test\-appname1/].include? File.join(TMP_DIR, "test-appname1_test-namespace1_test-container-15fabq.log") - assert_true d.instance.group_watchers[/test\-namespace1/][/./].include? File.join(TMP_DIR, "test-appname3_test-namespace1_test-container-15fabq.log") - assert_true d.instance.group_watchers[/./][/test\-appname2/].include? File.join(TMP_DIR, "test-appname2_test-namespace2_test-container-15fabq.log") - assert_true d.instance.group_watchers[/./][/./].include? File.join(TMP_DIR, "test-appname4_test-namespace3_test-container-15fabq.log") + File.open(file1, 'w') + File.open(file2, 'w') + File.open(file3, 'w') + File.open(file4, 'w') + + instance = d.instance + assert_equal 100, instance.find_group_from_metadata(file1).limit + assert_equal 200, instance.find_group_from_metadata(file2).limit + assert_equal 300, instance.find_group_from_metadata(file3).limit + assert_equal 400, instance.find_group_from_metadata(file4).limit end end @@ -2482,14 +2550,20 @@ def test_shutdown_timeout sub_test_case "throttling logs at in_tail level" do - data("file test1.log no limit 5120 text: msg" => ["test1.log", 5120, "msg"], - "file test2.log no limit 1024 text: test" => ["test2.log", 1024, "test"]) + data("file test1.log no_limit 5120 text: msg" => ["test1.log", 5120, "msg"], + "file test2.log no_limit 1024 text: test" => ["test2.log", 1024, "test"]) def test_lines_collected_with_no_throttling(data) file, num_lines, msg = data - rule = create_rule_directive([], [], -1) + + pattern = "/^#{TMP_DIR}\/(?.+)\.log$/" + rule = create_rule_directive({ + "file" => "/test.*/", + }, -1) + group = create_group_directive(pattern, '10s', rule) path_element = create_path_element(file) - conf = ROOT_CONFIG + create_group_directive('.', '10s', rule) + path_element + CONFIG_READ_FROM_HEAD + SINGLE_LINE_CONFIG + conf = ROOT_CONFIG + group + path_element + CONFIG_READ_FROM_HEAD + SINGLE_LINE_CONFIG + File.open("#{TMP_DIR}/#{file}", 'wb') do |f| num_lines.times do f.puts "#{msg}\n" @@ -2517,19 +2591,23 @@ def test_lines_collected_with_no_throttling(data) end test "lines collected with throttling" do - file = "appname1_namespace12_container-123456.log" + file = "podname1_namespace12_container-123456.log" limit = 1000 rate_period = '10s' num_lines = 3000 msg = "a"*8190 # Total size = 8190 bytes + 2 (\n) bytes - rule = create_rule_directive(['namespace'], ['appname'], limit) + rule = create_rule_directive({ + "namespace"=> "/namespace.+/", + "podname"=> "/podname.+/", + }, limit) path_element = create_path_element(file) conf = ROOT_CONFIG + create_group_directive(PATTERN, rate_period, rule) + path_element + SINGLE_LINE_CONFIG + CONFIG_READ_FROM_HEAD d = create_driver(conf, false) - - File.open("#{TMP_DIR}/#{file}", 'wb') do |f| + file_path = "#{TMP_DIR}/#{file}" + + File.open(file_path, 'wb') do |f| num_lines.times do f.puts msg end @@ -2539,7 +2617,7 @@ def test_lines_collected_with_no_throttling(data) start_time = Fluent::Clock.now prev_count = 0 - 3.times do + (num_lines/limit).times do assert_true Fluent::Clock.now - start_time < 10 ## Check record_count after 10s to check lines reads assert_equal limit, d.record_count - prev_count @@ -2553,7 +2631,9 @@ def test_lines_collected_with_no_throttling(data) ## When all the lines are read and rate_period seconds are over ## limit will reset and since there are no more logs to be read, ## number_lines_read will be 0 - assert_equal 0, d.instance.group_watchers[/namespace/][/appname/].current_paths["#{TMP_DIR}/#{file}"].number_lines_read + + gw = d.instance.find_group_from_metadata(file_path) + assert_equal 0, gw.current_paths[file_path].number_lines_read end end end From 8c466a5e4c9c3680fe152034583c3c5b8f9bbb18 Mon Sep 17 00:00:00 2001 From: Pranjal Gupta Date: Thu, 7 Apr 2022 01:41:48 +0530 Subject: [PATCH 07/23] Fix default_group_key typo Signed-off-by: Pranjal Gupta --- lib/fluent/plugin/in_tail.rb | 4 ++-- test/plugin/test_in_tail.rb | 14 +++++++------- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/lib/fluent/plugin/in_tail.rb b/lib/fluent/plugin/in_tail.rb index 775b3ae32a..f08e741f38 100644 --- a/lib/fluent/plugin/in_tail.rb +++ b/lib/fluent/plugin/in_tail.rb @@ -139,7 +139,7 @@ def initialize config_section :rule, param_name: :rule, required: true, multi: true do desc 'Key-value pairs for grouping' - config_param :match, :hash, value_type: :regexp, default: {namespace: [DEFAULT_KEY], appname: [DEFAULT_KEY]} + config_param :match, :hash, value_type: :regexp, default: {namespace: [DEFAULT_KEY], podname: [DEFAULT_KEY]} desc 'Maximum number of log lines allowed per group over a period of rate_period' config_param :limit, :integer, default: DEFAULT_LIMIT end @@ -639,7 +639,7 @@ def detach_watcher_after_rotate_wait(tw, ino) if @open_on_every_update # Detach now because it's already closed, waiting it doesn't make sense. detach_watcher(tw, ino) - elsif @read_bytes_limit_per_second < 0 || tw.group_watcher&.limit <= 0 + elsif @read_bytes_limit_per_second < 0 || (!tw.group_watcher.nil? && tw.group_watcher.limit <= 0) # throttling isn't enabled, just wait @rotate_wait timer_execute(:in_tail_close_watcher, @rotate_wait, repeat: false) do detach_watcher(tw, ino) diff --git a/test/plugin/test_in_tail.rb b/test/plugin/test_in_tail.rb index a352f681b0..8fb1bfa6c7 100644 --- a/test/plugin/test_in_tail.rb +++ b/test/plugin/test_in_tail.rb @@ -144,7 +144,7 @@ def create_target_info(path) }) ]) - PATTERN = "/#{TMP_DIR}\/(?[a-z0-9]([-a-z0-9]*[a-z0-9])?(\/[a-z0-9]([-a-z0-9]*[a-z0-9])?)*)_(?[^_]+)_(?.+)-(?[a-z0-9]{6})\.log$/" + TAILING_GROUP_PATTERN = "/#{TMP_DIR}\/(?[a-z0-9]([-a-z0-9]*[a-z0-9])?(\/[a-z0-9]([-a-z0-9]*[a-z0-9])?)*)_(?[^_]+)_(?.+)-(?[a-z0-9]{6})\.log$/" DEBUG_LOG_LEVEL = config_element("", "", { "@log_level" => "debug" }) @@ -200,7 +200,7 @@ def create_driver(conf = SINGLE_LINE_CONFIG, use_common_conf = true) "namespace"=> "/namespace-h/", }, 0) - conf = create_group_directive(PATTERN, '1m', rule1, rule2, rule3, rule4) + SINGLE_LINE_CONFIG + conf = create_group_directive(TAILING_GROUP_PATTERN, '1m', rule1, rule2, rule3, rule4) + SINGLE_LINE_CONFIG assert_nothing_raised do d = create_driver(conf) end @@ -215,7 +215,7 @@ def create_driver(conf = SINGLE_LINE_CONFIG, use_common_conf = true) "namespace"=> "/namespace-[d|e]/", "podname"=> "/podname-f/", }, 50) - conf = create_group_directive(PATTERN, '1m', rule1, rule2) + SINGLE_LINE_CONFIG + conf = create_group_directive(TAILING_GROUP_PATTERN, '1m', rule1, rule2) + SINGLE_LINE_CONFIG assert_raise(RuntimeError) do d = create_driver(conf) end @@ -348,7 +348,7 @@ def create_driver(conf = SINGLE_LINE_CONFIG, use_common_conf = true) "namespace"=> "/namespace-a/", }, 100) - conf = create_group_directive(PATTERN, '1m', rule3, rule1, rule2) + SINGLE_LINE_CONFIG + conf = create_group_directive(TAILING_GROUP_PATTERN, '1m', rule3, rule1, rule2) + SINGLE_LINE_CONFIG assert_nothing_raised do d = create_driver(conf) instance = d.instance @@ -391,7 +391,7 @@ def create_driver(conf = SINGLE_LINE_CONFIG, use_common_conf = true) sub_test_case "files should be placed in groups" do test "invalid regex pattern places files in default group" do rule1 = create_rule_directive({}, 100) ## limits default groups - conf = ROOT_CONFIG + DEBUG_LOG_LEVEL + create_group_directive(PATTERN, '1m', rule1) + create_path_element("test*.txt") + SINGLE_LINE_CONFIG + conf = ROOT_CONFIG + DEBUG_LOG_LEVEL + create_group_directive(TAILING_GROUP_PATTERN, '1m', rule1) + create_path_element("test*.txt") + SINGLE_LINE_CONFIG d = create_driver(conf, false) File.open("#{TMP_DIR}/test1.txt", 'w') @@ -426,7 +426,7 @@ def create_driver(conf = SINGLE_LINE_CONFIG, use_common_conf = true) path_element = create_path_element("test-podname*.log") - conf = ROOT_CONFIG + create_group_directive(PATTERN, '1m', rule4, rule3, rule2, rule1) + path_element + SINGLE_LINE_CONFIG + conf = ROOT_CONFIG + create_group_directive(TAILING_GROUP_PATTERN, '1m', rule4, rule3, rule2, rule1) + path_element + SINGLE_LINE_CONFIG d = create_driver(conf, false) file1 = File.join(TMP_DIR, "test-podname1_test-namespace1_test-container-15fabq.log") @@ -2602,7 +2602,7 @@ def test_lines_collected_with_no_throttling(data) "podname"=> "/podname.+/", }, limit) path_element = create_path_element(file) - conf = ROOT_CONFIG + create_group_directive(PATTERN, rate_period, rule) + path_element + SINGLE_LINE_CONFIG + CONFIG_READ_FROM_HEAD + conf = ROOT_CONFIG + create_group_directive(TAILING_GROUP_PATTERN, rate_period, rule) + path_element + SINGLE_LINE_CONFIG + CONFIG_READ_FROM_HEAD d = create_driver(conf, false) file_path = "#{TMP_DIR}/#{file}" From fe4f91588eb655410123e499c6b63f27d1f92298 Mon Sep 17 00:00:00 2001 From: Takuro Ashie Date: Fri, 6 May 2022 22:33:52 +0900 Subject: [PATCH 08/23] in_tail: Extract group watcher related codes to in_tail/group_watcher.rb Signed-off-by: Takuro Ashie --- lib/fluent/plugin/in_tail.rb | 157 +---------------- lib/fluent/plugin/in_tail/group_watcher.rb | 191 +++++++++++++++++++++ 2 files changed, 195 insertions(+), 153 deletions(-) create mode 100644 lib/fluent/plugin/in_tail/group_watcher.rb diff --git a/lib/fluent/plugin/in_tail.rb b/lib/fluent/plugin/in_tail.rb index f08e741f38..68f617daa9 100644 --- a/lib/fluent/plugin/in_tail.rb +++ b/lib/fluent/plugin/in_tail.rb @@ -24,6 +24,7 @@ require 'fluent/variable_store' require 'fluent/capability' require 'fluent/plugin/in_tail/position_file' +require 'fluent/plugin/in_tail/group_watcher' if Fluent.windows? require_relative 'file_wrapper' @@ -33,6 +34,8 @@ module Fluent::Plugin class TailInput < Fluent::Plugin::Input + include GroupWatch + Fluent::Plugin.register_input('tail', self) helpers :timer, :event_loop, :parser, :compat_parameters @@ -40,10 +43,6 @@ class TailInput < Fluent::Plugin::Input RESERVED_CHARS = ['/', '*', '%'].freeze MetricsInfo = Struct.new(:opened, :closed, :rotated) - DEFAULT_KEY = /.*/ - DEFAULT_LIMIT = -1 - REGEXP_JOIN = "_" - class WatcherSetupError < StandardError def initialize(msg) @message = msg @@ -64,10 +63,6 @@ def initialize @shutdown_start_time = nil @metrics = nil @startup = true - # Map rules with GroupWatcher objects - @group_watchers = {} - @group_keys = nil - @default_group_key = nil end desc 'The paths to read. Multiple paths can be specified, separated by comma.' @@ -128,24 +123,7 @@ def initialize config_argument :usage, :string, default: 'in_tail_parser' end - config_section :group, param_name: :group, required: false, multi: false do - desc 'Regex for extracting group\'s metadata' - config_param :pattern, - :regexp, - default: /^\/var\/log\/containers\/(?[a-z0-9]([-a-z0-9]*[a-z0-9])?(\/[a-z0-9]([-a-z0-9]*[a-z0-9])?)*)_(?[^_]+)_(?.+)-(?[a-z0-9]{64})\.log$/ - - desc 'Period of time in which the group_line_limit is applied' - config_param :rate_period, :time, default: 5 - - config_section :rule, param_name: :rule, required: true, multi: true do - desc 'Key-value pairs for grouping' - config_param :match, :hash, value_type: :regexp, default: {namespace: [DEFAULT_KEY], podname: [DEFAULT_KEY]} - desc 'Maximum number of log lines allowed per group over a period of rate_period' - config_param :limit, :integer, default: DEFAULT_LIMIT - end - end - - attr_reader :paths, :group_watchers, :default_group_key + attr_reader :paths def configure(conf) @variable_store = Fluent::VariableStore.fetch_or_build(:in_tail) @@ -221,24 +199,6 @@ def configure(conf) @read_bytes_limit_per_second = min_bytes end end - - unless @group.nil? - ## Ensuring correct time period syntax - @group.rule.each { |rule| - raise "Metadata Group Limit >= DEFAULT_LIMIT" unless rule.limit >= DEFAULT_LIMIT - } - - @group_keys = Regexp.compile(@group.pattern).named_captures.keys - @default_group_key = ([DEFAULT_KEY] * @group_keys.length).join(REGEXP_JOIN) - - ## Ensures that "specific" rules (with larger number of `rule.match` keys) - ## have a higher priority against "generic" rules (with less number of `rule.match` keys). - ## This will be helpful when a file satisfies more than one rule. - @group.rule.sort_by!{ |rule| -rule.match.length() } - construct_groupwatchers - @group_watchers[@default_group_key] ||= GroupWatcher.new(@group.rate_period, DEFAULT_LIMIT) - end - opened_file_metrics = metrics_create(namespace: "fluentd", subsystem: "input", name: "files_opened_total", help_text: "Total number of opened files") closed_file_metrics = metrics_create(namespace: "fluentd", subsystem: "input", name: "files_closed_total", help_text: "Total number of closed files") rotated_file_metrics = metrics_create(namespace: "fluentd", subsystem: "input", name: "files_rotated_total", help_text: "Total number of rotated files") @@ -278,43 +238,6 @@ def parse_encoding_param(encoding_name) end end - def construct_group_key(named_captures) - match_rule = [] - @group_keys.each { |key| - match_rule.append(named_captures.fetch(key, DEFAULT_KEY)) - } - match_rule = match_rule.join(REGEXP_JOIN) - - match_rule - end - - def construct_groupwatchers - @group.rule.each { |rule| - match_rule = construct_group_key(rule.match) - @group_watchers[match_rule] ||= GroupWatcher.new(@group.rate_period, rule.limit) - } - end - - def find_group(metadata) - metadata_key = construct_group_key(metadata) - gw_key = @group_watchers.keys.find{ |regexp| metadata_key.match?(regexp) && regexp != @default_group_key} - gw_key ||= @default_group_key - - @group_watchers[gw_key] - end - - def find_group_from_metadata(path) - begin - metadata = @group.pattern.match(path).named_captures - group_watcher = find_group(metadata) - rescue => e - log.warn "Cannot find group from metadata, Adding file in the default group" - group_watcher = @group_watchers[@default_group_key] - end - - group_watcher - end - def start super @@ -844,78 +767,6 @@ def on_timer end end - class GroupWatcher - attr_accessor :current_paths, :limit, :number_lines_read, :start_reading_time, :rate_period - - FileCounter = Struct.new( - :number_lines_read, - :start_reading_time, - ) - - def initialize(rate_period = 60, limit = -1) - @current_paths = {} - @rate_period = rate_period - @limit = limit - end - - def add(path) - @current_paths[path] = FileCounter.new(0, nil) - end - - def include?(path) - @current_paths.key?(path) - end - - def size - @current_paths.size - end - - def delete(path) - @current_paths.delete(path) - end - - def update_reading_time(path) - @current_paths[path].start_reading_time ||= Fluent::Clock.now - end - - def update_lines_read(path, value) - @current_paths[path].number_lines_read += value - end - - def reset_counter(path) - @current_paths[path].start_reading_time = nil - @current_paths[path].number_lines_read = 0 - end - - def time_spent_reading(path) - Fluent::Clock.now - @current_paths[path].start_reading_time - end - - def limit_time_period_reached?(path) - time_spent_reading(path) < @rate_period - end - - def limit_lines_reached?(path) - return true unless include?(path) - return true if @limit == 0 - - return false if @limit < 0 - return false if @current_paths[path].number_lines_read < @limit / size - - # update_reading_time(path) - if limit_time_period_reached?(path) # Exceeds limit - true - else # Does not exceed limit - reset_counter(path) - false - end - end - - def to_s - super + " current_paths: #{@current_paths} rate_period: #{@rate_period} limit: #{@limit}" - end - end - class TailWatcher def initialize(target_info, pe, log, read_from_head, follow_inodes, update_watcher, line_buffer_timer_flusher, io_handler_build, metrics) @path = target_info.path diff --git a/lib/fluent/plugin/in_tail/group_watcher.rb b/lib/fluent/plugin/in_tail/group_watcher.rb new file mode 100644 index 0000000000..a157675b22 --- /dev/null +++ b/lib/fluent/plugin/in_tail/group_watcher.rb @@ -0,0 +1,191 @@ +# +# Fluentd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +require 'fluent/plugin/input' + +module Fluent::Plugin + class TailInput < Fluent::Plugin::Input + module GroupWatchParams + include Fluent::Configurable + + DEFAULT_KEY = /.*/ + DEFAULT_LIMIT = -1 + REGEXP_JOIN = "_" + + config_section :group, param_name: :group, required: false, multi: false do + desc 'Regex for extracting group\'s metadata' + config_param :pattern, + :regexp, + default: /^\/var\/log\/containers\/(?[a-z0-9]([-a-z0-9]*[a-z0-9])?(\/[a-z0-9]([-a-z0-9]*[a-z0-9])?)*)_(?[^_]+)_(?.+)-(?[a-z0-9]{64})\.log$/ + + desc 'Period of time in which the group_line_limit is applied' + config_param :rate_period, :time, default: 5 + + config_section :rule, param_name: :rule, required: true, multi: true do + desc 'Key-value pairs for grouping' + config_param :match, :hash, value_type: :regexp, default: {namespace: [DEFAULT_KEY], podname: [DEFAULT_KEY]} + desc 'Maximum number of log lines allowed per group over a period of rate_period' + config_param :limit, :integer, default: DEFAULT_LIMIT + end + end + end + + module GroupWatch + def self.included(mod) + mod.include GroupWatchParams + end + + attr_reader :group_watchers, :default_group_key + + def initialize + super + @group_watchers = {} + @group_keys = nil + @default_group_key = nil + end + + def configure(conf) + super + + unless @group.nil? + ## Ensuring correct time period syntax + @group.rule.each { |rule| + raise "Metadata Group Limit >= DEFAULT_LIMIT" unless rule.limit >= GroupWatchParams::DEFAULT_LIMIT + } + + @group_keys = Regexp.compile(@group.pattern).named_captures.keys + @default_group_key = ([GroupWatchParams::DEFAULT_KEY] * @group_keys.length).join(GroupWatchParams::REGEXP_JOIN) + + ## Ensures that "specific" rules (with larger number of `rule.match` keys) + ## have a higher priority against "generic" rules (with less number of `rule.match` keys). + ## This will be helpful when a file satisfies more than one rule. + @group.rule.sort_by!{ |rule| -rule.match.length() } + construct_groupwatchers + @group_watchers[@default_group_key] ||= GroupWatcher.new(@group.rate_period, GroupWatchParams::DEFAULT_LIMIT) + end + end + + def construct_group_key(named_captures) + match_rule = [] + @group_keys.each { |key| + match_rule.append(named_captures.fetch(key, GroupWatchParams::DEFAULT_KEY)) + } + match_rule = match_rule.join(GroupWatchParams::REGEXP_JOIN) + + match_rule + end + + def construct_groupwatchers + @group.rule.each { |rule| + match_rule = construct_group_key(rule.match) + @group_watchers[match_rule] ||= GroupWatcher.new(@group.rate_period, rule.limit) + } + end + + def find_group(metadata) + metadata_key = construct_group_key(metadata) + gw_key = @group_watchers.keys.find{ |regexp| metadata_key.match?(regexp) && regexp != @default_group_key} + gw_key ||= @default_group_key + + @group_watchers[gw_key] + end + + def find_group_from_metadata(path) + begin + metadata = @group.pattern.match(path).named_captures + group_watcher = find_group(metadata) + rescue => e + log.warn "Cannot find group from metadata, Adding file in the default group" + group_watcher = @group_watchers[@default_group_key] + end + + group_watcher + end + end + + class GroupWatcher + attr_accessor :current_paths, :limit, :number_lines_read, :start_reading_time, :rate_period + + FileCounter = Struct.new( + :number_lines_read, + :start_reading_time, + ) + + def initialize(rate_period = 60, limit = -1) + @current_paths = {} + @rate_period = rate_period + @limit = limit + end + + def add(path) + @current_paths[path] = FileCounter.new(0, nil) + end + + def include?(path) + @current_paths.key?(path) + end + + def size + @current_paths.size + end + + def delete(path) + @current_paths.delete(path) + end + + def update_reading_time(path) + @current_paths[path].start_reading_time ||= Fluent::Clock.now + end + + def update_lines_read(path, value) + @current_paths[path].number_lines_read += value + end + + def reset_counter(path) + @current_paths[path].start_reading_time = nil + @current_paths[path].number_lines_read = 0 + end + + def time_spent_reading(path) + Fluent::Clock.now - @current_paths[path].start_reading_time + end + + def limit_time_period_reached?(path) + time_spent_reading(path) < @rate_period + end + + def limit_lines_reached?(path) + return true unless include?(path) + return true if @limit == 0 + + return false if @limit < 0 + return false if @current_paths[path].number_lines_read < @limit / size + + # update_reading_time(path) + if limit_time_period_reached?(path) # Exceeds limit + true + else # Does not exceed limit + reset_counter(path) + false + end + end + + def to_s + super + " current_paths: #{@current_paths} rate_period: #{@rate_period} limit: #{@limit}" + end + end + end +end From aefde972a8fd402788f5f8118b21347d4f7fe5cf Mon Sep 17 00:00:00 2001 From: Takuro Ashie Date: Fri, 6 May 2022 22:49:28 +0900 Subject: [PATCH 09/23] in_tail: Remove needless spaces Signed-off-by: Takuro Ashie --- lib/fluent/plugin/in_tail.rb | 6 +++--- test/plugin/test_in_tail.rb | 41 ++++++++++++++++++------------------ 2 files changed, 23 insertions(+), 24 deletions(-) diff --git a/lib/fluent/plugin/in_tail.rb b/lib/fluent/plugin/in_tail.rb index 68f617daa9..0d3f42f4b0 100644 --- a/lib/fluent/plugin/in_tail.rb +++ b/lib/fluent/plugin/in_tail.rb @@ -409,7 +409,7 @@ def setup_watcher(target_info, pe) event_loop_attach(watcher) end - unless @group.nil? + unless @group.nil? group_watcher = find_group_from_metadata(target_info.path) group_watcher.add(tw.path) unless group_watcher.include?(tw.path) tw.group_watcher = group_watcher @@ -470,7 +470,7 @@ def start_watchers(targets_info) def stop_watchers(targets_info, immediate: false, unwatched: false, remove_watcher: true) targets_info.each_value { |target_info| - unless @group.nil? + unless @group.nil? group_watcher = find_group_from_metadata(target_info.path) group_watcher.delete(target_info.path) end @@ -1093,7 +1093,7 @@ def handle_notify group_watcher.update_lines_read(@path, @lines.size) unless group_watcher.nil? if group_watcher&.limit_lines_reached?(@path) || should_shutdown_now? - @log.debug "Reading Limit exceeded #{@path} #{group_watcher.number_lines_read}" unless should_shutdown_now? + @log.debug "Reading Limit exceeded #{@path} #{group_watcher.number_lines_read}" unless should_shutdown_now? read_more = false break end diff --git a/test/plugin/test_in_tail.rb b/test/plugin/test_in_tail.rb index 8fb1bfa6c7..a80a64edc2 100644 --- a/test/plugin/test_in_tail.rb +++ b/test/plugin/test_in_tail.rb @@ -143,7 +143,7 @@ def create_target_info(path) "format_firstline" => "/^[s]/" }) ]) - + TAILING_GROUP_PATTERN = "/#{TMP_DIR}\/(?[a-z0-9]([-a-z0-9]*[a-z0-9])?(\/[a-z0-9]([-a-z0-9]*[a-z0-9])?)*)_(?[^_]+)_(?.+)-(?[a-z0-9]{6})\.log$/" DEBUG_LOG_LEVEL = config_element("", "", { "@log_level" => "debug" @@ -159,7 +159,7 @@ def create_group_directive(pattern, rate_period, *rules) end def create_rule_directive(match_named_captures, limit) - params = { + params = { "limit" => limit, "match" => match_named_captures, } @@ -179,7 +179,7 @@ def create_driver(conf = SINGLE_LINE_CONFIG, use_common_conf = true) sub_test_case "configure" do test " required" do conf = create_group_directive('.', '1m') + SINGLE_LINE_CONFIG - assert_raise(Fluent::ConfigError) do + assert_raise(Fluent::ConfigError) do d = create_driver(conf) end end @@ -201,7 +201,7 @@ def create_driver(conf = SINGLE_LINE_CONFIG, use_common_conf = true) }, 0) conf = create_group_directive(TAILING_GROUP_PATTERN, '1m', rule1, rule2, rule3, rule4) + SINGLE_LINE_CONFIG - assert_nothing_raised do + assert_nothing_raised do d = create_driver(conf) end end @@ -216,9 +216,9 @@ def create_driver(conf = SINGLE_LINE_CONFIG, use_common_conf = true) "podname"=> "/podname-f/", }, 50) conf = create_group_directive(TAILING_GROUP_PATTERN, '1m', rule1, rule2) + SINGLE_LINE_CONFIG - assert_raise(RuntimeError) do + assert_raise(RuntimeError) do d = create_driver(conf) - end + end end test "plain single line" do @@ -347,18 +347,18 @@ def create_driver(conf = SINGLE_LINE_CONFIG, use_common_conf = true) rule3 = create_rule_directive({ "namespace"=> "/namespace-a/", }, 100) - + conf = create_group_directive(TAILING_GROUP_PATTERN, '1m', rule3, rule1, rule2) + SINGLE_LINE_CONFIG assert_nothing_raised do d = create_driver(conf) instance = d.instance metadata = { - "namespace"=> "namespace-a", + "namespace"=> "namespace-a", "podname"=> "podname-b", } assert_equal 50, instance.find_group(metadata).limit - + metadata = { "namespace" => "namespace-a", "podname" => "podname-c", @@ -410,7 +410,7 @@ def create_driver(conf = SINGLE_LINE_CONFIG, use_common_conf = true) assert_true instance.group_watchers[key].include? File.join(TMP_DIR, 'test3.txt') end end - + test "valid regex pattern places file in their respective groups" do rule1 = create_rule_directive({ "namespace"=> "/test-namespace1/", @@ -447,7 +447,6 @@ def create_driver(conf = SINGLE_LINE_CONFIG, use_common_conf = true) assert_equal 400, instance.find_group_from_metadata(file4).limit end end - end sub_test_case "singleline" do @@ -2554,7 +2553,7 @@ def test_shutdown_timeout "file test2.log no_limit 1024 text: test" => ["test2.log", 1024, "test"]) def test_lines_collected_with_no_throttling(data) file, num_lines, msg = data - + pattern = "/^#{TMP_DIR}\/(?.+)\.log$/" rule = create_rule_directive({ "file" => "/test.*/", @@ -2565,7 +2564,7 @@ def test_lines_collected_with_no_throttling(data) conf = ROOT_CONFIG + group + path_element + CONFIG_READ_FROM_HEAD + SINGLE_LINE_CONFIG File.open("#{TMP_DIR}/#{file}", 'wb') do |f| - num_lines.times do + num_lines.times do f.puts "#{msg}\n" end end @@ -2582,14 +2581,14 @@ def test_lines_collected_with_no_throttling(data) prev_count = d.record_count ## waiting for atleast 12 seconds to avoid any sync errors between plugin and test driver sleep(1) until Fluent::Clock.now - start_time > 12 - ## after waiting for 10 secs, limit will reset - ## Plugin will start reading but it will encounter EOF Error + ## after waiting for 10 secs, limit will reset + ## Plugin will start reading but it will encounter EOF Error ## since no logs are left to be read ## Hence, d.record_count = prev_count assert_equal 0, d.record_count - prev_count end end - + test "lines collected with throttling" do file = "podname1_namespace12_container-123456.log" limit = 1000 @@ -2606,9 +2605,9 @@ def test_lines_collected_with_no_throttling(data) d = create_driver(conf, false) file_path = "#{TMP_DIR}/#{file}" - + File.open(file_path, 'wb') do |f| - num_lines.times do + num_lines.times do f.puts msg end end @@ -2621,10 +2620,10 @@ def test_lines_collected_with_no_throttling(data) assert_true Fluent::Clock.now - start_time < 10 ## Check record_count after 10s to check lines reads assert_equal limit, d.record_count - prev_count - prev_count = d.record_count - ## sleep until rate_period seconds are over so that + prev_count = d.record_count + ## sleep until rate_period seconds are over so that ## Plugin can read lines again - sleep(1) until Fluent::Clock.now - start_time > 12 + sleep(1) until Fluent::Clock.now - start_time > 12 ## waiting for atleast 12 seconds to avoid any sync errors between plugin and test driver start_time = Fluent::Clock.now end From 7b1bbaceb575a05ec5839d5b29cb4351619f3a9a Mon Sep 17 00:00:00 2001 From: Takuro Ashie Date: Fri, 6 May 2022 22:56:23 +0900 Subject: [PATCH 10/23] in_tail: Rename group_watcher.rb to group_watch.rb Signed-off-by: Takuro Ashie --- lib/fluent/plugin/in_tail.rb | 2 +- lib/fluent/plugin/in_tail/{group_watcher.rb => group_watch.rb} | 0 2 files changed, 1 insertion(+), 1 deletion(-) rename lib/fluent/plugin/in_tail/{group_watcher.rb => group_watch.rb} (100%) diff --git a/lib/fluent/plugin/in_tail.rb b/lib/fluent/plugin/in_tail.rb index 0d3f42f4b0..a255f5b189 100644 --- a/lib/fluent/plugin/in_tail.rb +++ b/lib/fluent/plugin/in_tail.rb @@ -24,7 +24,7 @@ require 'fluent/variable_store' require 'fluent/capability' require 'fluent/plugin/in_tail/position_file' -require 'fluent/plugin/in_tail/group_watcher' +require 'fluent/plugin/in_tail/group_watch' if Fluent.windows? require_relative 'file_wrapper' diff --git a/lib/fluent/plugin/in_tail/group_watcher.rb b/lib/fluent/plugin/in_tail/group_watch.rb similarity index 100% rename from lib/fluent/plugin/in_tail/group_watcher.rb rename to lib/fluent/plugin/in_tail/group_watch.rb From a2d9526e140eea21895be8092a361825482a2199 Mon Sep 17 00:00:00 2001 From: Takuro Ashie Date: Fri, 6 May 2022 23:20:14 +0900 Subject: [PATCH 11/23] in_tail: Add {add,remove}_group_watcher to GroupWatch module Signed-off-by: Takuro Ashie --- lib/fluent/plugin/in_tail.rb | 9 ++------- lib/fluent/plugin/in_tail/group_watch.rb | 15 +++++++++++++++ 2 files changed, 17 insertions(+), 7 deletions(-) diff --git a/lib/fluent/plugin/in_tail.rb b/lib/fluent/plugin/in_tail.rb index a255f5b189..abba81af8b 100644 --- a/lib/fluent/plugin/in_tail.rb +++ b/lib/fluent/plugin/in_tail.rb @@ -409,9 +409,7 @@ def setup_watcher(target_info, pe) event_loop_attach(watcher) end - unless @group.nil? - group_watcher = find_group_from_metadata(target_info.path) - group_watcher.add(tw.path) unless group_watcher.include?(tw.path) + add_group_watcher(target_info.path) do |group_watcher| tw.group_watcher = group_watcher end @@ -470,10 +468,7 @@ def start_watchers(targets_info) def stop_watchers(targets_info, immediate: false, unwatched: false, remove_watcher: true) targets_info.each_value { |target_info| - unless @group.nil? - group_watcher = find_group_from_metadata(target_info.path) - group_watcher.delete(target_info.path) - end + remove_group_watcher(target_info.path) if remove_watcher tw = @tails.delete(target_info) diff --git a/lib/fluent/plugin/in_tail/group_watch.rb b/lib/fluent/plugin/in_tail/group_watch.rb index a157675b22..e6db267337 100644 --- a/lib/fluent/plugin/in_tail/group_watch.rb +++ b/lib/fluent/plugin/in_tail/group_watch.rb @@ -78,6 +78,21 @@ def configure(conf) end end + def add_group_watcher(path, &block) + return if @group.nil? + + group_watcher = find_group_from_metadata(path) + group_watcher.add(path) unless group_watcher.include?(path) + block.call(group_watcher) if block_given? + end + + def remove_group_watcher(path, &block) + return if @group.nil? + group_watcher = find_group_from_metadata(path) + group_watcher.delete(path) + block.call(group_watcher) if block_given? + end + def construct_group_key(named_captures) match_rule = [] @group_keys.each { |key| From 433883bae99b33f363dc704a25ac2d264f1ca743 Mon Sep 17 00:00:00 2001 From: Takuro Ashie Date: Fri, 6 May 2022 23:21:20 +0900 Subject: [PATCH 12/23] in_tail: Remove a needless blank line Signed-off-by: Takuro Ashie --- lib/fluent/plugin/in_tail/group_watch.rb | 1 - 1 file changed, 1 deletion(-) diff --git a/lib/fluent/plugin/in_tail/group_watch.rb b/lib/fluent/plugin/in_tail/group_watch.rb index e6db267337..7cdf9defdf 100644 --- a/lib/fluent/plugin/in_tail/group_watch.rb +++ b/lib/fluent/plugin/in_tail/group_watch.rb @@ -80,7 +80,6 @@ def configure(conf) def add_group_watcher(path, &block) return if @group.nil? - group_watcher = find_group_from_metadata(path) group_watcher.add(path) unless group_watcher.include?(path) block.call(group_watcher) if block_given? From 46e898f3387511d830c34d9be7485ca12358f509 Mon Sep 17 00:00:00 2001 From: Takuro Ashie Date: Sat, 7 May 2022 00:04:36 +0900 Subject: [PATCH 13/23] in_tail: Rename add_group_watcher & remove_group_watcher add_group_watcher -> add_path_to_group_watcher remove_group_watcher -> remove_path_from_group_watcher And simplify them. Signed-off-by: Takuro Ashie --- lib/fluent/plugin/in_tail.rb | 6 ++---- lib/fluent/plugin/in_tail/group_watch.rb | 7 +++---- 2 files changed, 5 insertions(+), 8 deletions(-) diff --git a/lib/fluent/plugin/in_tail.rb b/lib/fluent/plugin/in_tail.rb index abba81af8b..fb93b83c66 100644 --- a/lib/fluent/plugin/in_tail.rb +++ b/lib/fluent/plugin/in_tail.rb @@ -409,9 +409,7 @@ def setup_watcher(target_info, pe) event_loop_attach(watcher) end - add_group_watcher(target_info.path) do |group_watcher| - tw.group_watcher = group_watcher - end + tw.group_watcher = add_path_to_group_watcher(target_info.path) tw rescue => e @@ -468,7 +466,7 @@ def start_watchers(targets_info) def stop_watchers(targets_info, immediate: false, unwatched: false, remove_watcher: true) targets_info.each_value { |target_info| - remove_group_watcher(target_info.path) + remove_path_from_group_watcher(target_info.path) if remove_watcher tw = @tails.delete(target_info) diff --git a/lib/fluent/plugin/in_tail/group_watch.rb b/lib/fluent/plugin/in_tail/group_watch.rb index 7cdf9defdf..fbe8d83f09 100644 --- a/lib/fluent/plugin/in_tail/group_watch.rb +++ b/lib/fluent/plugin/in_tail/group_watch.rb @@ -78,18 +78,17 @@ def configure(conf) end end - def add_group_watcher(path, &block) + def add_path_to_group_watcher(path) return if @group.nil? group_watcher = find_group_from_metadata(path) group_watcher.add(path) unless group_watcher.include?(path) - block.call(group_watcher) if block_given? + group_watcher end - def remove_group_watcher(path, &block) + def remove_path_from_group_watcher(path) return if @group.nil? group_watcher = find_group_from_metadata(path) group_watcher.delete(path) - block.call(group_watcher) if block_given? end def construct_group_key(named_captures) From e8d893a4447655cc7db1bf7f5a7026267ce7a0d2 Mon Sep 17 00:00:00 2001 From: Takuro Ashie Date: Sat, 7 May 2022 00:26:21 +0900 Subject: [PATCH 14/23] in_tail: Make sure to return nil early in add_path_to_group_watcher Signed-off-by: Takuro Ashie --- lib/fluent/plugin/in_tail/group_watch.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/fluent/plugin/in_tail/group_watch.rb b/lib/fluent/plugin/in_tail/group_watch.rb index fbe8d83f09..937434368d 100644 --- a/lib/fluent/plugin/in_tail/group_watch.rb +++ b/lib/fluent/plugin/in_tail/group_watch.rb @@ -79,7 +79,7 @@ def configure(conf) end def add_path_to_group_watcher(path) - return if @group.nil? + return nil if @group.nil? group_watcher = find_group_from_metadata(path) group_watcher.add(path) unless group_watcher.include?(path) group_watcher From d311670e9eb876cf03a1f50386550554ce9e6591 Mon Sep 17 00:00:00 2001 From: Takuro Ashie Date: Mon, 9 May 2022 01:00:04 +0900 Subject: [PATCH 15/23] in_tai: Simplify detecting group watcher's limit Signed-off-by: Takuro Ashie --- lib/fluent/plugin/in_tail.rb | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/lib/fluent/plugin/in_tail.rb b/lib/fluent/plugin/in_tail.rb index fb93b83c66..a275ab8e89 100644 --- a/lib/fluent/plugin/in_tail.rb +++ b/lib/fluent/plugin/in_tail.rb @@ -1076,25 +1076,26 @@ def handle_notify begin while true @start_reading_time ||= Fluent::Clock.now - group_watcher.update_reading_time(@path) unless group_watcher.nil? + group_watcher&.update_reading_time(@path) + data = io.readpartial(BYTES_TO_READ, @iobuf) @eof = false @number_bytes_read += data.bytesize @fifo << data - group_watcher.update_lines_read(@path, -@lines.size) unless group_watcher.nil? + + n_lines_before_read = @lines.size @fifo.read_lines(@lines) - group_watcher.update_lines_read(@path, @lines.size) unless group_watcher.nil? + group_watcher&.update_lines_read(@path, @lines.size - n_lines_before_read) - if group_watcher&.limit_lines_reached?(@path) || should_shutdown_now? - @log.debug "Reading Limit exceeded #{@path} #{group_watcher.number_lines_read}" unless should_shutdown_now? - read_more = false - break - end - if limit_bytes_per_second_reached? || should_shutdown_now? + group_watcher_limit = group_watcher&.limit_lines_reached?(@path) + @log.debug "Reading Limit exceeded #{@path} #{group_watcher.number_lines_read}" if group_watcher_limit + + if group_watcher_limit || limit_bytes_per_second_reached? || should_shutdown_now? # Just get out from tailing loop. read_more = false break end + if @lines.size >= @read_lines_limit # not to use too much memory in case the file is very large read_more = true From e291daeefb74b6df6704fe26fa4d004dc16bcdd5 Mon Sep 17 00:00:00 2001 From: Takuro Ashie Date: Mon, 9 May 2022 01:26:25 +0900 Subject: [PATCH 16/23] in_tail: Fix warnings of group watcher tests /home/aho/Projects/Fluentd/fluentd/test/plugin/test_in_tail.rb:183: warning: assigned but unused variable - d /home/aho/Projects/Fluentd/fluentd/test/plugin/test_in_tail.rb:205: warning: assigned but unused variable - d /home/aho/Projects/Fluentd/fluentd/test/plugin/test_in_tail.rb:220: warning: assigned but unused variable - d /home/aho/Projects/Fluentd/fluentd/test/plugin/test_in_tail.rb:385: warning: ambiguous first argument; put parentheses or a space even after `-' operator /home/aho/Projects/Fluentd/fluentd/lib/fluent/plugin/in_tail/group_watch.rb:123: warning: assigned but unused variable - e Signed-off-by: Takuro Ashie --- lib/fluent/plugin/in_tail/group_watch.rb | 2 +- test/plugin/test_in_tail.rb | 48 ++++++++++++------------ 2 files changed, 25 insertions(+), 25 deletions(-) diff --git a/lib/fluent/plugin/in_tail/group_watch.rb b/lib/fluent/plugin/in_tail/group_watch.rb index 937434368d..d338aae9ac 100644 --- a/lib/fluent/plugin/in_tail/group_watch.rb +++ b/lib/fluent/plugin/in_tail/group_watch.rb @@ -120,7 +120,7 @@ def find_group_from_metadata(path) begin metadata = @group.pattern.match(path).named_captures group_watcher = find_group(metadata) - rescue => e + rescue log.warn "Cannot find group from metadata, Adding file in the default group" group_watcher = @group_watchers[@default_group_key] end diff --git a/test/plugin/test_in_tail.rb b/test/plugin/test_in_tail.rb index a80a64edc2..371604198c 100644 --- a/test/plugin/test_in_tail.rb +++ b/test/plugin/test_in_tail.rb @@ -180,7 +180,7 @@ def create_driver(conf = SINGLE_LINE_CONFIG, use_common_conf = true) test " required" do conf = create_group_directive('.', '1m') + SINGLE_LINE_CONFIG assert_raise(Fluent::ConfigError) do - d = create_driver(conf) + create_driver(conf) end end @@ -202,7 +202,7 @@ def create_driver(conf = SINGLE_LINE_CONFIG, use_common_conf = true) conf = create_group_directive(TAILING_GROUP_PATTERN, '1m', rule1, rule2, rule3, rule4) + SINGLE_LINE_CONFIG assert_nothing_raised do - d = create_driver(conf) + create_driver(conf) end end @@ -217,7 +217,7 @@ def create_driver(conf = SINGLE_LINE_CONFIG, use_common_conf = true) }, 50) conf = create_group_directive(TAILING_GROUP_PATTERN, '1m', rule1, rule2) + SINGLE_LINE_CONFIG assert_raise(RuntimeError) do - d = create_driver(conf) + create_driver(conf) end end @@ -357,32 +357,32 @@ def create_driver(conf = SINGLE_LINE_CONFIG, use_common_conf = true) "namespace"=> "namespace-a", "podname"=> "podname-b", } - assert_equal 50, instance.find_group(metadata).limit + assert_equal(50, instance.find_group(metadata).limit) metadata = { "namespace" => "namespace-a", "podname" => "podname-c", } - assert_equal 50, instance.find_group(metadata).limit + assert_equal(50, instance.find_group(metadata).limit) metadata = { "namespace" => "namespace-a", "podname" => "podname-d", } - assert_equal 100, instance.find_group(metadata).limit + assert_equal(100, instance.find_group(metadata).limit) metadata = { "namespace" => "namespace-f", "podname" => "podname-b", } - assert_equal 400, instance.find_group(metadata).limit + assert_equal(400, instance.find_group(metadata).limit) metadata = { "podname" => "podname-c", } - assert_equal 400, instance.find_group(metadata).limit + assert_equal(400, instance.find_group(metadata).limit) - assert_equal -1, instance.find_group({}).limit + assert_equal(-1, instance.find_group({}).limit) end end @@ -403,11 +403,11 @@ def create_driver(conf = SINGLE_LINE_CONFIG, use_common_conf = true) instance = d.instance key = instance.default_group_key - assert_equal 3, instance.log.logs.count{|a| a.match?("Cannot find group from metadata, Adding file in the default group\n")} - assert_equal 3, instance.group_watchers[key].size - assert_true instance.group_watchers[key].include? File.join(TMP_DIR, 'test1.txt') - assert_true instance.group_watchers[key].include? File.join(TMP_DIR, 'test2.txt') - assert_true instance.group_watchers[key].include? File.join(TMP_DIR, 'test3.txt') + assert_equal(3, instance.log.logs.count{|a| a.match?("Cannot find group from metadata, Adding file in the default group\n")}) + assert_equal(3, instance.group_watchers[key].size) + assert_true(instance.group_watchers[key].include? File.join(TMP_DIR, 'test1.txt')) + assert_true(instance.group_watchers[key].include? File.join(TMP_DIR, 'test2.txt')) + assert_true(instance.group_watchers[key].include? File.join(TMP_DIR, 'test3.txt')) end end @@ -441,10 +441,10 @@ def create_driver(conf = SINGLE_LINE_CONFIG, use_common_conf = true) File.open(file4, 'w') instance = d.instance - assert_equal 100, instance.find_group_from_metadata(file1).limit - assert_equal 200, instance.find_group_from_metadata(file2).limit - assert_equal 300, instance.find_group_from_metadata(file3).limit - assert_equal 400, instance.find_group_from_metadata(file4).limit + assert_equal(100, instance.find_group_from_metadata(file1).limit) + assert_equal(200, instance.find_group_from_metadata(file2).limit) + assert_equal(300, instance.find_group_from_metadata(file3).limit) + assert_equal(400, instance.find_group_from_metadata(file4).limit) end end end @@ -2574,8 +2574,8 @@ def test_lines_collected_with_no_throttling(data) d.run do start_time = Fluent::Clock.now - assert_true Fluent::Clock.now - start_time < 10 - assert_equal num_lines, d.record_count + assert_true(Fluent::Clock.now - start_time < 10) + assert_equal(num_lines, d.record_count) assert_equal({ "message" => msg }, d.events[0][2]) prev_count = d.record_count @@ -2585,7 +2585,7 @@ def test_lines_collected_with_no_throttling(data) ## Plugin will start reading but it will encounter EOF Error ## since no logs are left to be read ## Hence, d.record_count = prev_count - assert_equal 0, d.record_count - prev_count + assert_equal(0, d.record_count - prev_count) end end @@ -2617,9 +2617,9 @@ def test_lines_collected_with_no_throttling(data) prev_count = 0 (num_lines/limit).times do - assert_true Fluent::Clock.now - start_time < 10 + assert_true(Fluent::Clock.now - start_time < 10) ## Check record_count after 10s to check lines reads - assert_equal limit, d.record_count - prev_count + assert_equal(limit, d.record_count - prev_count) prev_count = d.record_count ## sleep until rate_period seconds are over so that ## Plugin can read lines again @@ -2632,7 +2632,7 @@ def test_lines_collected_with_no_throttling(data) ## number_lines_read will be 0 gw = d.instance.find_group_from_metadata(file_path) - assert_equal 0, gw.current_paths[file_path].number_lines_read + assert_equal(0, gw.current_paths[file_path].number_lines_read) end end end From 0de4e66072b43e232d1c40cb0ad0d0f29b6a41aa Mon Sep 17 00:00:00 2001 From: Takuro Ashie Date: Sun, 15 May 2022 01:29:27 +0900 Subject: [PATCH 17/23] in_tail: Add throttling_is_enabled? Signed-off-by: Takuro Ashie --- lib/fluent/plugin/in_tail.rb | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/lib/fluent/plugin/in_tail.rb b/lib/fluent/plugin/in_tail.rb index a275ab8e89..c0788c726e 100644 --- a/lib/fluent/plugin/in_tail.rb +++ b/lib/fluent/plugin/in_tail.rb @@ -549,18 +549,19 @@ def detach_watcher(tw, ino, close_io = true) end end + def throttling_is_enabled?(tw = nil) + return true if @read_bytes_limit_per_second > 0 + return true if tw.group_watcher && tw.group_watcher.limit > 0 + false + end + def detach_watcher_after_rotate_wait(tw, ino) # Call event_loop_attach/event_loop_detach is high-cost for short-live object. # If this has a problem with large number of files, use @_event_loop directly instead of timer_execute. if @open_on_every_update # Detach now because it's already closed, waiting it doesn't make sense. detach_watcher(tw, ino) - elsif @read_bytes_limit_per_second < 0 || (!tw.group_watcher.nil? && tw.group_watcher.limit <= 0) - # throttling isn't enabled, just wait @rotate_wait - timer_execute(:in_tail_close_watcher, @rotate_wait, repeat: false) do - detach_watcher(tw, ino) - end - else + elsif throttling_is_enabled?(tw) # When the throttling feature is enabled, it might not reach EOF yet. # Should ensure to read all contents before closing it, with keeping throttling. start_time_to_wait = Fluent::Clock.now @@ -571,6 +572,11 @@ def detach_watcher_after_rotate_wait(tw, ino) detach_watcher(tw, ino) end end + else + # when the throttling feature isn't enabled, just wait @rotate_wait + timer_execute(:in_tail_close_watcher, @rotate_wait, repeat: false) do + detach_watcher(tw, ino) + end end end From dc680bef30857dc3b041174ecb6f92f7fad9256c Mon Sep 17 00:00:00 2001 From: Takuro Ashie Date: Sun, 15 May 2022 01:42:50 +0900 Subject: [PATCH 18/23] in_tail: Remove a needless default value of throttling_is_enabled? Signed-off-by: Takuro Ashie --- lib/fluent/plugin/in_tail.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/fluent/plugin/in_tail.rb b/lib/fluent/plugin/in_tail.rb index c0788c726e..774c6bdc8b 100644 --- a/lib/fluent/plugin/in_tail.rb +++ b/lib/fluent/plugin/in_tail.rb @@ -549,7 +549,7 @@ def detach_watcher(tw, ino, close_io = true) end end - def throttling_is_enabled?(tw = nil) + def throttling_is_enabled?(tw) return true if @read_bytes_limit_per_second > 0 return true if tw.group_watcher && tw.group_watcher.limit > 0 false From 07ef5627b5ab54275794e95382395cf89b5c641b Mon Sep 17 00:00:00 2001 From: Takuro Ashie Date: Sun, 15 May 2022 09:54:13 +0900 Subject: [PATCH 19/23] test_in_tail: Move group config tests to separated sub_test_case Signed-off-by: Takuro Ashie --- test/plugin/test_in_tail.rb | 92 ++++++++++++++++++------------------- 1 file changed, 46 insertions(+), 46 deletions(-) diff --git a/test/plugin/test_in_tail.rb b/test/plugin/test_in_tail.rb index 371604198c..ad7fbf9186 100644 --- a/test/plugin/test_in_tail.rb +++ b/test/plugin/test_in_tail.rb @@ -177,50 +177,6 @@ def create_driver(conf = SINGLE_LINE_CONFIG, use_common_conf = true) end sub_test_case "configure" do - test " required" do - conf = create_group_directive('.', '1m') + SINGLE_LINE_CONFIG - assert_raise(Fluent::ConfigError) do - create_driver(conf) - end - end - - test "valid configuration" do - rule1 = create_rule_directive({ - "namespace"=> "/namespace-a/", - "podname"=> "/podname-[b|c]/" - }, 100) - rule2 = create_rule_directive({ - "namespace"=> "/namespace-[d|e]/", - "podname"=> "/podname-f/", - }, 50) - rule3 = create_rule_directive({ - "podname"=> "/podname-g/", - }, -1) - rule4 = create_rule_directive({ - "namespace"=> "/namespace-h/", - }, 0) - - conf = create_group_directive(TAILING_GROUP_PATTERN, '1m', rule1, rule2, rule3, rule4) + SINGLE_LINE_CONFIG - assert_nothing_raised do - create_driver(conf) - end - end - - test "limit should be greater than DEFAULT_LIMIT (-1)" do - rule1 = create_rule_directive({ - "namespace"=> "/namespace-a/", - "podname"=> "/podname-[b|c]/", - }, -100) - rule2 = create_rule_directive({ - "namespace"=> "/namespace-[d|e]/", - "podname"=> "/podname-f/", - }, 50) - conf = create_group_directive(TAILING_GROUP_PATTERN, '1m', rule1, rule2) + SINGLE_LINE_CONFIG - assert_raise(RuntimeError) do - create_driver(conf) - end - end - test "plain single line" do d = create_driver assert_equal ["#{TMP_DIR}/tail.txt"], d.instance.paths @@ -334,8 +290,53 @@ def create_driver(conf = SINGLE_LINE_CONFIG, use_common_conf = true) end end - sub_test_case "group rules line limit resolution" do + sub_test_case "configure group" do + test " required" do + conf = create_group_directive('.', '1m') + SINGLE_LINE_CONFIG + assert_raise(Fluent::ConfigError) do + create_driver(conf) + end + end + test "valid configuration" do + rule1 = create_rule_directive({ + "namespace"=> "/namespace-a/", + "podname"=> "/podname-[b|c]/" + }, 100) + rule2 = create_rule_directive({ + "namespace"=> "/namespace-[d|e]/", + "podname"=> "/podname-f/", + }, 50) + rule3 = create_rule_directive({ + "podname"=> "/podname-g/", + }, -1) + rule4 = create_rule_directive({ + "namespace"=> "/namespace-h/", + }, 0) + + conf = create_group_directive(TAILING_GROUP_PATTERN, '1m', rule1, rule2, rule3, rule4) + SINGLE_LINE_CONFIG + assert_nothing_raised do + create_driver(conf) + end + end + + test "limit should be greater than DEFAULT_LIMIT (-1)" do + rule1 = create_rule_directive({ + "namespace"=> "/namespace-a/", + "podname"=> "/podname-[b|c]/", + }, -100) + rule2 = create_rule_directive({ + "namespace"=> "/namespace-[d|e]/", + "podname"=> "/podname-f/", + }, 50) + conf = create_group_directive(TAILING_GROUP_PATTERN, '1m', rule1, rule2) + SINGLE_LINE_CONFIG + assert_raise(RuntimeError) do + create_driver(conf) + end + end + end + + sub_test_case "group rules line limit resolution" do test "valid" do rule1 = create_rule_directive({ "namespace"=> "/namespace-a/", @@ -385,7 +386,6 @@ def create_driver(conf = SINGLE_LINE_CONFIG, use_common_conf = true) assert_equal(-1, instance.find_group({}).limit) end end - end sub_test_case "files should be placed in groups" do From efa7d2dd081eab33b2aad343b6be1393e3e936de Mon Sep 17 00:00:00 2001 From: Takuro Ashie Date: Mon, 16 May 2022 14:32:27 +0900 Subject: [PATCH 20/23] test_in_tail: Reduce test time of group watch tests Signed-off-by: Takuro Ashie --- test/plugin/test_in_tail.rb | 56 +++++++++++++++++++++++-------------- 1 file changed, 35 insertions(+), 21 deletions(-) diff --git a/test/plugin/test_in_tail.rb b/test/plugin/test_in_tail.rb index ad7fbf9186..959089f005 100644 --- a/test/plugin/test_in_tail.rb +++ b/test/plugin/test_in_tail.rb @@ -2548,7 +2548,6 @@ def test_shutdown_timeout end sub_test_case "throttling logs at in_tail level" do - data("file test1.log no_limit 5120 text: msg" => ["test1.log", 5120, "msg"], "file test2.log no_limit 1024 text: test" => ["test2.log", 1024, "test"]) def test_lines_collected_with_no_throttling(data) @@ -2558,7 +2557,7 @@ def test_lines_collected_with_no_throttling(data) rule = create_rule_directive({ "file" => "/test.*/", }, -1) - group = create_group_directive(pattern, '10s', rule) + group = create_group_directive(pattern, "1s", rule) path_element = create_path_element(file) conf = ROOT_CONFIG + group + path_element + CONFIG_READ_FROM_HEAD + SINGLE_LINE_CONFIG @@ -2571,20 +2570,23 @@ def test_lines_collected_with_no_throttling(data) d = create_driver(conf, false) - d.run do + d.run(timeout: 3) do start_time = Fluent::Clock.now - assert_true(Fluent::Clock.now - start_time < 10) assert_equal(num_lines, d.record_count) assert_equal({ "message" => msg }, d.events[0][2]) prev_count = d.record_count - ## waiting for atleast 12 seconds to avoid any sync errors between plugin and test driver - sleep(1) until Fluent::Clock.now - start_time > 12 - ## after waiting for 10 secs, limit will reset + sleep(0.1) while d.emit_count < 1 + assert_true(Fluent::Clock.now - start_time < 2) + ## after waiting for 1 (+ jitter) secs, limit will reset ## Plugin will start reading but it will encounter EOF Error ## since no logs are left to be read ## Hence, d.record_count = prev_count + tail_watcher_interval = 1.0 # hard coded value in in_tail + safety_mergin = 1.02 + jitter = tail_watcher_interval * safety_mergin + sleep(1.0 + jitter) assert_equal(0, d.record_count - prev_count) end end @@ -2592,16 +2594,16 @@ def test_lines_collected_with_no_throttling(data) test "lines collected with throttling" do file = "podname1_namespace12_container-123456.log" limit = 1000 - rate_period = '10s' + rate_period = 2 num_lines = 3000 - msg = "a"*8190 # Total size = 8190 bytes + 2 (\n) bytes + msg = "a" * 8190 # Total size = 8190 bytes + 2 (\n) bytes rule = create_rule_directive({ "namespace"=> "/namespace.+/", "podname"=> "/podname.+/", }, limit) path_element = create_path_element(file) - conf = ROOT_CONFIG + create_group_directive(TAILING_GROUP_PATTERN, rate_period, rule) + path_element + SINGLE_LINE_CONFIG + CONFIG_READ_FROM_HEAD + conf = ROOT_CONFIG + create_group_directive(TAILING_GROUP_PATTERN, "#{rate_period}s", rule) + path_element + SINGLE_LINE_CONFIG + CONFIG_READ_FROM_HEAD d = create_driver(conf, false) file_path = "#{TMP_DIR}/#{file}" @@ -2612,25 +2614,37 @@ def test_lines_collected_with_no_throttling(data) end end - d.run do - start_time = Fluent::Clock.now + d.run(timeout: 15) do + sleep_interval = 0.1 + tail_watcher_interval = 1.0 # hard coded value in in_tail + safety_mergin = 1.02 + lower_jitter = sleep_interval * safety_mergin + upper_jitter = (tail_watcher_interval + sleep_interval) * safety_mergin + lower_interval = rate_period - lower_jitter + upper_interval = rate_period + upper_jitter + + emit_count = 0 prev_count = 0 - (num_lines/limit).times do - assert_true(Fluent::Clock.now - start_time < 10) - ## Check record_count after 10s to check lines reads + while emit_count < 3 do + start_time = Fluent::Clock.now + sleep(sleep_interval) while d.emit_count <= emit_count + elapsed_seconds = Fluent::Clock.now - start_time + if emit_count > 0 + assert_true(elapsed_seconds > lower_interval && elapsed_seconds < upper_interval, + "elapsed_seconds #{elapsed_seconds} is out of allowed range:\n" + + " lower: #{lower_interval} [sec]\n" + + " upper: #{upper_interval} [sec]") + end assert_equal(limit, d.record_count - prev_count) + emit_count = d.emit_count prev_count = d.record_count - ## sleep until rate_period seconds are over so that - ## Plugin can read lines again - sleep(1) until Fluent::Clock.now - start_time > 12 - ## waiting for atleast 12 seconds to avoid any sync errors between plugin and test driver - start_time = Fluent::Clock.now end + ## When all the lines are read and rate_period seconds are over ## limit will reset and since there are no more logs to be read, ## number_lines_read will be 0 - + sleep upper_interval gw = d.instance.find_group_from_metadata(file_path) assert_equal(0, gw.current_paths[file_path].number_lines_read) end From bd5a704a0dc9ba900009ee415ce36ce5d5202d1f Mon Sep 17 00:00:00 2001 From: Takuro Ashie Date: Tue, 17 May 2022 00:59:43 +0900 Subject: [PATCH 21/23] Fix coding style Signed-off-by: Takuro Ashie --- lib/fluent/plugin/in_tail/group_watch.rb | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/lib/fluent/plugin/in_tail/group_watch.rb b/lib/fluent/plugin/in_tail/group_watch.rb index d338aae9ac..758e3a76a9 100644 --- a/lib/fluent/plugin/in_tail/group_watch.rb +++ b/lib/fluent/plugin/in_tail/group_watch.rb @@ -36,7 +36,7 @@ module GroupWatchParams config_section :rule, param_name: :rule, required: true, multi: true do desc 'Key-value pairs for grouping' - config_param :match, :hash, value_type: :regexp, default: {namespace: [DEFAULT_KEY], podname: [DEFAULT_KEY]} + config_param :match, :hash, value_type: :regexp, default: { namespace: [DEFAULT_KEY], podname: [DEFAULT_KEY] } desc 'Maximum number of log lines allowed per group over a period of rate_period' config_param :limit, :integer, default: DEFAULT_LIMIT end @@ -72,7 +72,7 @@ def configure(conf) ## Ensures that "specific" rules (with larger number of `rule.match` keys) ## have a higher priority against "generic" rules (with less number of `rule.match` keys). ## This will be helpful when a file satisfies more than one rule. - @group.rule.sort_by!{ |rule| -rule.match.length() } + @group.rule.sort_by! { |rule| -rule.match.length() } construct_groupwatchers @group_watchers[@default_group_key] ||= GroupWatcher.new(@group.rate_period, GroupWatchParams::DEFAULT_LIMIT) end @@ -110,7 +110,7 @@ def construct_groupwatchers def find_group(metadata) metadata_key = construct_group_key(metadata) - gw_key = @group_watchers.keys.find{ |regexp| metadata_key.match?(regexp) && regexp != @default_group_key} + gw_key = @group_watchers.keys.find { |regexp| metadata_key.match?(regexp) && regexp != @default_group_key } gw_key ||= @default_group_key @group_watchers[gw_key] From c6c5989ae0f6dbcbe0c01b032a7a0fd7e93e343d Mon Sep 17 00:00:00 2001 From: Takuro Ashie Date: Tue, 17 May 2022 10:46:17 +0900 Subject: [PATCH 22/23] test_in_tail: Fix inappropriate variable name safety_mergin -> safety_ratio Signed-off-by: Takuro Ashie --- test/plugin/test_in_tail.rb | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/test/plugin/test_in_tail.rb b/test/plugin/test_in_tail.rb index 959089f005..8972e8bba9 100644 --- a/test/plugin/test_in_tail.rb +++ b/test/plugin/test_in_tail.rb @@ -2584,8 +2584,8 @@ def test_lines_collected_with_no_throttling(data) ## since no logs are left to be read ## Hence, d.record_count = prev_count tail_watcher_interval = 1.0 # hard coded value in in_tail - safety_mergin = 1.02 - jitter = tail_watcher_interval * safety_mergin + safety_ratio = 1.02 + jitter = tail_watcher_interval * safety_ratio sleep(1.0 + jitter) assert_equal(0, d.record_count - prev_count) end @@ -2617,9 +2617,9 @@ def test_lines_collected_with_no_throttling(data) d.run(timeout: 15) do sleep_interval = 0.1 tail_watcher_interval = 1.0 # hard coded value in in_tail - safety_mergin = 1.02 - lower_jitter = sleep_interval * safety_mergin - upper_jitter = (tail_watcher_interval + sleep_interval) * safety_mergin + safety_ratio = 1.02 + lower_jitter = sleep_interval * safety_ratio + upper_jitter = (tail_watcher_interval + sleep_interval) * safety_ratio lower_interval = rate_period - lower_jitter upper_interval = rate_period + upper_jitter From 5f54dd045e6b8d5fb2b8c90006fdb1acf0836763 Mon Sep 17 00:00:00 2001 From: Takuro Ashie Date: Tue, 17 May 2022 11:45:49 +0900 Subject: [PATCH 23/23] in_tail: Fix wrong lower bound of group watch limit Signed-off-by: Takuro Ashie --- lib/fluent/plugin/in_tail.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/fluent/plugin/in_tail.rb b/lib/fluent/plugin/in_tail.rb index 774c6bdc8b..da9bba81c9 100644 --- a/lib/fluent/plugin/in_tail.rb +++ b/lib/fluent/plugin/in_tail.rb @@ -551,7 +551,7 @@ def detach_watcher(tw, ino, close_io = true) def throttling_is_enabled?(tw) return true if @read_bytes_limit_per_second > 0 - return true if tw.group_watcher && tw.group_watcher.limit > 0 + return true if tw.group_watcher && tw.group_watcher.limit >= 0 false end