Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix #263: use filter in lieu of filter_stream as recommended optimization #276

Merged
merged 1 commit into from Jun 24, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
62 changes: 27 additions & 35 deletions lib/fluent/plugin/filter_kubernetes_metadata.rb
Expand Up @@ -355,44 +355,36 @@ def get_metadata_for_record(namespace_name, pod_name, container_name, cache_key,
metadata
end

def filter_stream(tag, es)
return es if (es.respond_to?(:empty?) && es.empty?) || !es.is_a?(Fluent::EventStream)
new_es = Fluent::MultiEventStream.new
def filter(tag, time, record)
tag_match_data = tag.match(@tag_to_kubernetes_name_regexp_compiled) unless @use_journal
tag_metadata = nil
batch_miss_cache = {}
es.each do |time, record|
if tag_match_data && tag_metadata.nil?
cache_key = if tag_match_data.names.include?('pod_uuid') && !tag_match_data['pod_uuid'].nil?
tag_match_data['pod_uuid']
else
tag_match_data['docker_id']
end
docker_id = tag_match_data.names.include?('docker_id') ? tag_match_data['docker_id'] : nil
tag_metadata = get_metadata_for_record(tag_match_data['namespace'], tag_match_data['pod_name'], tag_match_data['container_name'],
cache_key, create_time_from_record(record, time), batch_miss_cache, docker_id)
end
metadata = Marshal.load(Marshal.dump(tag_metadata)) if tag_metadata
if (@use_journal || @use_journal.nil?) &&
(j_metadata = get_metadata_for_journal_record(record, time, batch_miss_cache))
metadata = j_metadata
end
if @lookup_from_k8s_field && record.key?('kubernetes') && record.key?('docker') &&
record['kubernetes'].respond_to?(:has_key?) && record['docker'].respond_to?(:has_key?) &&
record['kubernetes'].key?('namespace_name') &&
record['kubernetes'].key?('pod_name') &&
record['kubernetes'].key?('container_name') &&
record['docker'].key?('container_id') &&
(k_metadata = get_metadata_for_record(record['kubernetes']['namespace_name'], record['kubernetes']['pod_name'],
record['kubernetes']['container_name'], record['docker']['container_id'],
create_time_from_record(record, time), batch_miss_cache, record['docker']['container_id']))
metadata = k_metadata
end
record = record.merge(metadata) if metadata
new_es.add(time, record)
if tag_match_data
cache_key = if tag_match_data.names.include?('pod_uuid') && !tag_match_data['pod_uuid'].nil?
tag_match_data['pod_uuid']
else
tag_match_data['docker_id']
end
docker_id = tag_match_data.names.include?('docker_id') ? tag_match_data['docker_id'] : nil
tag_metadata = get_metadata_for_record(tag_match_data['namespace'], tag_match_data['pod_name'], tag_match_data['container_name'],
cache_key, create_time_from_record(record, time), batch_miss_cache, docker_id)
end
metadata = Marshal.load(Marshal.dump(tag_metadata)) if tag_metadata
if (@use_journal || @use_journal.nil?) &&
(j_metadata = get_metadata_for_journal_record(record, time, batch_miss_cache))
metadata = j_metadata
end
if @lookup_from_k8s_field && record.key?('kubernetes') && record.key?('docker') &&
record['kubernetes'].respond_to?(:has_key?) && record['docker'].respond_to?(:has_key?) &&
record['kubernetes'].key?('namespace_name') &&
record['kubernetes'].key?('pod_name') &&
record['kubernetes'].key?('container_name') &&
record['docker'].key?('container_id') &&
(k_metadata = get_metadata_for_record(record['kubernetes']['namespace_name'], record['kubernetes']['pod_name'],
record['kubernetes']['container_name'], record['docker']['container_id'],
create_time_from_record(record, time), batch_miss_cache, record['docker']['container_id']))
metadata = k_metadata
end
dump_stats
new_es
metadata ? record.merge(metadata) : record
end

def get_metadata_for_journal_record(record, time, batch_miss_cache)
Expand Down
9 changes: 0 additions & 9 deletions test/plugin/test_filter_kubernetes_metadata.rb
Expand Up @@ -156,15 +156,6 @@ def emit_with_tag(tag, msg = {}, config = '
d.filtered.map(&:last)
end

test 'nil event stream' do
# not certain how this is possible but adding test to properly
# guard against this condition we have seen - test for nil,
# empty, no empty method, not an event stream
plugin = create_driver.instance
plugin.filter_stream('tag', nil)
plugin.filter_stream('tag', Fluent::MultiEventStream.new)
end

sub_test_case 'parsing_pod_metadata when container_status is missing from the pod status' do
test 'using the tag_to_kubernetes_name_regexp for /var/log/containers ' do
VCR.use_cassettes(
Expand Down