Skip to content

Commit

Permalink
fix #263: use filter in lieu of filter_stream as recommended optimiza…
Browse files Browse the repository at this point in the history
…tion (#276)
  • Loading branch information
jcantrill committed Jun 24, 2022
1 parent 3c95be5 commit c6fd9d2
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 44 deletions.
62 changes: 27 additions & 35 deletions lib/fluent/plugin/filter_kubernetes_metadata.rb
Expand Up @@ -355,44 +355,36 @@ def get_metadata_for_record(namespace_name, pod_name, container_name, cache_key,
metadata
end

def filter_stream(tag, es)
return es if (es.respond_to?(:empty?) && es.empty?) || !es.is_a?(Fluent::EventStream)
new_es = Fluent::MultiEventStream.new
def filter(tag, time, record)
tag_match_data = tag.match(@tag_to_kubernetes_name_regexp_compiled) unless @use_journal
tag_metadata = nil
batch_miss_cache = {}
es.each do |time, record|
if tag_match_data && tag_metadata.nil?
cache_key = if tag_match_data.names.include?('pod_uuid') && !tag_match_data['pod_uuid'].nil?
tag_match_data['pod_uuid']
else
tag_match_data['docker_id']
end
docker_id = tag_match_data.names.include?('docker_id') ? tag_match_data['docker_id'] : nil
tag_metadata = get_metadata_for_record(tag_match_data['namespace'], tag_match_data['pod_name'], tag_match_data['container_name'],
cache_key, create_time_from_record(record, time), batch_miss_cache, docker_id)
end
metadata = Marshal.load(Marshal.dump(tag_metadata)) if tag_metadata
if (@use_journal || @use_journal.nil?) &&
(j_metadata = get_metadata_for_journal_record(record, time, batch_miss_cache))
metadata = j_metadata
end
if @lookup_from_k8s_field && record.key?('kubernetes') && record.key?('docker') &&
record['kubernetes'].respond_to?(:has_key?) && record['docker'].respond_to?(:has_key?) &&
record['kubernetes'].key?('namespace_name') &&
record['kubernetes'].key?('pod_name') &&
record['kubernetes'].key?('container_name') &&
record['docker'].key?('container_id') &&
(k_metadata = get_metadata_for_record(record['kubernetes']['namespace_name'], record['kubernetes']['pod_name'],
record['kubernetes']['container_name'], record['docker']['container_id'],
create_time_from_record(record, time), batch_miss_cache, record['docker']['container_id']))
metadata = k_metadata
end
record = record.merge(metadata) if metadata
new_es.add(time, record)
if tag_match_data
cache_key = if tag_match_data.names.include?('pod_uuid') && !tag_match_data['pod_uuid'].nil?
tag_match_data['pod_uuid']
else
tag_match_data['docker_id']
end
docker_id = tag_match_data.names.include?('docker_id') ? tag_match_data['docker_id'] : nil
tag_metadata = get_metadata_for_record(tag_match_data['namespace'], tag_match_data['pod_name'], tag_match_data['container_name'],
cache_key, create_time_from_record(record, time), batch_miss_cache, docker_id)
end
metadata = Marshal.load(Marshal.dump(tag_metadata)) if tag_metadata
if (@use_journal || @use_journal.nil?) &&
(j_metadata = get_metadata_for_journal_record(record, time, batch_miss_cache))
metadata = j_metadata
end
if @lookup_from_k8s_field && record.key?('kubernetes') && record.key?('docker') &&
record['kubernetes'].respond_to?(:has_key?) && record['docker'].respond_to?(:has_key?) &&
record['kubernetes'].key?('namespace_name') &&
record['kubernetes'].key?('pod_name') &&
record['kubernetes'].key?('container_name') &&
record['docker'].key?('container_id') &&
(k_metadata = get_metadata_for_record(record['kubernetes']['namespace_name'], record['kubernetes']['pod_name'],
record['kubernetes']['container_name'], record['docker']['container_id'],
create_time_from_record(record, time), batch_miss_cache, record['docker']['container_id']))
metadata = k_metadata
end
dump_stats
new_es
metadata ? record.merge(metadata) : record
end

def get_metadata_for_journal_record(record, time, batch_miss_cache)
Expand Down
9 changes: 0 additions & 9 deletions test/plugin/test_filter_kubernetes_metadata.rb
Expand Up @@ -156,15 +156,6 @@ def emit_with_tag(tag, msg = {}, config = '
d.filtered.map(&:last)
end

test 'nil event stream' do
# not certain how this is possible but adding test to properly
# guard against this condition we have seen - test for nil,
# empty, no empty method, not an event stream
plugin = create_driver.instance
plugin.filter_stream('tag', nil)
plugin.filter_stream('tag', Fluent::MultiEventStream.new)
end

sub_test_case 'parsing_pod_metadata when container_status is missing from the pod status' do
test 'using the tag_to_kubernetes_name_regexp for /var/log/containers ' do
VCR.use_cassettes(
Expand Down

0 comments on commit c6fd9d2

Please sign in to comment.