From 514126ca4329c81739925aa6bd140fabe87faaf2 Mon Sep 17 00:00:00 2001 From: Pranjal Gupta Date: Thu, 6 Jan 2022 20:46:41 +0530 Subject: [PATCH] in_tail: Add group based log collection from `in_tail_with_throttle` Signed-off-by: Pranjal Gupta --- lib/fluent/plugin/in_tail.rb | 182 +++++++++++- lib/fluent/plugin/in_tail_with_throttle.rb | 296 -------------------- test/plugin/test_in_tail.rb | 201 +++++++++++++- test/plugin/test_in_tail_with_throttle.rb | 308 --------------------- 4 files changed, 379 insertions(+), 608 deletions(-) delete mode 100644 lib/fluent/plugin/in_tail_with_throttle.rb delete mode 100644 test/plugin/test_in_tail_with_throttle.rb diff --git a/lib/fluent/plugin/in_tail.rb b/lib/fluent/plugin/in_tail.rb index 3ad5943f82..cc19fa3c02 100644 --- a/lib/fluent/plugin/in_tail.rb +++ b/lib/fluent/plugin/in_tail.rb @@ -40,6 +40,9 @@ class TailInput < Fluent::Plugin::Input RESERVED_CHARS = ['/', '*', '%'].freeze MetricsInfo = Struct.new(:opened, :closed, :rotated) + DEFAULT_NAMESPACE = DEFAULT_APPNAME = /./ + DEFAULT_LIMIT = -1 + class WatcherSetupError < StandardError def initialize(msg) @message = msg @@ -60,6 +63,9 @@ def initialize @shutdown_start_time = nil @metrics = nil @startup = true + # Map rules with GroupWatcher objects + @group_watchers = {} + @sorted_path = nil end desc 'The paths to read. Multiple paths can be specified, separated by comma.' @@ -120,7 +126,25 @@ def initialize config_argument :usage, :string, default: 'in_tail_parser' end - attr_reader :paths + config_section :group, param_name: :group, required: false, multi: false do + desc 'Regex for extracting group\'s metadata' + config_param :pattern, + :regexp, + default: /var\/log\/containers\/(?[a-z0-9]([-a-z0-9]*[a-z0-9])?(\/[a-z0-9]([-a-z0-9]*[a-z0-9])?)*)_(?[^_]+)_(?.+)-(?[a-z0-9]{64})\.log$/ + desc 'Period of time in which the group_line_limit is applied' + config_param :rate_period, :time, default: 5 + + config_section :rule, multi: true, required: true do + desc 'Namespace key' + config_param :namespace, :array, value_type: :string, default: [DEFAULT_NAMESPACE] + desc 'App name key' + config_param :appname, :array, value_type: :string, default: [DEFAULT_APPNAME] + desc 'Maximum number of log lines allowed per group over a period of rate_period' + config_param :limit, :integer, default: DEFAULT_LIMIT + end + end + + attr_reader :paths, :group_watchers def configure(conf) @variable_store = Fluent::VariableStore.fetch_or_build(:in_tail) @@ -196,6 +220,14 @@ def configure(conf) @read_bytes_limit_per_second = min_bytes end end + + ## Ensuring correct time period syntax + @group.rule.each { |rule| + raise "Metadata Group Limit >= DEFAULT_LIMIT" unless rule.limit >= DEFAULT_LIMIT + } unless @group.nil? + + construct_groupwatchers unless @group.nil? + opened_file_metrics = metrics_create(namespace: "fluentd", subsystem: "input", name: "files_opened_total", help_text: "Total number of opened files") closed_file_metrics = metrics_create(namespace: "fluentd", subsystem: "input", name: "files_closed_total", help_text: "Total number of closed files") rotated_file_metrics = metrics_create(namespace: "fluentd", subsystem: "input", name: "files_rotated_total", help_text: "Total number of rotated files") @@ -235,6 +267,51 @@ def parse_encoding_param(encoding_name) end end + def construct_groupwatchers + @group.rule.each { |rule| + num_groups = rule.namespace.size * rule.appname.size + + rule.namespace.each { |namespace| + namespace = /#{Regexp.quote(namespace)}/ unless namespace.eql?(DEFAULT_NAMESPACE) + @group_watchers[namespace] ||= {} + + rule.appname.each { |appname| + appname = /#{Regexp.quote(appname)}/ unless appname.eql?(DEFAULT_APPNAME) + @group_watchers[namespace][appname] = GroupWatcher.new(@group.rate_period, rule.limit/num_groups) + } + + @group_watchers[namespace][DEFAULT_APPNAME] ||= GroupWatcher.new(@group.rate_period) + } + } + + if @group_watchers.dig(DEFAULT_NAMESPACE, DEFAULT_APPNAME).nil? + @group_watchers[DEFAULT_NAMESPACE] ||= {} + @group_watchers[DEFAULT_NAMESPACE][DEFAULT_APPNAME] = GroupWatcher.new(@group.rate_period) + end + end + + def find_group(namespace, appname) + namespace_key = @group_watchers.keys.find { |regexp| namespace.match?(regexp) && regexp != DEFAULT_NAMESPACE } + namespace_key ||= DEFAULT_NAMESPACE + + appname_key = @group_watchers[namespace_key].keys.find { |regexp| appname.match?(regexp) && regexp != DEFAULT_APPNAME } + appname_key ||= DEFAULT_APPNAME + + @group_watchers[namespace_key][appname_key] + end + + def find_group_from_metadata(path) + begin + metadata = @group.pattern.match(path) + group_watcher = find_group(metadata['namespace'], metadata['appname']) + rescue => e + $log.warn "Cannot find group from metadata, Adding file in the default group" + group_watcher = @group_watchers[DEFAULT_NAMESPACE][DEFAULT_APPNAME] + end + + group_watcher + end + def start super @@ -406,6 +483,12 @@ def setup_watcher(target_info, pe) event_loop_attach(watcher) end + unless @group.nil? + group_watcher = find_group_from_metadata(target_info.path) + group_watcher.add(tw.path) unless group_watcher.include?(tw.path) + tw.group_watcher = group_watcher + end + tw rescue => e if tw @@ -461,6 +544,11 @@ def start_watchers(targets_info) def stop_watchers(targets_info, immediate: false, unwatched: false, remove_watcher: true) targets_info.each_value { |target_info| + unless @group.nil? + group_watcher = find_group_from_metadata(target_info.path) + group_watcher.delete(target_info.path) + end + if remove_watcher tw = @tails.delete(target_info) else @@ -548,7 +636,7 @@ def detach_watcher_after_rotate_wait(tw, ino) if @open_on_every_update # Detach now because it's already closed, waiting it doesn't make sense. detach_watcher(tw, ino) - elsif @read_bytes_limit_per_second < 0 + elsif @read_bytes_limit_per_second < 0 || (!tw.group_watcher.nil? && tw.group_watcher.limit <= 0) # throttling isn't enabled, just wait @rotate_wait timer_execute(:in_tail_close_watcher, @rotate_wait, repeat: false) do detach_watcher(tw, ino) @@ -753,6 +841,79 @@ def on_timer end end + class GroupWatcher + attr_accessor :current_paths, :limit, :number_lines_read, :start_reading_time, :rate_period + + FileCounter = Struct.new( + :number_lines_read, + :start_reading_time, + ) + + def initialize(rate_period = 60, limit = -1) + @current_paths = {} + @rate_period = rate_period + @limit = limit + end + + def add(path) + @current_paths[path] = FileCounter.new(0, nil) + end + + def include?(path) + @current_paths.key?(path) + end + + def size + @current_paths.size + end + + def delete(path) + @current_paths.delete(path) + end + + def update_reading_time(path) + @current_paths[path].start_reading_time ||= Fluent::Clock.now + end + + def update_lines_read(path, value) + @current_paths[path].number_lines_read += value + end + + def reset_counter(path) + @current_paths[path].start_reading_time = nil + @current_paths[path].number_lines_read = 0 + end + + def time_spent_reading(path) + Fluent::Clock.now - @current_paths[path].start_reading_time + end + + def limit_time_period_reached?(path) + time_spent_reading(path) < @rate_period + end + + def limit_lines_reached?(path) + return true unless include?(path) + return true if @limit == 0 + + return false if @limit < 0 + return false if @current_paths[path].number_lines_read < @limit / size + + # update_reading_time(path) + if limit_time_period_reached?(path) # Exceeds limit + true + else # Does not exceed limit + reset_counter(path) + false + end + end + + def to_s + super + " current_paths: #{@current_paths} rate_period: #{@rate_period} limit: #{@limit}" + end + end + + class TailWatcher def initialize(target_info, pe, log, read_from_head, follow_inodes, update_watcher, line_buffer_timer_flusher, io_handler_build, metrics) @path = target_info.path @@ -775,6 +936,11 @@ def initialize(target_info, pe, log, read_from_head, follow_inodes, update_watch attr_reader :line_buffer_timer_flusher attr_accessor :unwatched # This is used for removing position entry from PositionFile attr_reader :watchers + attr_accessor :group_watcher + + def group_watcher=(group_watcher) + @group_watcher = group_watcher + end def tag @parsed_tag ||= @path.tr('/', '.').gsub(/\.+/, '.').gsub(/^\./, '') @@ -997,6 +1163,10 @@ def initialize(watcher, path:, read_lines_limit:, read_bytes_limit_per_second:, @log.info "following tail of #{@path}" end + def group_watcher + @watcher.group_watcher + end + def on_notify @notify_mutex.synchronize { handle_notify } end @@ -1054,6 +1224,7 @@ def should_shutdown_now? def handle_notify return if limit_bytes_per_second_reached? + return if !group_watcher.nil? && group_watcher.limit_lines_reached?(@path) with_io do |io| begin @@ -1063,12 +1234,19 @@ def handle_notify begin while true @start_reading_time ||= Fluent::Clock.now + group_watcher.update_reading_time(@path) unless group_watcher.nil? data = io.readpartial(BYTES_TO_READ, @iobuf) @eof = false @number_bytes_read += data.bytesize @fifo << data + group_watcher.update_lines_read(@path, -@lines.size) unless group_watcher.nil? @fifo.read_lines(@lines) + group_watcher.update_lines_read(@path, @lines.size) unless group_watcher.nil? + if !group_watcher.nil? && group_watcher.limit_lines_reached?(@path) || should_shutdown_now? + read_more = false + break + end if limit_bytes_per_second_reached? || should_shutdown_now? # Just get out from tailing loop. read_more = false diff --git a/lib/fluent/plugin/in_tail_with_throttle.rb b/lib/fluent/plugin/in_tail_with_throttle.rb deleted file mode 100644 index 4772cfd287..0000000000 --- a/lib/fluent/plugin/in_tail_with_throttle.rb +++ /dev/null @@ -1,296 +0,0 @@ -# -# Fluentd -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -require 'fluent/plugin/in_tail' - -module Fluent::Plugin - class ThrottleInput < Fluent::Plugin::TailInput - Fluent::Plugin.register_input('tail_with_throttle', self) - - DEFAULT_NAMESPACE = DEFAULT_APPNAME = /./ - DEFAULT_LIMIT = -1 - - attr_reader :group_watchers - - def initialize - super - # Map rules with GroupWatcher objects - @group_watchers = {} - @sorted_path = nil - end - - config_section :group, param_name: :group, required: true, multi: false do - desc 'Regex for extracting group\'s metadata' - config_param :pattern, - :regexp, - default: /var\/log\/containers\/(?[a-z0-9]([-a-z0-9]*[a-z0-9])?(\/[a-z0-9]([-a-z0-9]*[a-z0-9])?)*)_(?[^_]+)_(?.+)-(?[a-z0-9]{64})\.log$/ - desc 'Period of time in which the group_line_limit is applied' - config_param :rate_period, :time, default: 5 - - config_section :rule, multi: true, required: true do - desc 'Namespace key' - config_param :namespace, :array, value_type: :string, default: [DEFAULT_NAMESPACE] - desc 'App name key' - config_param :appname, :array, value_type: :string, default: [DEFAULT_APPNAME] - desc 'Maximum number of log lines allowed per group over a period of rate_period' - config_param :limit, :integer, default: DEFAULT_LIMIT - end - end - - def configure(conf) - super - ## Ensuring correct time period syntax - @group.rule.each { |rule| - raise "Metadata Group Limit >= DEFAULT_LIMIT" unless rule.limit >= DEFAULT_LIMIT - } - - construct_groupwatchers - end - - def construct_groupwatchers - @group.rule.each { |rule| - num_groups = rule.namespace.size * rule.appname.size - - rule.namespace.each { |namespace| - namespace = /#{Regexp.quote(namespace)}/ unless namespace.eql?(DEFAULT_NAMESPACE) - @group_watchers[namespace] ||= {} - - rule.appname.each { |appname| - appname = /#{Regexp.quote(appname)}/ unless appname.eql?(DEFAULT_APPNAME) - @group_watchers[namespace][appname] = GroupWatcher.new(@group.rate_period, rule.limit/num_groups) - } - - @group_watchers[namespace][DEFAULT_APPNAME] ||= GroupWatcher.new(@group.rate_period) - } - } - - if @group_watchers.dig(DEFAULT_NAMESPACE, DEFAULT_APPNAME).nil? - @group_watchers[DEFAULT_NAMESPACE] ||= {} - @group_watchers[DEFAULT_NAMESPACE][DEFAULT_APPNAME] = GroupWatcher.new(@group.rate_period) - end - end - - def find_group(namespace, appname) - namespace_key = @group_watchers.keys.find { |regexp| namespace.match?(regexp) && regexp != DEFAULT_NAMESPACE } - namespace_key ||= DEFAULT_NAMESPACE - - appname_key = @group_watchers[namespace_key].keys.find { |regexp| appname.match?(regexp) && regexp != DEFAULT_APPNAME } - appname_key ||= DEFAULT_APPNAME - - @group_watchers[namespace_key][appname_key] - end - - def find_group_from_metadata(path) - begin - metadata = @group.pattern.match(path) - group_watcher = find_group(metadata['namespace'], metadata['appname']) - rescue => e - $log.warn "Cannot find group from metadata, Adding file in the default group" - group_watcher = @group_watchers[DEFAULT_NAMESPACE][DEFAULT_APPNAME] - end - - group_watcher - end - - def stop_watchers(targets_info, immediate: false, unwatched: false, remove_watcher: true) - targets_info.each_value { |target_info| - group_watcher = find_group_from_metadata(target_info.path) - group_watcher.delete(target_info.path) - } - super - end - - def setup_watcher(target_info, pe) - tw = super - group_watcher = find_group_from_metadata(target_info.path) - group_watcher.add(tw.path) unless group_watcher.include?(tw.path) - tw.group_watcher = group_watcher - - tw - rescue => e - raise e - end - - def detach_watcher_after_rotate_wait(tw, ino) - # Call event_loop_attach/event_loop_detach is high-cost for short-live object. - # If this has a problem with large number of files, use @_event_loop directly instead of timer_execute. - if @open_on_every_update - # Detach now because it's already closed, waiting it doesn't make sense. - detach_watcher(tw, ino) - elsif !tw.group_watcher.nil? && tw.group_watcher.limit <= 0 - # throttling isn't enabled, just wait @rotate_wait - timer_execute(:in_tail_close_watcher, @rotate_wait, repeat: false) do - detach_watcher(tw, ino) - end - else - # When the throttling feature is enabled, it might not reach EOF yet. - # Should ensure to read all contents before closing it, with keeping throttling. - start_time_to_wait = Fluent::Clock.now - timer = timer_execute(:in_tail_close_watcher, 1, repeat: true) do - elapsed = Fluent::Clock.now - start_time_to_wait - if tw.eof? && elapsed >= @rotate_wait - timer.detach - detach_watcher(tw, ino) - end - end - end - end - - class GroupWatcher - attr_accessor :current_paths, :limit, :number_lines_read, :start_reading_time, :rate_period - - FileCounter = Struct.new( - :number_lines_read, - :start_reading_time, - ) - - def initialize(rate_period = 60, limit = -1) - @current_paths = {} - @rate_period = rate_period - @limit = limit - end - - def add(path) - @current_paths[path] = FileCounter.new(0, nil) - end - - def include?(path) - @current_paths.key?(path) - end - - def size - @current_paths.size - end - - def delete(path) - @current_paths.delete(path) - end - - def update_reading_time(path) - @current_paths[path].start_reading_time ||= Fluent::Clock.now - end - - def update_lines_read(path, value) - @current_paths[path].number_lines_read += value - end - - def reset_counter(path) - @current_paths[path].start_reading_time = nil - @current_paths[path].number_lines_read = 0 - end - - def time_spent_reading(path) - Fluent::Clock.now - @current_paths[path].start_reading_time - end - - def limit_time_period_reached?(path) - time_spent_reading(path) < @rate_period - end - - def limit_lines_reached?(path) - return true unless include?(path) - return true if @limit == 0 - - return false if @limit < 0 - return false if @current_paths[path].number_lines_read < @limit / size - - # update_reading_time(path) - if limit_time_period_reached?(path) # Exceeds limit - true - else # Does not exceed limit - reset_counter(path) - false - end - end - - def to_s - super + " current_paths: #{@current_paths} rate_period: #{@rate_period} limit: #{@limit}" - end - end - - class Fluent::Plugin::TailInput::TailWatcher - attr_accessor :group_watcher - - def group_watcher=(group_watcher) - @group_watcher = group_watcher - end - - - class Fluent::Plugin::TailInput::TailWatcher::IOHandler - alias_method :orig_handle_notify, :handle_notify - - def group_watcher - @watcher.group_watcher - end - - def handle_notify - if group_watcher.nil? - orig_handle_notify - else - rate_limit_handle_notify - end - end - - def rate_limit_handle_notify - return if group_watcher.limit_lines_reached?(@path) - - with_io do |io| - begin - read_more = false - - if !io.nil? && @lines.empty? - begin - while true - group_watcher.update_reading_time(@path) - data = io.readpartial(BYTES_TO_READ, @iobuf) - @eof = false - @fifo << data - group_watcher.update_lines_read(@path, -@lines.size) - @fifo.read_lines(@lines) - group_watcher.update_lines_read(@path, @lines.size) - - if group_watcher.limit_lines_reached?(@path) || should_shutdown_now? - # Just get out from tailing loop. - @log.info "Read limit exceeded #{@path}" if !should_shutdown_now? - read_more = false - break - elsif @lines.size >= @read_lines_limit - # not to use too much memory in case the file is very large - read_more = true - break - end - end - rescue EOFError - @eof = true - end - end - @log.debug "Lines read: #{@path} #{group_watcher.current_paths[@path].number_lines_read}" - - unless @lines.empty? - if @receive_lines.call(@lines, @watcher) - @watcher.pe.update_pos(io.pos - @fifo.bytesize) - @lines.clear - else - read_more = false - end - end - end while read_more - end - end - end - end - end -end \ No newline at end of file diff --git a/test/plugin/test_in_tail.rb b/test/plugin/test_in_tail.rb index c96e72f558..05a320ac60 100644 --- a/test/plugin/test_in_tail.rb +++ b/test/plugin/test_in_tail.rb @@ -91,12 +91,12 @@ def create_target_info(path) TMP_DIR = File.dirname(__FILE__) + "/../tmp/tail#{ENV['TEST_ENV_NUMBER']}" - CONFIG = config_element("ROOT", "", { - "path" => "#{TMP_DIR}/tail.txt", + ROOT_CONFIG = config_element("ROOT", "", { "tag" => "t1", "rotate_wait" => "2s", "refresh_interval" => "1s" }) + CONFIG = ROOT_CONFIG + config_element("", "", { "path" => "#{TMP_DIR}/tail.txt" }) COMMON_CONFIG = CONFIG + config_element("", "", { "pos_file" => "#{TMP_DIR}/tail.pos" }) CONFIG_READ_FROM_HEAD = config_element("", "", { "read_from_head" => true }) CONFIG_DISABLE_WATCH_TIMER = config_element("", "", { "enable_watch_timer" => false }) @@ -143,6 +143,30 @@ def create_target_info(path) "format_firstline" => "/^[s]/" }) ]) + + PATTERN = "/#{TMP_DIR}\/(?[a-z0-9]([-a-z0-9]*[a-z0-9])?(\/[a-z0-9]([-a-z0-9]*[a-z0-9])?)*)_(?[^_]+)_(?.+)-(?[a-z0-9]{6})\.log$/" + + def create_group_directive(pattern, rate_period, *rules) + config_element("", "", {}, [ + config_element("group", "", { + "pattern" => pattern, + "rate_period" => rate_period + }, rules) + ]) + end + + def create_rule_directive(namespace = [], appname = [], limit) + params = { + "limit" => limit, + } + params["namespace"] = namespace.join(', ') if namespace.size > 0 + params["appname"] = appname.join(', ') if appname.size > 0 + config_element("rule", "", params) + end + + def create_path_element(path) + config_element("source", "", { "path" => "#{TMP_DIR}/#{path}" }) + end def create_driver(conf = SINGLE_LINE_CONFIG, use_common_conf = true) config = use_common_conf ? COMMON_CONFIG + conf : conf @@ -150,6 +174,34 @@ def create_driver(conf = SINGLE_LINE_CONFIG, use_common_conf = true) end sub_test_case "configure" do + test " required" do + conf = create_group_directive('.', '1m') + SINGLE_LINE_CONFIG + assert_raise(Fluent::ConfigError) do + d = create_driver(conf) + end + end + + test "valid configuration" do + rule1 = create_rule_directive(['namespace-a'], ['appname-b','appname-c'], 100) + rule2 = create_rule_directive(['namespace-d', 'appname-e'], ['f'], 50) + rule3 = create_rule_directive([], ['appname-g'], -1) + rule4 = create_rule_directive(['appname-h'], [], 0) + + conf = create_group_directive('.', '1m', rule1, rule2, rule3, rule4) + SINGLE_LINE_CONFIG + assert_nothing_raised do + d = create_driver(conf) + end + end + + test "limit should be greater than DEFAULT_LIMIT (-1)" do + rule1 = create_rule_directive(['namespace-a'], ['appname-b','appname-c'], -100) + rule2 = create_rule_directive(['namespace-d', 'namespace-e'], ['appname-f'], 50) + conf = create_group_directive('.', '1m', rule1, rule2) + SINGLE_LINE_CONFIG + assert_raise(RuntimeError) do + d = create_driver(conf) + end + end + test "plain single line" do d = create_driver assert_equal ["#{TMP_DIR}/tail.txt"], d.instance.paths @@ -263,6 +315,73 @@ def create_driver(conf = SINGLE_LINE_CONFIG, use_common_conf = true) end end + sub_test_case "group rules line limit resolution" do + + test "valid" do + rule1 = create_rule_directive(['namespace-a'], ['appname-b', 'appname-c'], 50) + rule2 = create_rule_directive([], ['appname-b', 'appname-c'], 400) + rule3 = create_rule_directive(['namespace-a'], [], 100) + + conf = create_group_directive('.', '1m', rule1, rule2, rule3) + SINGLE_LINE_CONFIG + assert_nothing_raised do + d = create_driver(conf) + + assert_equal 25, d.instance.group_watchers[/namespace\-a/][/appname\-b/].limit + assert_equal 25, d.instance.group_watchers[/namespace\-a/][/appname\-c/].limit + assert_equal 100, d.instance.group_watchers[/namespace\-a/][/./].limit + assert_equal 200, d.instance.group_watchers[/./][/appname\-b/].limit + assert_equal 200, d.instance.group_watchers[/./][/appname\-c/].limit + assert_equal -1, d.instance.group_watchers[/./][/./].limit + end + end + + end + + sub_test_case "files should be placed in groups" do + test "invalid regex pattern places files in default group" do + rule1 = create_rule_directive([], [], 100) ## limits default groups + conf = ROOT_CONFIG + create_group_directive('.', '1m', rule1) + create_path_element("test*.txt") + SINGLE_LINE_CONFIG + + d = create_driver(conf, false) + File.open("#{TMP_DIR}/test1.txt", 'w') + File.open("#{TMP_DIR}/test2.txt", 'w') + File.open("#{TMP_DIR}/test3.txt", 'w') + + d.run do + ## checking default group_watcher's paths + assert_equal 3, d.instance.group_watchers[/./][/./].size + assert_true d.instance.group_watchers[/./][/./].include? File.join(TMP_DIR, 'test1.txt') + assert_true d.instance.group_watchers[/./][/./].include? File.join(TMP_DIR, 'test2.txt') + assert_true d.instance.group_watchers[/./][/./].include? File.join(TMP_DIR, 'test3.txt') + end + end + + test "valid regex pattern places file in their respective groups" do + rule1 = create_rule_directive(['test-namespace1'], ['test-appname1'], 100) + rule2 = create_rule_directive(['test-namespace1'], [], 200) + rule3 = create_rule_directive([], ['test-appname2'], 100) + rule4 = create_rule_directive([], [], 100) + + path_element = create_path_element("test-appname*.log") + + conf = ROOT_CONFIG + create_group_directive(PATTERN, '1m', rule1, rule2, rule3, rule4) + path_element + SINGLE_LINE_CONFIG + d = create_driver(conf, false) + + File.open("#{TMP_DIR}/test-appname1_test-namespace1_test-container-15fabq.log", 'w') + File.open("#{TMP_DIR}/test-appname3_test-namespace1_test-container-15fabq.log", 'w') + File.open("#{TMP_DIR}/test-appname2_test-namespace2_test-container-15fabq.log", 'w') + File.open("#{TMP_DIR}/test-appname4_test-namespace3_test-container-15fabq.log", 'w') + + d.run do + assert_true d.instance.group_watchers[/test\-namespace1/][/test\-appname1/].include? File.join(TMP_DIR, "test-appname1_test-namespace1_test-container-15fabq.log") + assert_true d.instance.group_watchers[/test\-namespace1/][/./].include? File.join(TMP_DIR, "test-appname3_test-namespace1_test-container-15fabq.log") + assert_true d.instance.group_watchers[/./][/test\-appname2/].include? File.join(TMP_DIR, "test-appname2_test-namespace2_test-container-15fabq.log") + assert_true d.instance.group_watchers[/./][/./].include? File.join(TMP_DIR, "test-appname4_test-namespace3_test-container-15fabq.log") + end + end + + end + sub_test_case "singleline" do data(flat: SINGLE_LINE_CONFIG, parse: PARSE_SINGLE_LINE_CONFIG) @@ -2360,4 +2479,82 @@ def test_shutdown_timeout elapsed = Fluent::Clock.now - start_time assert_true(elapsed > 0.5 && elapsed < 2.5) end + + sub_test_case "throttling logs at in_tail level" do + + data("file test1.log no limit 5120 text: msg" => ["test1.log", 5120, "msg"], + "file test2.log no limit 1024 text: test" => ["test2.log", 1024, "test"]) + def test_lines_collected_with_no_throttling(data) + file, num_lines, msg = data + rule = create_rule_directive([], [], -1) + path_element = create_path_element(file) + + conf = ROOT_CONFIG + create_group_directive('.', '10s', rule) + path_element + CONFIG_READ_FROM_HEAD + SINGLE_LINE_CONFIG + File.open("#{TMP_DIR}/#{file}", 'wb') do |f| + num_lines.times do + f.puts "#{msg}\n" + end + end + + + d = create_driver(conf, false) + d.run do + start_time = Fluent::Clock.now + + assert_true Fluent::Clock.now - start_time < 10 + assert_equal num_lines, d.record_count + assert_equal({ "message" => msg }, d.events[0][2]) + + prev_count = d.record_count + ## waiting for atleast 12 seconds to avoid any sync errors between plugin and test driver + sleep(1) until Fluent::Clock.now - start_time > 12 + ## after waiting for 10 secs, limit will reset + ## Plugin will start reading but it will encounter EOF Error + ## since no logs are left to be read + ## Hence, d.record_count = prev_count + assert_equal 0, d.record_count - prev_count + end + end + + test "lines collected with throttling" do + file = "appname1_namespace12_container-123456.log" + limit = 1000 + rate_period = '10s' + num_lines = 3000 + msg = "a"*8190 # Total size = 8190 bytes + 2 (\n) bytes + + rule = create_rule_directive(['namespace'], ['appname'], limit) + path_element = create_path_element(file) + conf = ROOT_CONFIG + create_group_directive(PATTERN, rate_period, rule) + path_element + SINGLE_LINE_CONFIG + CONFIG_READ_FROM_HEAD + + d = create_driver(conf, false) + + File.open("#{TMP_DIR}/#{file}", 'wb') do |f| + num_lines.times do + f.puts msg + end + end + + d.run do + start_time = Fluent::Clock.now + prev_count = 0 + + 3.times do + assert_true Fluent::Clock.now - start_time < 10 + ## Check record_count after 10s to check lines reads + assert_equal limit, d.record_count - prev_count + prev_count = d.record_count + ## sleep until rate_period seconds are over so that + ## Plugin can read lines again + sleep(1) until Fluent::Clock.now - start_time > 12 + ## waiting for atleast 12 seconds to avoid any sync errors between plugin and test driver + start_time = Fluent::Clock.now + end + ## When all the lines are read and rate_period seconds are over + ## limit will reset and since there are no more logs to be read, + ## number_lines_read will be 0 + assert_equal 0, d.instance.group_watchers[/namespace/][/appname/].current_paths["#{TMP_DIR}/#{file}"].number_lines_read + end + end + end end diff --git a/test/plugin/test_in_tail_with_throttle.rb b/test/plugin/test_in_tail_with_throttle.rb deleted file mode 100644 index daaa4c9161..0000000000 --- a/test/plugin/test_in_tail_with_throttle.rb +++ /dev/null @@ -1,308 +0,0 @@ -require_relative '../helper' - -require 'fluent/test' -require 'fluent/test/helpers' -require 'fluent/test/driver/input' -require 'fluent/plugin/in_tail_with_throttle' - -class ThrottleInputTest < Test::Unit::TestCase - - def setup - Fluent::Test.setup - cleanup_directory(TMP_DIR) - end - - def teardown - super - cleanup_directory(TMP_DIR) - Fluent::Engine.stop - end - - def cleanup_directory(path) - unless Dir.exist?(path) - FileUtils.mkdir_p(path) - return - end - - if Fluent.windows? - Dir.glob("*", base: path).each do |name| - begin - cleanup_file(File.join(path, name)) - rescue - # expect test driver block release already owned file handle. - end - end - else - begin - FileUtils.rm_f(path, secure:true) - rescue ArgumentError - FileUtils.rm_f(path) # For Ruby 2.6 or before. - end - if File.exist?(path) - FileUtils.remove_entry_secure(path, true) - end - end - FileUtils.mkdir_p(path) - end - - def cleanup_file(path) - if Fluent.windows? - # On Windows, when the file or directory is removed and created - # frequently, there is a case that creating file or directory will - # fail. This situation is caused by pending file or directory - # deletion which is mentioned on win32 API document [1] - # As a workaround, execute rename and remove method. - # - # [1] https://docs.microsoft.com/en-us/windows/win32/api/fileapi/nf-fileapi-createfilea#files - # - file = File.join(Dir.tmpdir, SecureRandom.hex(10)) - begin - FileUtils.mv(path, file) - FileUtils.rm_rf(file, secure: true) - rescue ArgumentError - FileUtils.rm_rf(file) # For Ruby 2.6 or before. - end - if File.exist?(file) - # ensure files are closed for Windows, on which deleted files - # are still visible from filesystem - GC.start(full_mark: true, immediate_mark: true, immediate_sweep: true) - FileUtils.remove_entry_secure(file, true) - end - else - begin - FileUtils.rm_f(path, secure: true) - rescue ArgumentError - FileUtils.rm_f(path) # For Ruby 2.6 or before. - end - if File.exist?(path) - FileUtils.remove_entry_secure(path, true) - end - end - end - - TMP_DIR = File.dirname(__FILE__) + "/../tmp/tail_with_throttle#{ENV['TEST_ENV_NUMBER']}" - - def create_group_directive(pattern, rate_period, *rules) - config_element("", "", {}, [ - config_element("group", "", { - "pattern" => pattern, - "rate_period" => rate_period - }, rules) - ]) - end - - def create_rule_directive(namespace = [], appname = [], limit) - params = { - "limit" => limit, - } - params["namespace"] = namespace.join(', ') if namespace.size > 0 - params["appname"] = appname.join(', ') if appname.size > 0 - config_element("rule", "", params) - end - - def create_path_element(path) - config_element("source", "", { "path" => "#{TMP_DIR}/#{path}" }) - end - - def create_driver(conf, add_path = true) - conf = add_path ? conf + create_path_element("tail.txt") : conf - Fluent::Test::Driver::Input.new(Fluent::Plugin::ThrottleInput).configure(conf) - end - - CONFG = config_element("source", "", { - "@type" => "tail_with_throttle", - "tag" => "t1", - "pos_file" => "#{TMP_DIR}/tail.pos", - "refresh_interval" => "1s", - "rotate_wait" => "2s", - }, [ - config_element("parse", "", { - "@type" => "/(?.*)/" }) - ] - ) - PATTERN = "/#{TMP_DIR}\/(?[a-z0-9]([-a-z0-9]*[a-z0-9])?(\/[a-z0-9]([-a-z0-9]*[a-z0-9])?)*)_(?[^_]+)_(?.+)-(?[a-z0-9]{6})\.log$/" - READ_FROM_HEAD = config_element("source", "", { "read_from_head" => "true" }) - - sub_test_case "#configuration" do - - test " required" do - assert_raise(Fluent::ConfigError) do - d = create_driver(CONFG) - end - end - - test " required" do - conf = CONFG + create_group_directive('.', '1m') - assert_raise(Fluent::ConfigError) do - d = create_driver(conf) - end - end - - test "valid configuration" do - rule1 = create_rule_directive(['namespace-a'], ['appname-b','appname-c'], 100) - rule2 = create_rule_directive(['namespace-d', 'appname-e'], ['f'], 50) - rule3 = create_rule_directive([], ['appname-g'], -1) - rule4 = create_rule_directive(['appname-h'], [], 0) - - conf = CONFG + create_group_directive('.', '1m', rule1, rule2, rule3, rule4) - assert_nothing_raised do - d = create_driver(conf) - end - end - - test "limit should be greater than DEFAULT_LIMIT (-1)" do - rule1 = create_rule_directive(['namespace-a'], ['appname-b','appname-c'], -100) - rule2 = create_rule_directive(['namespace-d', 'namespace-e'], ['appname-f'], 50) - conf = CONFG + create_group_directive('.', '1m', rule1, rule2) - assert_raise(RuntimeError) do - d = create_driver(conf) - end - end - - end - - sub_test_case "group rules line limit resolution" do - - test "valid" do - rule1 = create_rule_directive(['namespace-a'], ['appname-b', 'appname-c'], 50) - rule2 = create_rule_directive([], ['appname-b', 'appname-c'], 400) - rule3 = create_rule_directive(['namespace-a'], [], 100) - - conf = CONFG + create_group_directive('.', '1m', rule1, rule2, rule3) - assert_nothing_raised do - d = create_driver(conf) - - assert_equal 25, d.instance.group_watchers[/namespace\-a/][/appname\-b/].limit - assert_equal 25, d.instance.group_watchers[/namespace\-a/][/appname\-c/].limit - assert_equal 100, d.instance.group_watchers[/namespace\-a/][/./].limit - assert_equal 200, d.instance.group_watchers[/./][/appname\-b/].limit - assert_equal 200, d.instance.group_watchers[/./][/appname\-c/].limit - assert_equal -1, d.instance.group_watchers[/./][/./].limit - end - end - - end - - sub_test_case "files should be placed in groups" do - test "invalid regex pattern places files in default group" do - rule1 = create_rule_directive([], [], 100) ## limits default groups - conf = CONFG + create_group_directive('.', '1m', rule1) + create_path_element("test*.txt") - - d = create_driver(conf, false) - File.open("#{TMP_DIR}/test1.txt", 'w') - File.open("#{TMP_DIR}/test2.txt", 'w') - File.open("#{TMP_DIR}/test3.txt", 'w') - - d.run do - ## checking default group_watcher's paths - assert_equal 3, d.instance.group_watchers[/./][/./].size - assert_true d.instance.group_watchers[/./][/./].include? File.join(TMP_DIR, 'test1.txt') - assert_true d.instance.group_watchers[/./][/./].include? File.join(TMP_DIR, 'test2.txt') - assert_true d.instance.group_watchers[/./][/./].include? File.join(TMP_DIR, 'test3.txt') - end - end - - test "valid regex pattern places file in their respective groups" do - rule1 = create_rule_directive(['test-namespace1'], ['test-appname1'], 100) - rule2 = create_rule_directive(['test-namespace1'], [], 200) - rule3 = create_rule_directive([], ['test-appname2'], 100) - rule4 = create_rule_directive([], [], 100) - - path_element = create_path_element("test-appname*.log") - - conf = CONFG + create_group_directive(PATTERN, '1m', rule1, rule2, rule3, rule4) + path_element - d = create_driver(conf, false) - - File.open("#{TMP_DIR}/test-appname1_test-namespace1_test-container-15fabq.log", 'w') - File.open("#{TMP_DIR}/test-appname3_test-namespace1_test-container-15fabq.log", 'w') - File.open("#{TMP_DIR}/test-appname2_test-namespace2_test-container-15fabq.log", 'w') - File.open("#{TMP_DIR}/test-appname4_test-namespace3_test-container-15fabq.log", 'w') - - d.run do - assert_true d.instance.group_watchers[/test\-namespace1/][/test\-appname1/].include? File.join(TMP_DIR, "test-appname1_test-namespace1_test-container-15fabq.log") - assert_true d.instance.group_watchers[/test\-namespace1/][/./].include? File.join(TMP_DIR, "test-appname3_test-namespace1_test-container-15fabq.log") - assert_true d.instance.group_watchers[/./][/test\-appname2/].include? File.join(TMP_DIR, "test-appname2_test-namespace2_test-container-15fabq.log") - assert_true d.instance.group_watchers[/./][/./].include? File.join(TMP_DIR, "test-appname4_test-namespace3_test-container-15fabq.log") - end - end - - end - - sub_test_case "throttling logs at in_tail level" do - - data("file test1.log no limit 5120 text: msg" => ["test1.log", 5120, "msg"], - "file test2.log no limit 1024 text: test" => ["test2.log", 1024, "test"]) - def test_lines_collected_with_no_throttling(data) - file, num_lines, msg = data - rule = create_rule_directive([], [], -1) - path_element = create_path_element(file) - - conf = CONFG + create_group_directive('.', '10s', rule) + path_element + READ_FROM_HEAD - File.open("#{TMP_DIR}/#{file}", 'wb') do |f| - num_lines.times do - f.puts "#{msg}\n" - end - end - - - d = create_driver(conf, false) - d.run do - start_time = Time.now - - assert_true Time.now - start_time < 10 - assert_equal num_lines, d.record_count - assert_equal({ "message" => msg }, d.events[0][2]) - - prev_count = d.record_count - ## waiting for atleast 12 seconds to avoid any sync errors between plugin and test driver - sleep(1) until Time.now - start_time > 12 - ## after waiting for 10 secs, limit will reset - ## Plugin will start reading but it will encounter EOF Error - ## since no logs are left to be read - ## Hence, d.record_count = prev_count - assert_equal 0, d.record_count - prev_count - end - end - - test "lines collected with throttling" do - file = "appname1_namespace12_container-123456.log" - limit = 1000 - rate_period = '10s' - num_lines = 3000 - msg = "a"*8190 # Total size = 8190 bytes + 2 (\n) bytes - - rule = create_rule_directive(['namespace'], ['appname'], limit) - path_element = create_path_element(file) - conf = CONFG + create_group_directive(PATTERN, rate_period, rule) + path_element + READ_FROM_HEAD - - d = create_driver(conf, false) - - File.open("#{TMP_DIR}/#{file}", 'wb') do |f| - num_lines.times do - f.puts msg - end - end - - d.run do - start_time = Time.now - prev_count = 0 - - 3.times do - assert_true Time.now - start_time < 10 - ## Check record_count after 10s to check lines reads - assert_equal limit, d.record_count - prev_count - prev_count = d.record_count - ## sleep until rate_period seconds are over so that - ## Plugin can read lines again - sleep(1) until Time.now - start_time > 12 - ## waiting for atleast 12 seconds to avoid any sync errors between plugin and test driver - start_time = Time.now - end - ## When all the lines are read and rate_period seconds are over - ## limit will reset and since there are no more logs to be read, - ## number_lines_read will be 0 - assert_equal 0, d.instance.group_watchers[/namespace/][/appname/].current_paths["#{TMP_DIR}/#{file}"].number_lines_read - end - end - end -end