From 1aaef8234e0a08d8999fca304f996b9cbca77fec Mon Sep 17 00:00:00 2001 From: Jeff Cantrill Date: Mon, 25 Jan 2021 11:26:16 -0500 Subject: [PATCH] fix #263: use filter in lieu of filter_stream as recommended optimization --- .../plugin/filter_kubernetes_metadata.rb | 57 +++++++++---------- .../plugin/test_filter_kubernetes_metadata.rb | 15 +---- 2 files changed, 30 insertions(+), 42 deletions(-) diff --git a/lib/fluent/plugin/filter_kubernetes_metadata.rb b/lib/fluent/plugin/filter_kubernetes_metadata.rb index 80a1b7d..dc62292 100644 --- a/lib/fluent/plugin/filter_kubernetes_metadata.rb +++ b/lib/fluent/plugin/filter_kubernetes_metadata.rb @@ -296,40 +296,37 @@ def get_metadata_for_record(namespace_name, pod_name, container_name, container_ metadata end - def filter_stream(tag, es) - return es if (es.respond_to?(:empty?) && es.empty?) || !es.is_a?(Fluent::EventStream) - - new_es = Fluent::MultiEventStream.new + def filter(tag, time, record) tag_match_data = tag.match(@tag_to_kubernetes_name_regexp_compiled) unless @use_journal - tag_metadata = nil batch_miss_cache = {} - es.each do |time, record| - if tag_match_data && tag_metadata.nil? - tag_metadata = get_metadata_for_record(tag_match_data['namespace'], tag_match_data['pod_name'], tag_match_data['container_name'], - tag_match_data['docker_id'], create_time_from_record(record, time), batch_miss_cache) - end - metadata = Marshal.load(Marshal.dump(tag_metadata)) if tag_metadata - if (@use_journal || @use_journal.nil?) && - (j_metadata = get_metadata_for_journal_record(record, time, batch_miss_cache)) - metadata = j_metadata - end - if @lookup_from_k8s_field && record.key?('kubernetes') && record.key?('docker') && - record['kubernetes'].respond_to?(:has_key?) && record['docker'].respond_to?(:has_key?) && - record['kubernetes'].key?('namespace_name') && - record['kubernetes'].key?('pod_name') && - record['kubernetes'].key?('container_name') && - record['docker'].key?('container_id') && - (k_metadata = get_metadata_for_record(record['kubernetes']['namespace_name'], record['kubernetes']['pod_name'], - record['kubernetes']['container_name'], record['docker']['container_id'], - create_time_from_record(record, time), batch_miss_cache)) - metadata = k_metadata - end + if tag_match_data + tag_metadata = get_metadata_for_record( + tag_match_data['namespace'], + tag_match_data['pod_name'], + tag_match_data['container_name'], + tag_match_data['docker_id'], + create_time_from_record(record, time), + batch_miss_cache + ) + end - record = record.merge(metadata) if metadata - new_es.add(time, record) + metadata = Marshal.load(Marshal.dump(tag_metadata)) if tag_metadata + if (@use_journal || @use_journal.nil?) && + (j_metadata = get_metadata_for_journal_record(record, time, batch_miss_cache)) + metadata = j_metadata + end + if @lookup_from_k8s_field && record.key?('kubernetes') && record.key?('docker') && + record['kubernetes'].respond_to?(:has_key?) && record['docker'].respond_to?(:has_key?) && + record['kubernetes'].key?('namespace_name') && + record['kubernetes'].key?('pod_name') && + record['kubernetes'].key?('container_name') && + record['docker'].key?('container_id') && + (k_metadata = get_metadata_for_record(record['kubernetes']['namespace_name'], record['kubernetes']['pod_name'], + record['kubernetes']['container_name'], record['docker']['container_id'], + create_time_from_record(record, time), batch_miss_cache)) + metadata = k_metadata end - dump_stats - new_es + metadata ? record.merge(metadata) : record end def get_metadata_for_journal_record(record, time, batch_miss_cache) diff --git a/test/plugin/test_filter_kubernetes_metadata.rb b/test/plugin/test_filter_kubernetes_metadata.rb index 3067353..62acb1f 100644 --- a/test/plugin/test_filter_kubernetes_metadata.rb +++ b/test/plugin/test_filter_kubernetes_metadata.rb @@ -145,21 +145,12 @@ def emit_with_tag(tag, msg = {}, config = ' ') d = create_driver(config) d.run(default_tag: tag) do - d.feed(@time, msg) + d.feed(msg) end - d.filtered.map(&:last) - end - - test 'nil event stream' do - # not certain how this is possible but adding test to properly - # guard against this condition we have seen - test for nil, - # empty, no empty method, not an event stream - plugin = create_driver.instance - plugin.filter_stream('tag', nil) - plugin.filter_stream('tag', Fluent::MultiEventStream.new) + d.filtered_records end - test 'inability to connect to the api server handles exception and doensnt block pipeline' do + test 'inability to connect to the api server handles exception and does not block pipeline' do VCR.use_cassettes([{ name: 'valid_kubernetes_api_server' }, { name: 'kubernetes_get_api_v1' }]) do driver = create_driver(' kubernetes_url https://localhost:8443