diff --git a/lib/fluent/plugin/filter_kubernetes_metadata.rb b/lib/fluent/plugin/filter_kubernetes_metadata.rb index d0466ed..2ed1f55 100644 --- a/lib/fluent/plugin/filter_kubernetes_metadata.rb +++ b/lib/fluent/plugin/filter_kubernetes_metadata.rb @@ -308,23 +308,24 @@ def create_time_from_record(record, internal_time) return Time.parse(time) end - def filter_stream(tag, es) - return es if (es.respond_to?(:empty?) && es.empty?) || !es.is_a?(Fluent::EventStream) - new_es = Fluent::MultiEventStream.new + def filter(tag, time, record) tag_match_data = tag.match(@tag_to_kubernetes_name_regexp_compiled) unless @use_journal - tag_metadata = nil batch_miss_cache = {} - es.each do |time, record| - if tag_match_data && tag_metadata.nil? - tag_metadata = get_metadata_for_record(tag_match_data['namespace'], tag_match_data['pod_name'], tag_match_data['container_name'], - tag_match_data['docker_id'], create_time_from_record(record, time), batch_miss_cache) - end - metadata = Marshal.load(Marshal.dump(tag_metadata)) if tag_metadata - if (@use_journal || @use_journal.nil?) && - (j_metadata = get_metadata_for_journal_record(record, time, batch_miss_cache)) - metadata = j_metadata - end - if @lookup_from_k8s_field && record.has_key?('kubernetes') && record.has_key?('docker') && + tag_metadata = get_metadata_for_record( + tag_match_data['namespace'], + tag_match_data['pod_name'], + tag_match_data['container_name'], + tag_match_data['docker_id'], + create_time_from_record(record, time), + batch_miss_cache + ) if tag_match_data + + metadata = Marshal.load(Marshal.dump(tag_metadata)) if tag_metadata + if (@use_journal || @use_journal.nil?) && + (j_metadata = get_metadata_for_journal_record(record, time, batch_miss_cache)) + metadata = j_metadata + end + if @lookup_from_k8s_field && record.has_key?('kubernetes') && record.has_key?('docker') && record['kubernetes'].respond_to?(:has_key?) && record['docker'].respond_to?(:has_key?) && record['kubernetes'].has_key?('namespace_name') && record['kubernetes'].has_key?('pod_name') && @@ -334,13 +335,8 @@ def filter_stream(tag, es) record['kubernetes']['container_name'], record['docker']['container_id'], create_time_from_record(record, time), batch_miss_cache)) metadata = k_metadata - end - - record = record.merge(metadata) if metadata - new_es.add(time, record) end - dump_stats - new_es + metadata ? record.merge(metadata) : record end def get_metadata_for_journal_record(record, time, batch_miss_cache) diff --git a/test/plugin/test_filter_kubernetes_metadata.rb b/test/plugin/test_filter_kubernetes_metadata.rb index b12f278..d900856 100644 --- a/test/plugin/test_filter_kubernetes_metadata.rb +++ b/test/plugin/test_filter_kubernetes_metadata.rb @@ -128,7 +128,7 @@ def create_driver(conf = '') end end - sub_test_case 'filter_stream' do + sub_test_case 'filter' do def emit(msg={}, config=' kubernetes_url https://localhost:8443 @@ -149,18 +149,9 @@ def emit_with_tag(tag, msg={}, config=' ') d = create_driver(config) d.run(default_tag: tag) { - d.feed(@time, msg) + d.feed(msg) } - d.filtered.map{|e| e.last} - end - - test 'nil event stream' do - #not certain how this is possible but adding test to properly - #guard against this condition we have seen - test for nil, - #empty, no empty method, not an event stream - plugin = create_driver.instance - plugin.filter_stream('tag', nil) - plugin.filter_stream('tag', Fluent::MultiEventStream.new) + d.filtered_records end test 'inability to connect to the api server handles exception and doensnt block pipeline' do