Skip to content

Commit

Permalink
fix #263: use filter in lieu of filter_stream as recommended optimiza…
Browse files Browse the repository at this point in the history
…tion
  • Loading branch information
jcantrill committed Jan 23, 2021
1 parent a4449e0 commit dbe6557
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 41 deletions.
57 changes: 27 additions & 30 deletions lib/fluent/plugin/filter_kubernetes_metadata.rb
Expand Up @@ -309,40 +309,37 @@ def create_time_from_record(record, internal_time)
Time.parse(time)
end

def filter_stream(tag, es)
return es if (es.respond_to?(:empty?) && es.empty?) || !es.is_a?(Fluent::EventStream)

new_es = Fluent::MultiEventStream.new
def filter(tag, time, record)
tag_match_data = tag.match(@tag_to_kubernetes_name_regexp_compiled) unless @use_journal
tag_metadata = nil
batch_miss_cache = {}
es.each do |time, record|
if tag_match_data && tag_metadata.nil?
tag_metadata = get_metadata_for_record(tag_match_data['namespace'], tag_match_data['pod_name'], tag_match_data['container_name'],
tag_match_data['docker_id'], create_time_from_record(record, time), batch_miss_cache)
end
metadata = Marshal.load(Marshal.dump(tag_metadata)) if tag_metadata
if (@use_journal || @use_journal.nil?) &&
(j_metadata = get_metadata_for_journal_record(record, time, batch_miss_cache))
metadata = j_metadata
end
if @lookup_from_k8s_field && record.key?('kubernetes') && record.key?('docker') &&
record['kubernetes'].respond_to?(:has_key?) && record['docker'].respond_to?(:has_key?) &&
record['kubernetes'].key?('namespace_name') &&
record['kubernetes'].key?('pod_name') &&
record['kubernetes'].key?('container_name') &&
record['docker'].key?('container_id') &&
(k_metadata = get_metadata_for_record(record['kubernetes']['namespace_name'], record['kubernetes']['pod_name'],
record['kubernetes']['container_name'], record['docker']['container_id'],
create_time_from_record(record, time), batch_miss_cache))
metadata = k_metadata
end
if tag_match_data
tag_metadata = get_metadata_for_record(
tag_match_data['namespace'],
tag_match_data['pod_name'],
tag_match_data['container_name'],
tag_match_data['docker_id'],
create_time_from_record(record, time),
batch_miss_cache
)
end

record = record.merge(metadata) if metadata
new_es.add(time, record)
metadata = Marshal.load(Marshal.dump(tag_metadata)) if tag_metadata
if (@use_journal || @use_journal.nil?) &&
(j_metadata = get_metadata_for_journal_record(record, time, batch_miss_cache))
metadata = j_metadata
end
if @lookup_from_k8s_field && record.key?('kubernetes') && record.key?('docker') &&
record['kubernetes'].respond_to?(:has_key?) && record['docker'].respond_to?(:has_key?) &&
record['kubernetes'].key?('namespace_name') &&
record['kubernetes'].key?('pod_name') &&
record['kubernetes'].key?('container_name') &&
record['docker'].key?('container_id') &&
(k_metadata = get_metadata_for_record(record['kubernetes']['namespace_name'], record['kubernetes']['pod_name'],
record['kubernetes']['container_name'], record['docker']['container_id'],
create_time_from_record(record, time), batch_miss_cache))
metadata = k_metadata
end
dump_stats
new_es
metadata ? record.merge(metadata) : record
end

def get_metadata_for_journal_record(record, time, batch_miss_cache)
Expand Down
13 changes: 2 additions & 11 deletions test/plugin/test_filter_kubernetes_metadata.rb
Expand Up @@ -145,18 +145,9 @@ def emit_with_tag(tag, msg = {}, config = '
')
d = create_driver(config)
d.run(default_tag: tag) do
d.feed(@time, msg)
d.feed(msg)
end
d.filtered.map(&:last)
end

test 'nil event stream' do
# not certain how this is possible but adding test to properly
# guard against this condition we have seen - test for nil,
# empty, no empty method, not an event stream
plugin = create_driver.instance
plugin.filter_stream('tag', nil)
plugin.filter_stream('tag', Fluent::MultiEventStream.new)
d.filtered_records
end

test 'inability to connect to the api server handles exception and doensnt block pipeline' do
Expand Down

0 comments on commit dbe6557

Please sign in to comment.