diff --git a/lib/fluent/plugin/filter_kubernetes_metadata.rb b/lib/fluent/plugin/filter_kubernetes_metadata.rb index e8e4bdb..be782d1 100644 --- a/lib/fluent/plugin/filter_kubernetes_metadata.rb +++ b/lib/fluent/plugin/filter_kubernetes_metadata.rb @@ -314,30 +314,32 @@ def create_time_from_record(record, internal_time) def filter(tag, time, record) tag_match_data = tag.match(@tag_to_kubernetes_name_regexp_compiled) unless @use_journal batch_miss_cache = {} - tag_metadata = get_metadata_for_record( - tag_match_data['namespace'], - tag_match_data['pod_name'], - tag_match_data['container_name'], - tag_match_data['docker_id'], - create_time_from_record(record, time), - batch_miss_cache - ) if tag_match_data - + if tag_match_data + tag_metadata = get_metadata_for_record( + tag_match_data['namespace'], + tag_match_data['pod_name'], + tag_match_data['container_name'], + tag_match_data['docker_id'], + create_time_from_record(record, time), + batch_miss_cache + ) + end + metadata = Marshal.load(Marshal.dump(tag_metadata)) if tag_metadata if (@use_journal || @use_journal.nil?) && - (j_metadata = get_metadata_for_journal_record(record, time, batch_miss_cache)) + (j_metadata = get_metadata_for_journal_record(record, time, batch_miss_cache)) metadata = j_metadata end - if @lookup_from_k8s_field && record.has_key?('kubernetes') && record.has_key?('docker') && - record['kubernetes'].respond_to?(:has_key?) && record['docker'].respond_to?(:has_key?) && - record['kubernetes'].has_key?('namespace_name') && - record['kubernetes'].has_key?('pod_name') && - record['kubernetes'].has_key?('container_name') && - record['docker'].has_key?('container_id') && - (k_metadata = get_metadata_for_record(record['kubernetes']['namespace_name'], record['kubernetes']['pod_name'], - record['kubernetes']['container_name'], record['docker']['container_id'], - create_time_from_record(record, time), batch_miss_cache)) - metadata = k_metadata + if @lookup_from_k8s_field && record.key?('kubernetes') && record.key?('docker') && + record['kubernetes'].respond_to?(:has_key?) && record['docker'].respond_to?(:has_key?) && + record['kubernetes'].key?('namespace_name') && + record['kubernetes'].key?('pod_name') && + record['kubernetes'].key?('container_name') && + record['docker'].key?('container_id') && + (k_metadata = get_metadata_for_record(record['kubernetes']['namespace_name'], record['kubernetes']['pod_name'], + record['kubernetes']['container_name'], record['docker']['container_id'], + create_time_from_record(record, time), batch_miss_cache)) + metadata = k_metadata end metadata ? record.merge(metadata) : record end diff --git a/test/plugin/test_filter_kubernetes_metadata.rb b/test/plugin/test_filter_kubernetes_metadata.rb index 3067353..62acb1f 100644 --- a/test/plugin/test_filter_kubernetes_metadata.rb +++ b/test/plugin/test_filter_kubernetes_metadata.rb @@ -145,21 +145,12 @@ def emit_with_tag(tag, msg = {}, config = ' ') d = create_driver(config) d.run(default_tag: tag) do - d.feed(@time, msg) + d.feed(msg) end - d.filtered.map(&:last) - end - - test 'nil event stream' do - # not certain how this is possible but adding test to properly - # guard against this condition we have seen - test for nil, - # empty, no empty method, not an event stream - plugin = create_driver.instance - plugin.filter_stream('tag', nil) - plugin.filter_stream('tag', Fluent::MultiEventStream.new) + d.filtered_records end - test 'inability to connect to the api server handles exception and doensnt block pipeline' do + test 'inability to connect to the api server handles exception and does not block pipeline' do VCR.use_cassettes([{ name: 'valid_kubernetes_api_server' }, { name: 'kubernetes_get_api_v1' }]) do driver = create_driver(' kubernetes_url https://localhost:8443