Skip to content

Commit

Permalink
fix #263: use filter in lieu of filter_stream as recommended optimiza…
Browse files Browse the repository at this point in the history
…tion
  • Loading branch information
jcantrill committed Jun 24, 2022
1 parent f937c75 commit 8f831b6
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 32 deletions.
42 changes: 22 additions & 20 deletions lib/fluent/plugin/filter_kubernetes_metadata.rb
Expand Up @@ -314,30 +314,32 @@ def create_time_from_record(record, internal_time)
def filter(tag, time, record)
tag_match_data = tag.match(@tag_to_kubernetes_name_regexp_compiled) unless @use_journal
batch_miss_cache = {}
tag_metadata = get_metadata_for_record(
tag_match_data['namespace'],
tag_match_data['pod_name'],
tag_match_data['container_name'],
tag_match_data['docker_id'],
create_time_from_record(record, time),
batch_miss_cache
) if tag_match_data

if tag_match_data
tag_metadata = get_metadata_for_record(
tag_match_data['namespace'],
tag_match_data['pod_name'],
tag_match_data['container_name'],
tag_match_data['docker_id'],
create_time_from_record(record, time),
batch_miss_cache
)
end

metadata = Marshal.load(Marshal.dump(tag_metadata)) if tag_metadata
if (@use_journal || @use_journal.nil?) &&
(j_metadata = get_metadata_for_journal_record(record, time, batch_miss_cache))
(j_metadata = get_metadata_for_journal_record(record, time, batch_miss_cache))
metadata = j_metadata
end
if @lookup_from_k8s_field && record.has_key?('kubernetes') && record.has_key?('docker') &&
record['kubernetes'].respond_to?(:has_key?) && record['docker'].respond_to?(:has_key?) &&
record['kubernetes'].has_key?('namespace_name') &&
record['kubernetes'].has_key?('pod_name') &&
record['kubernetes'].has_key?('container_name') &&
record['docker'].has_key?('container_id') &&
(k_metadata = get_metadata_for_record(record['kubernetes']['namespace_name'], record['kubernetes']['pod_name'],
record['kubernetes']['container_name'], record['docker']['container_id'],
create_time_from_record(record, time), batch_miss_cache))
metadata = k_metadata
if @lookup_from_k8s_field && record.key?('kubernetes') && record.key?('docker') &&
record['kubernetes'].respond_to?(:has_key?) && record['docker'].respond_to?(:has_key?) &&
record['kubernetes'].key?('namespace_name') &&
record['kubernetes'].key?('pod_name') &&
record['kubernetes'].key?('container_name') &&
record['docker'].key?('container_id') &&
(k_metadata = get_metadata_for_record(record['kubernetes']['namespace_name'], record['kubernetes']['pod_name'],
record['kubernetes']['container_name'], record['docker']['container_id'],
create_time_from_record(record, time), batch_miss_cache))
metadata = k_metadata
end
metadata ? record.merge(metadata) : record
end
Expand Down
15 changes: 3 additions & 12 deletions test/plugin/test_filter_kubernetes_metadata.rb
Expand Up @@ -145,21 +145,12 @@ def emit_with_tag(tag, msg = {}, config = '
')
d = create_driver(config)
d.run(default_tag: tag) do
d.feed(@time, msg)
d.feed(msg)
end
d.filtered.map(&:last)
end

test 'nil event stream' do
# not certain how this is possible but adding test to properly
# guard against this condition we have seen - test for nil,
# empty, no empty method, not an event stream
plugin = create_driver.instance
plugin.filter_stream('tag', nil)
plugin.filter_stream('tag', Fluent::MultiEventStream.new)
d.filtered_records
end

test 'inability to connect to the api server handles exception and doensnt block pipeline' do
test 'inability to connect to the api server handles exception and does not block pipeline' do
VCR.use_cassettes([{ name: 'valid_kubernetes_api_server' }, { name: 'kubernetes_get_api_v1' }]) do
driver = create_driver('
kubernetes_url https://localhost:8443
Expand Down

0 comments on commit 8f831b6

Please sign in to comment.