in_tail: Add group.match parameter for specifying generic rules
Signed-off-by: Pranjal Gupta <pranjal.gupta2@ibm.com>
Pranjal-Gupta2 committed Feb 6, 2022
1 parent d72b164 commit 13fc410
Showing 2 changed files with 189 additions and 110 deletions.
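
For illustration, a configuration sketch of the new group.match parameter this commit introduces. The tag, path, rate_period, limits, namespaces, and pod names are placeholders, and the exact value syntax of the match hash is an assumption read off the config_param :match, :hash, value_type: :regexp definition in the diff below. Rules with more match keys are tried first, and files that match no rule fall into an unlimited default group:

<source>
  @type tail
  tag app.logs
  path /var/log/containers/*.log
  read_from_head true
  <parse>
    @type none
  </parse>
  <group>
    # pattern defaults to the container-log regexp shown in the diff;
    # its named captures (e.g. podname, namespace) become the usable match keys
    rate_period 30s
    <rule>
      # specific rule: two match keys, checked before the generic rule below
      match {"namespace": "/kube-system/", "podname": "/fluentd/"}
      limit 100
    </rule>
    <rule>
      # generic rule: one match key, applies to the remaining pods in the namespace
      match {"namespace": "/kube-system/"}
      limit 200
    </rule>
  </group>
</source>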
107 changes: 53 additions & 54 deletions lib/fluent/plugin/in_tail.rb
@@ -40,8 +40,9 @@ class TailInput < Fluent::Plugin::Input
RESERVED_CHARS = ['/', '*', '%'].freeze
MetricsInfo = Struct.new(:opened, :closed, :rotated)

DEFAULT_NAMESPACE = DEFAULT_APPNAME = /./
DEFAULT_KEY = /.*/
DEFAULT_LIMIT = -1
REGEXP_JOIN = "_"

class WatcherSetupError < StandardError
def initialize(msg)
@@ -65,7 +66,8 @@ def initialize
@startup = true
# Map rules with GroupWatcher objects
@group_watchers = {}
@sorted_path = nil
@group_keys = nil
@default_group_key = nil
end

desc 'The paths to read. Multiple paths can be specified, separated by comma.'
@@ -129,22 +131,21 @@ def initialize
config_section :group, param_name: :group, required: false, multi: false do
desc 'Regex for extracting group\'s metadata'
config_param :pattern,
:regexp,
default: /var\/log\/containers\/(?<appname>[a-z0-9]([-a-z0-9]*[a-z0-9])?(\/[a-z0-9]([-a-z0-9]*[a-z0-9])?)*)_(?<namespace>[^_]+)_(?<container>.+)-(?<docker_id>[a-z0-9]{64})\.log$/
:regexp,
default: /^\/var\/log\/containers\/(?<podname>[a-z0-9]([-a-z0-9]*[a-z0-9])?(\/[a-z0-9]([-a-z0-9]*[a-z0-9])?)*)_(?<namespace>[^_]+)_(?<container>.+)-(?<docker_id>[a-z0-9]{64})\.log$/

desc 'Period of time in which the group_line_limit is applied'
config_param :rate_period, :time, default: 5

config_section :rule, multi: true, required: true do
desc 'Namespace key'
config_param :namespace, :array, value_type: :string, default: [DEFAULT_NAMESPACE]
desc 'App name key'
config_param :appname, :array, value_type: :string, default: [DEFAULT_APPNAME]
config_section :rule, param_name: :rule, required: true, multi: true do
desc 'Key-value pairs for grouping'
config_param :match, :hash, value_type: :regexp, default: {namespace: [DEFAULT_KEY], appname: [DEFAULT_KEY]}
desc 'Maximum number of log lines allowed per group over a period of rate_period'
config_param :limit, :integer, default: DEFAULT_LIMIT
end
end

attr_reader :paths, :group_watchers
attr_reader :paths, :group_watchers, :default_group_key

def configure(conf)
@variable_store = Fluent::VariableStore.fetch_or_build(:in_tail)
@@ -221,12 +222,22 @@ def configure(conf)
end
end

## Ensuring correct time period syntax
@group.rule.each { |rule|
raise "Metadata Group Limit >= DEFAULT_LIMIT" unless rule.limit >= DEFAULT_LIMIT
} unless @group.nil?

construct_groupwatchers unless @group.nil?
unless @group.nil?
## Ensuring correct time period syntax
@group.rule.each { |rule|
raise "Metadata Group Limit >= DEFAULT_LIMIT" unless rule.limit >= DEFAULT_LIMIT
}

@group_keys = Regexp.compile(@group.pattern).named_captures.keys
@default_group_key = ([DEFAULT_KEY] * @group_keys.length).join(REGEXP_JOIN)

## Ensures that "specific" rules (with larger number of `rule.match` keys)
## have a higher priority against "generic" rules (with less number of `rule.match` keys).
## This will be helpful when a file satisfies more than one rule.
@group.rule.sort_by!{ |rule| -rule.match.length() }
construct_groupwatchers
@group_watchers[@default_group_key] ||= GroupWatcher.new(@group.rate_period, DEFAULT_LIMIT)
end

opened_file_metrics = metrics_create(namespace: "fluentd", subsystem: "input", name: "files_opened_total", help_text: "Total number of opened files")
closed_file_metrics = metrics_create(namespace: "fluentd", subsystem: "input", name: "files_closed_total", help_text: "Total number of closed files")
@@ -267,46 +278,38 @@ def parse_encoding_param(encoding_name)
end
end

def construct_groupwatchers
@group.rule.each { |rule|
num_groups = rule.namespace.size * rule.appname.size

rule.namespace.each { |namespace|
namespace = /#{Regexp.quote(namespace)}/ unless namespace.eql?(DEFAULT_NAMESPACE)
@group_watchers[namespace] ||= {}

rule.appname.each { |appname|
appname = /#{Regexp.quote(appname)}/ unless appname.eql?(DEFAULT_APPNAME)
@group_watchers[namespace][appname] = GroupWatcher.new(@group.rate_period, rule.limit/num_groups)
}

@group_watchers[namespace][DEFAULT_APPNAME] ||= GroupWatcher.new(@group.rate_period)
}
def construct_group_key(named_captures)
match_rule = []
@group_keys.each { |key|
match_rule.append(named_captures.fetch(key, DEFAULT_KEY))
}
match_rule = match_rule.join(REGEXP_JOIN)

if @group_watchers.dig(DEFAULT_NAMESPACE, DEFAULT_APPNAME).nil?
@group_watchers[DEFAULT_NAMESPACE] ||= {}
@group_watchers[DEFAULT_NAMESPACE][DEFAULT_APPNAME] = GroupWatcher.new(@group.rate_period)
end
match_rule
end

def find_group(namespace, appname)
namespace_key = @group_watchers.keys.find { |regexp| namespace.match?(regexp) && regexp != DEFAULT_NAMESPACE }
namespace_key ||= DEFAULT_NAMESPACE

appname_key = @group_watchers[namespace_key].keys.find { |regexp| appname.match?(regexp) && regexp != DEFAULT_APPNAME }
appname_key ||= DEFAULT_APPNAME
def construct_groupwatchers
@group.rule.each { |rule|
match_rule = construct_group_key(rule.match)
@group_watchers[match_rule] ||= GroupWatcher.new(@group.rate_period, rule.limit)
}
end

@group_watchers[namespace_key][appname_key]
def find_group(metadata)
metadata_key = construct_group_key(metadata)
gw_key = @group_watchers.keys.find{ |regexp| metadata_key.match?(regexp) && regexp != @default_group_key}
gw_key ||= @default_group_key

@group_watchers[gw_key]
end

def find_group_from_metadata(path)
begin
metadata = @group.pattern.match(path)
group_watcher = find_group(metadata['namespace'], metadata['appname'])
metadata = @group.pattern.match(path).named_captures
group_watcher = find_group(metadata)
rescue => e
$log.warn "Cannot find group from metadata, Adding file in the default group"
group_watcher = @group_watchers[DEFAULT_NAMESPACE][DEFAULT_APPNAME]
log.warn "Cannot find group from metadata, Adding file in the default group"
group_watcher = @group_watchers[@default_group_key]
end

group_watcher
@@ -636,7 +639,7 @@ def detach_watcher_after_rotate_wait(tw, ino)
if @open_on_every_update
# Detach now because it's already closed, waiting it doesn't make sense.
detach_watcher(tw, ino)
elsif @read_bytes_limit_per_second < 0 || (!tw.group_watcher.nil? && tw.group_watcher.limit <= 0)
elsif @read_bytes_limit_per_second < 0 || tw.group_watcher&.limit <= 0
# throttling isn't enabled, just wait @rotate_wait
timer_execute(:in_tail_close_watcher, @rotate_wait, repeat: false) do
detach_watcher(tw, ino)
@@ -913,7 +916,6 @@ def to_s
end
end


class TailWatcher
def initialize(target_info, pe, log, read_from_head, follow_inodes, update_watcher, line_buffer_timer_flusher, io_handler_build, metrics)
@path = target_info.path
@@ -938,10 +940,6 @@ def initialize(target_info, pe, log, read_from_head, follow_inodes, update_watch
attr_reader :watchers
attr_accessor :group_watcher

def group_watcher=(group_watcher)
@group_watcher = group_watcher
end

def tag
@parsed_tag ||= @path.tr('/', '.').gsub(/\.+/, '.').gsub(/^\./, '')
end
@@ -1224,7 +1222,7 @@ def should_shutdown_now?

def handle_notify
return if limit_bytes_per_second_reached?
return if !group_watcher.nil? && group_watcher.limit_lines_reached?(@path)
return if group_watcher&.limit_lines_reached?(@path)

with_io do |io|
begin
@@ -1243,7 +1241,8 @@ def handle_notify
@fifo.read_lines(@lines)
group_watcher.update_lines_read(@path, @lines.size) unless group_watcher.nil?

if !group_watcher.nil? && group_watcher.limit_lines_reached?(@path) || should_shutdown_now?
if group_watcher&.limit_lines_reached?(@path) || should_shutdown_now?
@log.debug "Reading Limit exceeded #{@path} #{group_watcher.number_lines_read}" unless should_shutdown_now?
read_more = false
break
end
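
For reference, a standalone Ruby sketch (not the plugin's code, though construct_group_key and the lookup mirror the diff) of how the new grouping behaves: each rule's match hash is collapsed into one key joined across the group pattern's named captures, rules with more match keys are sorted to the front, and a tailed path's captured metadata is matched against those keys with a catch-all default as fallback. The pattern, rules, paths, and the find_limit helper below are made up for the example:

# Illustrative stand-ins for the plugin's constants.
DEFAULT_KEY = /.*/
REGEXP_JOIN = "_"

# A simplified stand-in for the default group pattern in the diff.
PATTERN = /\/var\/log\/containers\/(?<podname>[a-z0-9-]+)_(?<namespace>[^_]+)_(?<container>.+)\.log$/
GROUP_KEYS = PATTERN.named_captures.keys                     # ["podname", "namespace", "container"]
DEFAULT_GROUP_KEY = ([DEFAULT_KEY] * GROUP_KEYS.length).join(REGEXP_JOIN)

# Mirrors construct_group_key: captures missing from a rule fall back to the catch-all regexp.
def construct_group_key(named_captures)
  GROUP_KEYS.map { |key| named_captures.fetch(key, DEFAULT_KEY) }.join(REGEXP_JOIN)
end

# Two hypothetical rules; sorting puts the more specific rule (two match keys) first.
rules = [
  { match: { "namespace" => /kube-system/ }, limit: 200 },
  { match: { "namespace" => /kube-system/, "podname" => /fluentd.*/ }, limit: 100 },
]
rules.sort_by! { |rule| -rule[:match].length }

# Group watchers reduced to their line limits for the sake of the example.
group_limits = {}
rules.each { |rule| group_limits[construct_group_key(rule[:match])] ||= rule[:limit] }
group_limits[DEFAULT_GROUP_KEY] ||= -1                       # unlimited default group

# Mirrors find_group: the first non-default key the file's metadata matches wins.
def find_limit(group_limits, path)
  md = PATTERN.match(path)
  return group_limits[DEFAULT_GROUP_KEY] unless md
  metadata_key = construct_group_key(md.named_captures)
  key = group_limits.keys.find { |k| k != DEFAULT_GROUP_KEY && metadata_key.match?(k) }
  group_limits[key || DEFAULT_GROUP_KEY]
end

p find_limit(group_limits, "/var/log/containers/fluentd-abc_kube-system_main.log")  # => 100
p find_limit(group_limits, "/var/log/containers/web-1_kube-system_main.log")        # => 200
p find_limit(group_limits, "/var/log/containers/web-1_default_main.log")            # => -1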
