Skip to content

Commit

Permalink
fix #263: use filter in lieu of filter_stream as recommended optimiza…
Browse files Browse the repository at this point in the history
…tion
  • Loading branch information
jcantrill committed Jan 23, 2021
1 parent 7ba2d62 commit ff6af60
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 33 deletions.
38 changes: 17 additions & 21 deletions lib/fluent/plugin/filter_kubernetes_metadata.rb
Expand Up @@ -308,23 +308,24 @@ def create_time_from_record(record, internal_time)
return Time.parse(time)
end

def filter_stream(tag, es)
return es if (es.respond_to?(:empty?) && es.empty?) || !es.is_a?(Fluent::EventStream)
new_es = Fluent::MultiEventStream.new
def filter(tag, time, record)
tag_match_data = tag.match(@tag_to_kubernetes_name_regexp_compiled) unless @use_journal
tag_metadata = nil
batch_miss_cache = {}
es.each do |time, record|
if tag_match_data && tag_metadata.nil?
tag_metadata = get_metadata_for_record(tag_match_data['namespace'], tag_match_data['pod_name'], tag_match_data['container_name'],
tag_match_data['docker_id'], create_time_from_record(record, time), batch_miss_cache)
end
metadata = Marshal.load(Marshal.dump(tag_metadata)) if tag_metadata
if (@use_journal || @use_journal.nil?) &&
(j_metadata = get_metadata_for_journal_record(record, time, batch_miss_cache))
metadata = j_metadata
end
if @lookup_from_k8s_field && record.has_key?('kubernetes') && record.has_key?('docker') &&
tag_metadata = get_metadata_for_record(
tag_match_data['namespace'],
tag_match_data['pod_name'],
tag_match_data['container_name'],
tag_match_data['docker_id'],
create_time_from_record(record, time),
batch_miss_cache
) if tag_match_data

metadata = Marshal.load(Marshal.dump(tag_metadata)) if tag_metadata
if (@use_journal || @use_journal.nil?) &&
(j_metadata = get_metadata_for_journal_record(record, time, batch_miss_cache))
metadata = j_metadata
end
if @lookup_from_k8s_field && record.has_key?('kubernetes') && record.has_key?('docker') &&
record['kubernetes'].respond_to?(:has_key?) && record['docker'].respond_to?(:has_key?) &&
record['kubernetes'].has_key?('namespace_name') &&
record['kubernetes'].has_key?('pod_name') &&
Expand All @@ -334,13 +335,8 @@ def filter_stream(tag, es)
record['kubernetes']['container_name'], record['docker']['container_id'],
create_time_from_record(record, time), batch_miss_cache))
metadata = k_metadata
end

record = record.merge(metadata) if metadata
new_es.add(time, record)
end
dump_stats
new_es
metadata ? record.merge(metadata) : record
end

def get_metadata_for_journal_record(record, time, batch_miss_cache)
Expand Down
15 changes: 3 additions & 12 deletions test/plugin/test_filter_kubernetes_metadata.rb
Expand Up @@ -128,7 +128,7 @@ def create_driver(conf = '')
end
end

sub_test_case 'filter_stream' do
sub_test_case 'filter' do

def emit(msg={}, config='
kubernetes_url https://localhost:8443
Expand All @@ -149,18 +149,9 @@ def emit_with_tag(tag, msg={}, config='
')
d = create_driver(config)
d.run(default_tag: tag) {
d.feed(@time, msg)
d.feed(msg)
}
d.filtered.map{|e| e.last}
end

test 'nil event stream' do
#not certain how this is possible but adding test to properly
#guard against this condition we have seen - test for nil,
#empty, no empty method, not an event stream
plugin = create_driver.instance
plugin.filter_stream('tag', nil)
plugin.filter_stream('tag', Fluent::MultiEventStream.new)
d.filtered_records
end

test 'inability to connect to the api server handles exception and doensnt block pipeline' do
Expand Down

0 comments on commit ff6af60

Please sign in to comment.