From 0f95cab150151c43fe88a456060c408bab78c326 Mon Sep 17 00:00:00 2001 From: Jeff Cantrill Date: Fri, 24 Jun 2022 14:59:52 -0400 Subject: [PATCH] fix #263: use filter in lieu of filter_stream as recommended optimization --- .../plugin/filter_kubernetes_metadata.rb | 62 ++++++++----------- .../plugin/test_filter_kubernetes_metadata.rb | 9 --- 2 files changed, 27 insertions(+), 44 deletions(-) diff --git a/lib/fluent/plugin/filter_kubernetes_metadata.rb b/lib/fluent/plugin/filter_kubernetes_metadata.rb index 71db3ba..70a781c 100644 --- a/lib/fluent/plugin/filter_kubernetes_metadata.rb +++ b/lib/fluent/plugin/filter_kubernetes_metadata.rb @@ -355,44 +355,36 @@ def get_metadata_for_record(namespace_name, pod_name, container_name, cache_key, metadata end - def filter_stream(tag, es) - return es if (es.respond_to?(:empty?) && es.empty?) || !es.is_a?(Fluent::EventStream) - new_es = Fluent::MultiEventStream.new + def filter(tag, time, record) tag_match_data = tag.match(@tag_to_kubernetes_name_regexp_compiled) unless @use_journal - tag_metadata = nil batch_miss_cache = {} - es.each do |time, record| - if tag_match_data && tag_metadata.nil? - cache_key = if tag_match_data.names.include?('pod_uuid') && !tag_match_data['pod_uuid'].nil? - tag_match_data['pod_uuid'] - else - tag_match_data['docker_id'] - end - docker_id = tag_match_data.names.include?('docker_id') ? tag_match_data['docker_id'] : nil - tag_metadata = get_metadata_for_record(tag_match_data['namespace'], tag_match_data['pod_name'], tag_match_data['container_name'], - cache_key, create_time_from_record(record, time), batch_miss_cache, docker_id) - end - metadata = Marshal.load(Marshal.dump(tag_metadata)) if tag_metadata - if (@use_journal || @use_journal.nil?) && - (j_metadata = get_metadata_for_journal_record(record, time, batch_miss_cache)) - metadata = j_metadata - end - if @lookup_from_k8s_field && record.key?('kubernetes') && record.key?('docker') && - record['kubernetes'].respond_to?(:has_key?) && record['docker'].respond_to?(:has_key?) && - record['kubernetes'].key?('namespace_name') && - record['kubernetes'].key?('pod_name') && - record['kubernetes'].key?('container_name') && - record['docker'].key?('container_id') && - (k_metadata = get_metadata_for_record(record['kubernetes']['namespace_name'], record['kubernetes']['pod_name'], - record['kubernetes']['container_name'], record['docker']['container_id'], - create_time_from_record(record, time), batch_miss_cache, record['docker']['container_id'])) - metadata = k_metadata - end - record = record.merge(metadata) if metadata - new_es.add(time, record) + if tag_match_data + cache_key = if tag_match_data.names.include?('pod_uuid') && !tag_match_data['pod_uuid'].nil? + tag_match_data['pod_uuid'] + else + tag_match_data['docker_id'] + end + docker_id = tag_match_data.names.include?('docker_id') ? tag_match_data['docker_id'] : nil + tag_metadata = get_metadata_for_record(tag_match_data['namespace'], tag_match_data['pod_name'], tag_match_data['container_name'], + cache_key, create_time_from_record(record, time), batch_miss_cache, docker_id) + end + metadata = Marshal.load(Marshal.dump(tag_metadata)) if tag_metadata + if (@use_journal || @use_journal.nil?) && + (j_metadata = get_metadata_for_journal_record(record, time, batch_miss_cache)) + metadata = j_metadata + end + if @lookup_from_k8s_field && record.key?('kubernetes') && record.key?('docker') && + record['kubernetes'].respond_to?(:has_key?) && record['docker'].respond_to?(:has_key?) && + record['kubernetes'].key?('namespace_name') && + record['kubernetes'].key?('pod_name') && + record['kubernetes'].key?('container_name') && + record['docker'].key?('container_id') && + (k_metadata = get_metadata_for_record(record['kubernetes']['namespace_name'], record['kubernetes']['pod_name'], + record['kubernetes']['container_name'], record['docker']['container_id'], + create_time_from_record(record, time), batch_miss_cache, record['docker']['container_id'])) + metadata = k_metadata end - dump_stats - new_es + metadata ? record.merge(metadata) : record end def get_metadata_for_journal_record(record, time, batch_miss_cache) diff --git a/test/plugin/test_filter_kubernetes_metadata.rb b/test/plugin/test_filter_kubernetes_metadata.rb index 37a1a09..9b4e267 100644 --- a/test/plugin/test_filter_kubernetes_metadata.rb +++ b/test/plugin/test_filter_kubernetes_metadata.rb @@ -156,15 +156,6 @@ def emit_with_tag(tag, msg = {}, config = ' d.filtered.map(&:last) end - test 'nil event stream' do - # not certain how this is possible but adding test to properly - # guard against this condition we have seen - test for nil, - # empty, no empty method, not an event stream - plugin = create_driver.instance - plugin.filter_stream('tag', nil) - plugin.filter_stream('tag', Fluent::MultiEventStream.new) - end - sub_test_case 'parsing_pod_metadata when container_status is missing from the pod status' do test 'using the tag_to_kubernetes_name_regexp for /var/log/containers ' do VCR.use_cassettes(