Skip to content

Commit

Permalink
fix #263: use filter in lieu of filter_stream as recommended optimiza…
Browse files Browse the repository at this point in the history
…tion
  • Loading branch information
jcantrill committed Jan 25, 2021
1 parent 612a5c7 commit 1aaef82
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 42 deletions.
57 changes: 27 additions & 30 deletions lib/fluent/plugin/filter_kubernetes_metadata.rb
Expand Up @@ -296,40 +296,37 @@ def get_metadata_for_record(namespace_name, pod_name, container_name, container_
metadata
end

def filter_stream(tag, es)
return es if (es.respond_to?(:empty?) && es.empty?) || !es.is_a?(Fluent::EventStream)

new_es = Fluent::MultiEventStream.new
def filter(tag, time, record)
tag_match_data = tag.match(@tag_to_kubernetes_name_regexp_compiled) unless @use_journal
tag_metadata = nil
batch_miss_cache = {}
es.each do |time, record|
if tag_match_data && tag_metadata.nil?
tag_metadata = get_metadata_for_record(tag_match_data['namespace'], tag_match_data['pod_name'], tag_match_data['container_name'],
tag_match_data['docker_id'], create_time_from_record(record, time), batch_miss_cache)
end
metadata = Marshal.load(Marshal.dump(tag_metadata)) if tag_metadata
if (@use_journal || @use_journal.nil?) &&
(j_metadata = get_metadata_for_journal_record(record, time, batch_miss_cache))
metadata = j_metadata
end
if @lookup_from_k8s_field && record.key?('kubernetes') && record.key?('docker') &&
record['kubernetes'].respond_to?(:has_key?) && record['docker'].respond_to?(:has_key?) &&
record['kubernetes'].key?('namespace_name') &&
record['kubernetes'].key?('pod_name') &&
record['kubernetes'].key?('container_name') &&
record['docker'].key?('container_id') &&
(k_metadata = get_metadata_for_record(record['kubernetes']['namespace_name'], record['kubernetes']['pod_name'],
record['kubernetes']['container_name'], record['docker']['container_id'],
create_time_from_record(record, time), batch_miss_cache))
metadata = k_metadata
end
if tag_match_data
tag_metadata = get_metadata_for_record(
tag_match_data['namespace'],
tag_match_data['pod_name'],
tag_match_data['container_name'],
tag_match_data['docker_id'],
create_time_from_record(record, time),
batch_miss_cache
)
end

record = record.merge(metadata) if metadata
new_es.add(time, record)
metadata = Marshal.load(Marshal.dump(tag_metadata)) if tag_metadata
if (@use_journal || @use_journal.nil?) &&
(j_metadata = get_metadata_for_journal_record(record, time, batch_miss_cache))
metadata = j_metadata
end
if @lookup_from_k8s_field && record.key?('kubernetes') && record.key?('docker') &&
record['kubernetes'].respond_to?(:has_key?) && record['docker'].respond_to?(:has_key?) &&
record['kubernetes'].key?('namespace_name') &&
record['kubernetes'].key?('pod_name') &&
record['kubernetes'].key?('container_name') &&
record['docker'].key?('container_id') &&
(k_metadata = get_metadata_for_record(record['kubernetes']['namespace_name'], record['kubernetes']['pod_name'],
record['kubernetes']['container_name'], record['docker']['container_id'],
create_time_from_record(record, time), batch_miss_cache))
metadata = k_metadata
end
dump_stats
new_es
metadata ? record.merge(metadata) : record
end

def get_metadata_for_journal_record(record, time, batch_miss_cache)
Expand Down
15 changes: 3 additions & 12 deletions test/plugin/test_filter_kubernetes_metadata.rb
Expand Up @@ -145,21 +145,12 @@ def emit_with_tag(tag, msg = {}, config = '
')
d = create_driver(config)
d.run(default_tag: tag) do
d.feed(@time, msg)
d.feed(msg)
end
d.filtered.map(&:last)
end

test 'nil event stream' do
# not certain how this is possible but adding test to properly
# guard against this condition we have seen - test for nil,
# empty, no empty method, not an event stream
plugin = create_driver.instance
plugin.filter_stream('tag', nil)
plugin.filter_stream('tag', Fluent::MultiEventStream.new)
d.filtered_records
end

test 'inability to connect to the api server handles exception and doensnt block pipeline' do
test 'inability to connect to the api server handles exception and does not block pipeline' do
VCR.use_cassettes([{ name: 'valid_kubernetes_api_server' }, { name: 'kubernetes_get_api_v1' }]) do
driver = create_driver('
kubernetes_url https://localhost:8443
Expand Down

0 comments on commit 1aaef82

Please sign in to comment.