From b87f564b4e5fcd8011414432f455753099c2e9c5 Mon Sep 17 00:00:00 2001 From: Jeff Cantrill Date: Sat, 23 Jan 2021 15:39:59 -0500 Subject: [PATCH] fix #251. Add ruby linter and fix formatting (#278) --- .circleci/config.yml | 2 +- .rubocop.yml | 57 ++ Gemfile | 6 +- README.md | 2 + Rakefile | 26 +- ...-plugin-kubernetes_metadata_filter.gemspec | 45 +- .../plugin/filter_kubernetes_metadata.rb | 139 ++-- .../kubernetes_metadata_cache_strategy.rb | 42 +- .../plugin/kubernetes_metadata_common.rb | 58 +- .../plugin/kubernetes_metadata_stats.rb | 12 +- .../kubernetes_metadata_watch_namespaces.rb | 130 ++-- .../plugin/kubernetes_metadata_watch_pods.rb | 139 ++-- test/helper.rb | 8 +- test/plugin/test_cache_stats.rb | 20 +- test/plugin/test_cache_strategy.rb | 315 ++++---- .../plugin/test_filter_kubernetes_metadata.rb | 681 +++++++++--------- test/plugin/test_watch_namespaces.rb | 381 +++++----- test/plugin/test_watch_pods.rb | 535 +++++++------- test/plugin/watch_test.rb | 20 +- 19 files changed, 1352 insertions(+), 1266 deletions(-) create mode 100644 .rubocop.yml diff --git a/.circleci/config.yml b/.circleci/config.yml index 121d9f4..1a0635a 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -17,7 +17,7 @@ missingdeps: &missingdeps test: &test name: Test bundle - command: bundle exec rake test + command: bundle exec rake test --trace executors: ruby-2-5: diff --git a/.rubocop.yml b/.rubocop.yml new file mode 100644 index 0000000..cf369f4 --- /dev/null +++ b/.rubocop.yml @@ -0,0 +1,57 @@ +AllCops: + TargetRubyVersion: 2.5 # keep in sync with .circleci/config.yml and gemspec + NewCops: enable + +Style/EmptyMethod: + Enabled: false + +Metrics: + Enabled: false + +# not safe ... needs require 'English' +Style/SpecialGlobalVars: + Enabled: false + +Layout/LineLength: + Max: 205 # TODO: lower + +Style/Documentation: + Enabled: false + +Naming/AccessorMethodName: + Enabled: false + +Naming/MethodParameterName: + Enabled: false + +Style/IfInsideElse: + Enabled: false + +Style/GuardClause: + Enabled: false + +Lint/NestedMethodDefinition: + Enabled: false + +# TODO: fix +Style/StringConcatenation: + Enabled: false + +Style/NumericPredicate: + EnforcedStyle: comparison + +Style/IfUnlessModifier: + Enabled: false + +Style/ClassAndModuleChildren: + Enabled: false + +# TODO: enable ... 
somehow breaks tests +Style/HashEachMethods: + Enabled: false + +Style/WordArray: + EnforcedStyle: brackets + +Style/SymbolArray: + EnforcedStyle: brackets diff --git a/Gemfile b/Gemfile index 9021694..02c3562 100644 --- a/Gemfile +++ b/Gemfile @@ -1,7 +1,9 @@ +# frozen_string_literal: true + source 'https://rubygems.org' -gem 'codeclimate-test-reporter', '<1.0.0', :group => :test, :require => nil -gem 'rubocop', require: false +gem 'codeclimate-test-reporter', '<1.0.0', group: :test, require: nil +gem 'rubocop' # Specify your gem's dependencies in fluent-plugin-add.gemspec gemspec diff --git a/README.md b/README.md index fc9293b..2ead4a9 100644 --- a/README.md +++ b/README.md @@ -2,6 +2,8 @@ [![Circle CI](https://circleci.com/gh/fabric8io/fluent-plugin-kubernetes_metadata_filter.svg?style=svg)](https://circleci.com/gh/fabric8io/fluent-plugin-kubernetes_metadata_filter) [![Code Climate](https://codeclimate.com/github/fabric8io/fluent-plugin-kubernetes_metadata_filter/badges/gpa.svg)](https://codeclimate.com/github/fabric8io/fluent-plugin-kubernetes_metadata_filter) [![Test Coverage](https://codeclimate.com/github/fabric8io/fluent-plugin-kubernetes_metadata_filter/badges/coverage.svg)](https://codeclimate.com/github/fabric8io/fluent-plugin-kubernetes_metadata_filter) +[![Ruby Style Guide](https://img.shields.io/badge/code_style-rubocop-brightgreen.svg)](https://github.com/rubocop-hq/rubocop) +[![Ruby Style Guide](https://img.shields.io/badge/code_style-community-brightgreen.svg)](https://rubystyle.guide) The Kubernetes metadata plugin filter enriches container log records with pod and namespace metadata. diff --git a/Rakefile b/Rakefile index d117a33..59b1d71 100644 --- a/Rakefile +++ b/Rakefile @@ -1,10 +1,15 @@ +# frozen_string_literal: true + +require 'bundler/setup' require 'bundler/gem_tasks' require 'rake/testtask' require 'bump/tasks' +require 'rubocop/rake_task' -task :test => [:base_test] +task test: [:base_test] +task default: [:test, :build, :rubocop] -task :default => [:test, :build] +RuboCop::RakeTask.new desc 'Run test_unit based test' Rake::TestTask.new(:base_test) do |t| @@ -13,7 +18,6 @@ Rake::TestTask.new(:base_test) do |t| # $ bundle exec rake base_test TEST=test/test_*.rb t.libs << 'test' t.test_files = Dir['test/**/test_*.rb'].sort - #t.verbose = true t.warning = false end @@ -23,15 +27,15 @@ task :headers do require 'copyright_header' args = { - :license => 'Apache-2.0', - :copyright_software => 'Fluentd Kubernetes Metadata Filter Plugin', - :copyright_software_description => 'Enrich Fluentd events with Kubernetes metadata', - :copyright_holders => ['Red Hat, Inc.'], - :copyright_years => ['2015-2017'], - :add_path => 'lib:test', - :output_dir => '.' + license: 'Apache-2.0', + copyright_software: 'Fluentd Kubernetes Metadata Filter Plugin', + copyright_software_description: 'Enrich Fluentd events with Kubernetes metadata', + copyright_holders: ['Red Hat, Inc.'], + copyright_years: ['2015-2021'], + add_path: 'lib:test', + output_dir: '.' 
} - command_line = CopyrightHeader::CommandLine.new( args ) + command_line = CopyrightHeader::CommandLine.new(args) command_line.execute end diff --git a/fluent-plugin-kubernetes_metadata_filter.gemspec b/fluent-plugin-kubernetes_metadata_filter.gemspec index 71d0665..299a729 100644 --- a/fluent-plugin-kubernetes_metadata_filter.gemspec +++ b/fluent-plugin-kubernetes_metadata_filter.gemspec @@ -1,33 +1,34 @@ -# coding: utf-8 -lib = File.expand_path('../lib', __FILE__) +# frozen_string_literal: true + +lib = File.expand_path('lib', __dir__) $LOAD_PATH.unshift(lib) unless $LOAD_PATH.include?(lib) Gem::Specification.new do |gem| - gem.name = "fluent-plugin-kubernetes_metadata_filter" - gem.version = "2.6.0" - gem.authors = ["Jimmi Dyson"] - gem.email = ["jimmidyson@gmail.com"] - gem.description = %q{Filter plugin to add Kubernetes metadata} - gem.summary = %q{Fluentd filter plugin to add Kubernetes metadata} - gem.homepage = "https://github.com/fabric8io/fluent-plugin-kubernetes_metadata_filter" - gem.license = "Apache-2.0" + gem.name = 'fluent-plugin-kubernetes_metadata_filter' + gem.version = '2.6.0' + gem.authors = ['Jimmi Dyson'] + gem.email = ['jimmidyson@gmail.com'] + gem.description = 'Filter plugin to add Kubernetes metadata' + gem.summary = 'Fluentd filter plugin to add Kubernetes metadata' + gem.homepage = 'https://github.com/fabric8io/fluent-plugin-kubernetes_metadata_filter' + gem.license = 'Apache-2.0' gem.files = `git ls-files`.split($/) gem.required_ruby_version = '>= 2.5.0' gem.add_runtime_dependency 'fluentd', ['>= 0.14.0', '< 1.13'] - gem.add_runtime_dependency "lru_redux" - gem.add_runtime_dependency "kubeclient", '< 5' + gem.add_runtime_dependency 'kubeclient', '< 5' + gem.add_runtime_dependency 'lru_redux' - gem.add_development_dependency "bundler", "~> 2.0" - gem.add_development_dependency "rake" - gem.add_development_dependency "minitest", "~> 4.0" - gem.add_development_dependency "test-unit", "~> 3.0.2" - gem.add_development_dependency "test-unit-rr", "~> 1.0.3" - gem.add_development_dependency "copyright-header" - gem.add_development_dependency "webmock" - gem.add_development_dependency "vcr" - gem.add_development_dependency "bump" - gem.add_development_dependency "yajl-ruby" + gem.add_development_dependency 'bump' + gem.add_development_dependency 'bundler', '~> 2.0' + gem.add_development_dependency 'copyright-header' + gem.add_development_dependency 'minitest', '~> 4.0' + gem.add_development_dependency 'rake' + gem.add_development_dependency 'test-unit', '~> 3.0.2' + gem.add_development_dependency 'test-unit-rr', '~> 1.0.3' + gem.add_development_dependency 'vcr' + gem.add_development_dependency 'webmock' + gem.add_development_dependency 'yajl-ruby' end diff --git a/lib/fluent/plugin/filter_kubernetes_metadata.rb b/lib/fluent/plugin/filter_kubernetes_metadata.rb index 2ed1f55..6aa4836 100644 --- a/lib/fluent/plugin/filter_kubernetes_metadata.rb +++ b/lib/fluent/plugin/filter_kubernetes_metadata.rb @@ -1,3 +1,5 @@ +# frozen_string_literal: true + # # Fluentd Kubernetes Metadata Filter Plugin - Enrich Fluentd events with # Kubernetes metadata @@ -49,7 +51,7 @@ class KubernetesMetadataFilter < Fluent::Plugin::Filter config_param :verify_ssl, :bool, default: true config_param :tag_to_kubernetes_name_regexp, :string, - :default => 'var\.log\.containers\.(?<pod_name>[a-z0-9]([-a-z0-9]*[a-z0-9])?(\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*)_(?<namespace>[^_]+)_(?<container_name>.+)-(?<docker_id>[a-z0-9]{64})\.log$' + default: 
'var\.log\.containers\.(?<pod_name>[a-z0-9]([-a-z0-9]*[a-z0-9])?(\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*)_(?<namespace>[^_]+)_(?<container_name>.+)-(?<docker_id>[a-z0-9]{64})\.log$' config_param :bearer_token_file, :string, default: nil config_param :secret_dir, :string, default: '/var/run/secrets/kubernetes.io/serviceaccount' config_param :de_dot, :bool, default: true @@ -65,7 +67,7 @@ class KubernetesMetadataFilter < Fluent::Plugin::Filter # parse format is defined here: https://github.com/kubernetes/kubernetes/blob/release-1.6/pkg/kubelet/dockertools/docker.go#L317 config_param :container_name_to_kubernetes_regexp, :string, - :default => '^(?<name_prefix>[^_]+)_(?<container_name>[^\._]+)(\.(?<container_hash>[^_]+))?_(?<pod_name>[^_]+)_(?<namespace>[^_]+)_[^_]+_[^_]+$' + default: '^(?<name_prefix>[^_]+)_(?<container_name>[^\._]+)(\.(?<container_hash>[^_]+))?_(?<pod_name>[^_]+)_(?<namespace>[^_]+)_[^_]+_[^_]+$' config_param :annotation_match, :array, default: [] config_param :stats_interval, :integer, default: 30 @@ -96,7 +98,7 @@ def fetch_pod_metadata(namespace_name, pod_name) @stats.bump(:pod_cache_api_updates) log.trace("parsed metadata for #{namespace_name}/#{pod_name}: #{metadata}") if log.trace? @cache[metadata['pod_id']] = metadata - rescue => e + rescue StandardError => e @stats.bump(:pod_cache_api_nil_error) log.debug "Exception '#{e}' encountered fetching pod metadata from Kubernetes API #{@apiVersion} endpoint #{@kubernetes_url}" {} @@ -105,6 +107,7 @@ def dump_stats @curr_time = Time.now return if @curr_time.to_i - @prev_time.to_i < @stats_interval + @prev_time = @curr_time @stats.set(:pod_cache_size, @cache.count) @stats.set(:namespace_cache_size, @namespace_cache.count) if @namespace_cache @@ -124,9 +127,9 @@ def fetch_namespace_metadata(namespace_name) @stats.bump(:namespace_cache_api_updates) log.trace("parsed metadata for #{namespace_name}: #{metadata}") if log.trace? @namespace_cache[metadata['namespace_id']] = metadata - rescue => kube_error + rescue StandardError => e @stats.bump(:namespace_cache_api_nil_error) - log.debug "Exception '#{kube_error}' encountered fetching namespace metadata from Kubernetes API #{@apiVersion} endpoint #{@kubernetes_url}" + log.debug "Exception '#{e}' encountered fetching namespace metadata from Kubernetes API #{@apiVersion} endpoint #{@kubernetes_url}" {} end @@ -146,12 +149,12 @@ def log.trace? require 'lru_redux' @stats = KubernetesMetadata::Stats.new - if @de_dot && @de_dot_separator.include?(".") + if @de_dot && @de_dot_separator.include?('.') raise Fluent::ConfigError, "Invalid de_dot_separator: cannot be or contain '.'" end if @cache_ttl < 0 - log.info "Setting the cache TTL to :none because it was <= 0" + log.info 'Setting the cache TTL to :none because it was <= 0' @cache_ttl = :none end @@ -169,7 +172,7 @@ def log.trace? # Use Kubernetes default service account if we're in a pod. if @kubernetes_url.nil? - log.debug "Kubernetes URL is not set - inspecting environ" + log.debug 'Kubernetes URL is not set - inspecting environ' env_host = ENV['KUBERNETES_SERVICE_HOST'] env_port = ENV['KUBERNETES_SERVICE_PORT'] @@ -181,7 +184,7 @@ def log.trace? @kubernetes_url = "https://#{env_host}:#{env_port}/api" log.debug "Kubernetes URL is now '#{@kubernetes_url}'" else - log.debug "No Kubernetes URL could be found in config or environ" + log.debug 'No Kubernetes URL could be found in config or environ' end end @@ -191,12 +194,12 @@ def log.trace? 
ca_cert = File.join(@secret_dir, K8_POD_CA_CERT) pod_token = File.join(@secret_dir, K8_POD_TOKEN) - if !present?(@ca_file) and File.exist?(ca_cert) + if !present?(@ca_file) && File.exist?(ca_cert) log.debug "Found CA certificate: #{ca_cert}" @ca_file = ca_cert end - if !present?(@bearer_token_file) and File.exist?(pod_token) + if !present?(@bearer_token_file) && File.exist?(pod_token) log.debug "Found pod token: #{pod_token}" @bearer_token_file = pod_token end @@ -204,10 +207,10 @@ def log.trace? if present?(@kubernetes_url) ssl_options = { - client_cert: present?(@client_cert) ? OpenSSL::X509::Certificate.new(File.read(@client_cert)) : nil, - client_key: present?(@client_key) ? OpenSSL::PKey::RSA.new(File.read(@client_key)) : nil, - ca_file: @ca_file, - verify_ssl: @verify_ssl ? OpenSSL::SSL::VERIFY_PEER : OpenSSL::SSL::VERIFY_NONE + client_cert: present?(@client_cert) ? OpenSSL::X509::Certificate.new(File.read(@client_cert)) : nil, + client_key: present?(@client_key) ? OpenSSL::PKey::RSA.new(File.read(@client_key)) : nil, + ca_file: @ca_file, + verify_ssl: @verify_ssl ? OpenSSL::SSL::VERIFY_PEER : OpenSSL::SSL::VERIFY_NONE } if @ssl_partial_chain @@ -215,12 +218,12 @@ def log.trace? require 'openssl' ssl_store = OpenSSL::X509::Store.new ssl_store.set_default_paths - if defined? OpenSSL::X509::V_FLAG_PARTIAL_CHAIN - flagval = OpenSSL::X509::V_FLAG_PARTIAL_CHAIN - else - # this version of ruby does not define OpenSSL::X509::V_FLAG_PARTIAL_CHAIN - flagval = 0x80000 - end + flagval = if defined? OpenSSL::X509::V_FLAG_PARTIAL_CHAIN + OpenSSL::X509::V_FLAG_PARTIAL_CHAIN + else + # this version of ruby does not define OpenSSL::X509::V_FLAG_PARTIAL_CHAIN + 0x80000 + end ssl_store.flags = OpenSSL::X509::V_FLAG_CRL_CHECK_ALL | flagval ssl_options[:cert_store] = ssl_store end @@ -232,7 +235,7 @@ def log.trace? auth_options[:bearer_token] = bearer_token end - log.debug "Creating K8S client" + log.debug 'Creating K8S client' @client = Kubeclient::Client.new( @kubernetes_url, @apiVersion, @@ -243,15 +246,15 @@ def log.trace? begin @client.api_valid? - rescue KubeException => kube_error - raise Fluent::ConfigError, "Invalid Kubernetes API #{@apiVersion} endpoint #{@kubernetes_url}: #{kube_error.message}" + rescue KubeException => e + raise Fluent::ConfigError, "Invalid Kubernetes API #{@apiVersion} endpoint #{@kubernetes_url}: #{e.message}" end if @watch - pod_thread = Thread.new(self) { |this| this.set_up_pod_thread } + pod_thread = Thread.new(self, &:set_up_pod_thread) pod_thread.abort_on_exception = true - namespace_thread = Thread.new(self) { |this| this.set_up_namespace_thread } + namespace_thread = Thread.new(self, &:set_up_namespace_thread) namespace_thread.abort_on_exception = true end end @@ -262,22 +265,19 @@ def log.trace? 
@annotations_regexps = [] @annotation_match.each do |regexp| - begin - @annotations_regexps << Regexp.compile(regexp) - rescue RegexpError => e - log.error "Error: invalid regular expression in annotation_match: #{e}" - end + @annotations_regexps << Regexp.compile(regexp) + rescue RegexpError => e + log.error "Error: invalid regular expression in annotation_match: #{e}" end - end def get_metadata_for_record(namespace_name, pod_name, container_name, container_id, create_time, batch_miss_cache) metadata = { - 'docker' => {'container_id' => container_id}, + 'docker' => { 'container_id' => container_id }, 'kubernetes' => { - 'container_name' => container_name, - 'namespace_name' => namespace_name, - 'pod_name' => pod_name + 'container_name' => container_name, + 'namespace_name' => namespace_name, + 'pod_name' => pod_name } } if present?(@kubernetes_url) @@ -295,62 +295,65 @@ def get_metadata_for_record(namespace_name, pod_name, container_name, container_ end def create_time_from_record(record, internal_time) - time_key = @time_fields.detect{ |ii| record.has_key?(ii) } + time_key = @time_fields.detect { |ii| record.key?(ii) } time = record[time_key] if time.nil? || time.chop.empty? # `internal_time` is a Fluent::EventTime, it can't compare with Time. return Time.at(internal_time.to_f) end + if ['_SOURCE_REALTIME_TIMESTAMP', '__REALTIME_TIMESTAMP'].include?(time_key) - timei= time.to_i - return Time.at(timei / 1000000, timei % 1000000) + timei = time.to_i + return Time.at(timei / 1_000_000, timei % 1_000_000) end - return Time.parse(time) + Time.parse(time) end def filter(tag, time, record) tag_match_data = tag.match(@tag_to_kubernetes_name_regexp_compiled) unless @use_journal batch_miss_cache = {} - tag_metadata = get_metadata_for_record( - tag_match_data['namespace'], - tag_match_data['pod_name'], - tag_match_data['container_name'], - tag_match_data['docker_id'], - create_time_from_record(record, time), - batch_miss_cache - ) if tag_match_data - + if tag_match_data + tag_metadata = get_metadata_for_record( + tag_match_data['namespace'], + tag_match_data['pod_name'], + tag_match_data['container_name'], + tag_match_data['docker_id'], + create_time_from_record(record, time), + batch_miss_cache + ) + end + metadata = Marshal.load(Marshal.dump(tag_metadata)) if tag_metadata if (@use_journal || @use_journal.nil?) && - (j_metadata = get_metadata_for_journal_record(record, time, batch_miss_cache)) + (j_metadata = get_metadata_for_journal_record(record, time, batch_miss_cache)) metadata = j_metadata end - if @lookup_from_k8s_field && record.has_key?('kubernetes') && record.has_key?('docker') && - record['kubernetes'].respond_to?(:has_key?) && record['docker'].respond_to?(:has_key?) && - record['kubernetes'].has_key?('namespace_name') && - record['kubernetes'].has_key?('pod_name') && - record['kubernetes'].has_key?('container_name') && - record['docker'].has_key?('container_id') && - (k_metadata = get_metadata_for_record(record['kubernetes']['namespace_name'], record['kubernetes']['pod_name'], - record['kubernetes']['container_name'], record['docker']['container_id'], - create_time_from_record(record, time), batch_miss_cache)) - metadata = k_metadata + if @lookup_from_k8s_field && record.key?('kubernetes') && record.key?('docker') && + record['kubernetes'].respond_to?(:has_key?) && record['docker'].respond_to?(:has_key?) 
&& + record['kubernetes'].key?('namespace_name') && + record['kubernetes'].key?('pod_name') && + record['kubernetes'].key?('container_name') && + record['docker'].key?('container_id') && + (k_metadata = get_metadata_for_record(record['kubernetes']['namespace_name'], record['kubernetes']['pod_name'], + record['kubernetes']['container_name'], record['docker']['container_id'], + create_time_from_record(record, time), batch_miss_cache)) + metadata = k_metadata end metadata ? record.merge(metadata) : record end def get_metadata_for_journal_record(record, time, batch_miss_cache) metadata = nil - if record.has_key?('CONTAINER_NAME') && record.has_key?('CONTAINER_ID_FULL') + if record.key?('CONTAINER_NAME') && record.key?('CONTAINER_ID_FULL') metadata = record['CONTAINER_NAME'].match(@container_name_to_kubernetes_regexp_compiled) do |match_data| get_metadata_for_record(match_data['namespace'], match_data['pod_name'], match_data['container_name'], - record['CONTAINER_ID_FULL'], create_time_from_record(record, time), batch_miss_cache) + record['CONTAINER_ID_FULL'], create_time_from_record(record, time), batch_miss_cache) end unless metadata log.debug "Error: could not match CONTAINER_NAME from record #{record}" @stats.bump(:container_name_match_failed) end - elsif record.has_key?('CONTAINER_NAME') && record['CONTAINER_NAME'].start_with?('k8s_') + elsif record.key?('CONTAINER_NAME') && record['CONTAINER_NAME'].start_with?('k8s_') log.debug "Error: no container name and id in record #{record}" @stats.bump(:container_name_id_missing) end @@ -359,11 +362,11 @@ def get_metadata_for_journal_record(record, time, batch_miss_cache) def de_dot!(h) h.keys.each do |ref| - if h[ref] && ref =~ /\./ - v = h.delete(ref) - newref = ref.to_s.gsub('.', @de_dot_separator) - h[newref] = v - end + next unless h[ref] && ref =~ /\./ + + v = h.delete(ref) + newref = ref.to_s.gsub('.', @de_dot_separator) + h[newref] = v end end diff --git a/lib/fluent/plugin/kubernetes_metadata_cache_strategy.rb b/lib/fluent/plugin/kubernetes_metadata_cache_strategy.rb index c09915e..fd030a3 100644 --- a/lib/fluent/plugin/kubernetes_metadata_cache_strategy.rb +++ b/lib/fluent/plugin/kubernetes_metadata_cache_strategy.rb @@ -1,3 +1,5 @@ +# frozen_string_literal: true + # # Fluentd Kubernetes Metadata Filter Plugin - Enrich Fluentd events with # Kubernetes metadata @@ -21,33 +23,20 @@ module CacheStrategy def get_pod_metadata(key, namespace_name, pod_name, record_create_time, batch_miss_cache) metadata = {} ids = @id_cache[key] - if !ids.nil? + if ids.nil? # FAST PATH # Cache hit, fetch metadata from the cache - metadata = @cache.fetch(ids[:pod_id]) do - @stats.bump(:pod_cache_miss) - m = fetch_pod_metadata(namespace_name, pod_name) - (m.nil? || m.empty?) ? {'pod_id'=>ids[:pod_id]} : m - end - metadata.merge!(@namespace_cache.fetch(ids[:namespace_id]) do - m = unless @skip_namespace_metadata - @stats.bump(:namespace_cache_miss) - fetch_namespace_metadata(namespace_name) - end - (m.nil? || m.empty?) ? 
{'namespace_id'=>ids[:namespace_id]} : m - end) - else - # SLOW PATH @stats.bump(:id_cache_miss) return batch_miss_cache["#{namespace_name}_#{pod_name}"] if batch_miss_cache.key?("#{namespace_name}_#{pod_name}") + pod_metadata = fetch_pod_metadata(namespace_name, pod_name) if @skip_namespace_metadata - ids = { :pod_id=> pod_metadata['pod_id'] } + ids = { pod_id: pod_metadata['pod_id'] } @id_cache[key] = ids return pod_metadata end namespace_metadata = fetch_namespace_metadata(namespace_name) - ids = { :pod_id=> pod_metadata['pod_id'], :namespace_id => namespace_metadata['namespace_id'] } + ids = { pod_id: pod_metadata['pod_id'], namespace_id: namespace_metadata['namespace_id'] } if !ids[:pod_id].nil? && !ids[:namespace_id].nil? # pod found and namespace found metadata = pod_metadata @@ -61,7 +50,7 @@ def get_pod_metadata(key, namespace_name, pod_name, record_create_time, batch_mi # namespace is older then record for pod ids[:pod_id] = key metadata = @cache.fetch(ids[:pod_id]) do - m = { 'pod_id' => ids[:pod_id] } + { 'pod_id' => ids[:pod_id] } end end metadata.merge!(namespace_metadata) @@ -89,12 +78,25 @@ def get_pod_metadata(key, namespace_name, pod_name, record_create_time, batch_mi end end @id_cache[key] = ids unless batch_miss_cache.key?("#{namespace_name}_#{pod_name}") + else + # SLOW PATH + metadata = @cache.fetch(ids[:pod_id]) do + @stats.bump(:pod_cache_miss) + m = fetch_pod_metadata(namespace_name, pod_name) + m.nil? || m.empty? ? { 'pod_id' => ids[:pod_id] } : m + end + metadata.merge!(@namespace_cache.fetch(ids[:namespace_id]) do + m = unless @skip_namespace_metadata + @stats.bump(:namespace_cache_miss) + fetch_namespace_metadata(namespace_name) + end + m.nil? || m.empty? ? { 'namespace_id' => ids[:namespace_id] } : m + end) end # remove namespace info that is only used for comparison metadata.delete('creation_timestamp') - metadata.delete_if{|k,v| v.nil?} + metadata.delete_if { |_k, v| v.nil? 
} end - end end diff --git a/lib/fluent/plugin/kubernetes_metadata_common.rb b/lib/fluent/plugin/kubernetes_metadata_common.rb index f8e29c2..f039b87 100644 --- a/lib/fluent/plugin/kubernetes_metadata_common.rb +++ b/lib/fluent/plugin/kubernetes_metadata_common.rb @@ -1,3 +1,5 @@ +# frozen_string_literal: true + # # Fluentd Kubernetes Metadata Filter Plugin - Enrich Fluentd events with # Kubernetes metadata @@ -18,9 +20,8 @@ # module KubernetesMetadata module Common - class GoneError < StandardError - def initialize(msg="410 Gone") + def initialize(msg = '410 Gone') super end end @@ -38,13 +39,13 @@ def match_annotations(annotations) end def parse_namespace_metadata(namespace_object) - labels = String.new + labels = '' labels = syms_to_strs(namespace_object[:metadata][:labels].to_h) unless @skip_labels annotations = match_annotations(syms_to_strs(namespace_object[:metadata][:annotations].to_h)) if @de_dot - self.de_dot!(labels) unless @skip_labels - self.de_dot!(annotations) + de_dot!(labels) unless @skip_labels + de_dot!(annotations) end kubernetes_metadata = { 'namespace_id' => namespace_object[:metadata][:uid], @@ -56,43 +57,43 @@ def parse_namespace_metadata(namespace_object) end def parse_pod_metadata(pod_object) - labels = String.new + labels = '' labels = syms_to_strs(pod_object[:metadata][:labels].to_h) unless @skip_labels annotations = match_annotations(syms_to_strs(pod_object[:metadata][:annotations].to_h)) if @de_dot - self.de_dot!(labels) unless @skip_labels - self.de_dot!(annotations) + de_dot!(labels) unless @skip_labels + de_dot!(annotations) end # collect container information container_meta = {} begin - pod_object[:status][:containerStatuses].each do|container_status| + pod_object[:status][:containerStatuses].each do |container_status| # get plain container id (eg. docker://hash -> hash) - container_id = container_status[:containerID].sub /^[-_a-zA-Z0-9]+:\/\//, '' - unless @skip_container_metadata - container_meta[container_id] = { - 'name' => container_status[:name], - 'image' => container_status[:image], - 'image_id' => container_status[:imageID] - } - else - container_meta[container_id] = { - 'name' => container_status[:name] - } - end + container_id = container_status[:containerID].sub(%r{^[-_a-zA-Z0-9]+://}, '') + container_meta[container_id] = if @skip_container_metadata + { + 'name' => container_status[:name] + } + else + { + 'name' => container_status[:name], + 'image' => container_status[:image], + 'image_id' => container_status[:imageID] + } + end end - rescue + rescue StandardError log.debug("parsing container meta information failed for: #{pod_object[:metadata][:namespace]}/#{pod_object[:metadata][:name]} ") end kubernetes_metadata = { - 'namespace_name' => pod_object[:metadata][:namespace], - 'pod_id' => pod_object[:metadata][:uid], - 'pod_name' => pod_object[:metadata][:name], - 'containers' => syms_to_strs(container_meta), - 'host' => pod_object[:spec][:nodeName] + 'namespace_name' => pod_object[:metadata][:namespace], + 'pod_id' => pod_object[:metadata][:uid], + 'pod_name' => pod_object[:metadata][:name], + 'containers' => syms_to_strs(container_meta), + 'host' => pod_object[:spec][:nodeName] } kubernetes_metadata['annotations'] = annotations unless annotations.empty? kubernetes_metadata['labels'] = labels unless labels.empty? 
@@ -102,7 +103,7 @@ def parse_pod_metadata(pod_object) def syms_to_strs(hsh) newhsh = {} - hsh.each_pair do |kk,vv| + hsh.each_pair do |kk, vv| if vv.is_a?(Hash) vv = syms_to_strs(vv) end @@ -114,6 +115,5 @@ def syms_to_strs(hsh) end newhsh end - end end diff --git a/lib/fluent/plugin/kubernetes_metadata_stats.rb b/lib/fluent/plugin/kubernetes_metadata_stats.rb index bb6c416..4dd69aa 100644 --- a/lib/fluent/plugin/kubernetes_metadata_stats.rb +++ b/lib/fluent/plugin/kubernetes_metadata_stats.rb @@ -1,3 +1,5 @@ +# frozen_string_literal: true + # # Fluentd Kubernetes Metadata Filter Plugin - Enrich Fluentd events with # Kubernetes metadata @@ -19,17 +21,16 @@ require 'lru_redux' module KubernetesMetadata class Stats - def initialize @stats = ::LruRedux::TTL::ThreadSafeCache.new(1000, 3600) end def bump(key) - @stats[key] = @stats.getset(key) { 0 } + 1 + @stats[key] = @stats.getset(key) { 0 } + 1 end def set(key, value) - @stats[key] = value + @stats[key] = value end def [](key) @@ -37,10 +38,9 @@ def [](key) end def to_s - "stats - " + [].tap do |a| - @stats.each {|k,v| a << "#{k.to_s}: #{v}"} + 'stats - ' + [].tap do |a| + @stats.each { |k, v| a << "#{k}: #{v}" } end.join(', ') end - end end diff --git a/lib/fluent/plugin/kubernetes_metadata_watch_namespaces.rb b/lib/fluent/plugin/kubernetes_metadata_watch_namespaces.rb index 10f0068..4f7dd60 100644 --- a/lib/fluent/plugin/kubernetes_metadata_watch_namespaces.rb +++ b/lib/fluent/plugin/kubernetes_metadata_watch_namespaces.rb @@ -1,3 +1,5 @@ +# frozen_string_literal: true + # # Fluentd Kubernetes Metadata Filter Plugin - Enrich Fluentd events with # Kubernetes metadata @@ -21,7 +23,6 @@ module KubernetesMetadata module WatchNamespaces - include ::KubernetesMetadata::Common def set_up_namespace_thread @@ -36,48 +37,47 @@ def set_up_namespace_thread # processing will be swallowed and retried. These failures / # exceptions could be caused by Kubernetes API being temporarily # down. We assume the configuration is correct at this point. - while true - begin - namespace_watcher ||= get_namespaces_and_start_watcher - process_namespace_watcher_notices(namespace_watcher) - rescue GoneError => e - # Expected error. Quietly go back through the loop in order to - # start watching from the latest resource versions - @stats.bump(:namespace_watch_gone_errors) - log.info("410 Gone encountered. Restarting namespace watch to reset resource versions.", e) + loop do + namespace_watcher ||= get_namespaces_and_start_watcher + process_namespace_watcher_notices(namespace_watcher) + rescue GoneError => e + # Expected error. Quietly go back through the loop in order to + # start watching from the latest resource versions + @stats.bump(:namespace_watch_gone_errors) + log.info('410 Gone encountered. Restarting namespace watch to reset resource versions.', e) + namespace_watcher = nil + rescue StandardError => e + @stats.bump(:namespace_watch_failures) + if Thread.current[:namespace_watch_retry_count] < @watch_retry_max_times + # Instead of raising exceptions and crashing Fluentd, swallow + # the exception and reset the watcher. + log.info( + 'Exception encountered parsing namespace watch event. ' \ + 'The connection might have been closed. 
Sleeping for ' \ + "#{Thread.current[:namespace_watch_retry_backoff_interval]} " \ + 'seconds and resetting the namespace watcher.', e + ) + sleep(Thread.current[:namespace_watch_retry_backoff_interval]) + Thread.current[:namespace_watch_retry_count] += 1 + Thread.current[:namespace_watch_retry_backoff_interval] *= @watch_retry_exponential_backoff_base namespace_watcher = nil - rescue => e - @stats.bump(:namespace_watch_failures) - if Thread.current[:namespace_watch_retry_count] < @watch_retry_max_times - # Instead of raising exceptions and crashing Fluentd, swallow - # the exception and reset the watcher. - log.info( - "Exception encountered parsing namespace watch event. " \ - "The connection might have been closed. Sleeping for " \ - "#{Thread.current[:namespace_watch_retry_backoff_interval]} " \ - "seconds and resetting the namespace watcher.", e) - sleep(Thread.current[:namespace_watch_retry_backoff_interval]) - Thread.current[:namespace_watch_retry_count] += 1 - Thread.current[:namespace_watch_retry_backoff_interval] *= @watch_retry_exponential_backoff_base - namespace_watcher = nil - else - # Since retries failed for many times, log as errors instead - # of info and raise exceptions and trigger Fluentd to restart. - message = - "Exception encountered parsing namespace watch event. The " \ - "connection might have been closed. Retried " \ - "#{@watch_retry_max_times} times yet still failing. Restarting." - log.error(message, e) - raise Fluent::UnrecoverableError.new(message) - end + else + # Since retries failed for many times, log as errors instead + # of info and raise exceptions and trigger Fluentd to restart. + message = + 'Exception encountered parsing namespace watch event. The ' \ + 'connection might have been closed. Retried ' \ + "#{@watch_retry_max_times} times yet still failing. Restarting." + log.error(message, e) + raise Fluent::UnrecoverableError, message end end end def start_namespace_watch get_namespaces_and_start_watcher - rescue => e - message = "start_namespace_watch: Exception encountered setting up " \ + rescue StandardError => e + message = 'start_namespace_watch: Exception encountered setting up ' \ "namespace watch from Kubernetes API #{@apiVersion} endpoint " \ "#{@kubernetes_url}: #{e.message}" message += " (#{e.response})" if e.respond_to?(:response) @@ -90,7 +90,7 @@ def start_namespace_watch # starting from that resourceVersion. 
def get_namespaces_and_start_watcher options = { - resource_version: '0' # Fetch from API server cache instead of etcd quorum read + resource_version: '0' # Fetch from API server cache instead of etcd quorum read } namespaces = @client.get_namespaces(options) namespaces[:items].each do |namespace| @@ -118,35 +118,35 @@ def reset_namespace_watch_retry_stats def process_namespace_watcher_notices(watcher) watcher.each do |notice| case notice[:type] - when 'MODIFIED' - reset_namespace_watch_retry_stats - cache_key = notice[:object][:metadata][:uid] - cached = @namespace_cache[cache_key] - if cached - @namespace_cache[cache_key] = parse_namespace_metadata(notice[:object]) - @stats.bump(:namespace_cache_watch_updates) - else - @stats.bump(:namespace_cache_watch_misses) - end - when 'DELETED' - reset_namespace_watch_retry_stats - # ignore and let age out for cases where - # deleted but still processing logs - @stats.bump(:namespace_cache_watch_deletes_ignored) - when 'ERROR' - if notice[:object] && notice[:object][:code] == 410 - @stats.bump(:namespace_watch_gone_notices) - raise GoneError - else - @stats.bump(:namespace_watch_error_type_notices) - message = notice[:object][:message] if notice[:object] && notice[:object][:message] - raise "Error while watching namespaces: #{message}" - end + when 'MODIFIED' + reset_namespace_watch_retry_stats + cache_key = notice[:object][:metadata][:uid] + cached = @namespace_cache[cache_key] + if cached + @namespace_cache[cache_key] = parse_namespace_metadata(notice[:object]) + @stats.bump(:namespace_cache_watch_updates) else - reset_namespace_watch_retry_stats - # Don't pay attention to creations, since the created namespace may not - # be used by any namespace on this node. - @stats.bump(:namespace_cache_watch_ignored) + @stats.bump(:namespace_cache_watch_misses) + end + when 'DELETED' + reset_namespace_watch_retry_stats + # ignore and let age out for cases where + # deleted but still processing logs + @stats.bump(:namespace_cache_watch_deletes_ignored) + when 'ERROR' + if notice[:object] && notice[:object][:code] == 410 + @stats.bump(:namespace_watch_gone_notices) + raise GoneError + else + @stats.bump(:namespace_watch_error_type_notices) + message = notice[:object][:message] if notice[:object] && notice[:object][:message] + raise "Error while watching namespaces: #{message}" + end + else + reset_namespace_watch_retry_stats + # Don't pay attention to creations, since the created namespace may not + # be used by any namespace on this node. + @stats.bump(:namespace_cache_watch_ignored) end end end diff --git a/lib/fluent/plugin/kubernetes_metadata_watch_pods.rb b/lib/fluent/plugin/kubernetes_metadata_watch_pods.rb index fa8a87d..4ec079e 100644 --- a/lib/fluent/plugin/kubernetes_metadata_watch_pods.rb +++ b/lib/fluent/plugin/kubernetes_metadata_watch_pods.rb @@ -1,3 +1,5 @@ +# frozen_string_literal: true + # # Fluentd Kubernetes Metadata Filter Plugin - Enrich Fluentd events with # Kubernetes metadata @@ -20,9 +22,7 @@ require_relative 'kubernetes_metadata_common' module KubernetesMetadata - module WatchPods - include ::KubernetesMetadata::Common def set_up_pod_thread @@ -38,48 +38,47 @@ def set_up_pod_thread # processing will be swallowed and retried. These failures / # exceptions could be caused by Kubernetes API being temporarily # down. We assume the configuration is correct at this point. - while true - begin - pod_watcher ||= get_pods_and_start_watcher - process_pod_watcher_notices(pod_watcher) - rescue GoneError => e - # Expected error. 
Quietly go back through the loop in order to - # start watching from the latest resource versions - @stats.bump(:pod_watch_gone_errors) - log.info("410 Gone encountered. Restarting pod watch to reset resource versions.", e) + loop do + pod_watcher ||= get_pods_and_start_watcher + process_pod_watcher_notices(pod_watcher) + rescue GoneError => e + # Expected error. Quietly go back through the loop in order to + # start watching from the latest resource versions + @stats.bump(:pod_watch_gone_errors) + log.info('410 Gone encountered. Restarting pod watch to reset resource versions.', e) + pod_watcher = nil + rescue StandardError => e + @stats.bump(:pod_watch_failures) + if Thread.current[:pod_watch_retry_count] < @watch_retry_max_times + # Instead of raising exceptions and crashing Fluentd, swallow + # the exception and reset the watcher. + log.info( + 'Exception encountered parsing pod watch event. The ' \ + 'connection might have been closed. Sleeping for ' \ + "#{Thread.current[:pod_watch_retry_backoff_interval]} " \ + 'seconds and resetting the pod watcher.', e + ) + sleep(Thread.current[:pod_watch_retry_backoff_interval]) + Thread.current[:pod_watch_retry_count] += 1 + Thread.current[:pod_watch_retry_backoff_interval] *= @watch_retry_exponential_backoff_base pod_watcher = nil - rescue => e - @stats.bump(:pod_watch_failures) - if Thread.current[:pod_watch_retry_count] < @watch_retry_max_times - # Instead of raising exceptions and crashing Fluentd, swallow - # the exception and reset the watcher. - log.info( - "Exception encountered parsing pod watch event. The " \ - "connection might have been closed. Sleeping for " \ - "#{Thread.current[:pod_watch_retry_backoff_interval]} " \ - "seconds and resetting the pod watcher.", e) - sleep(Thread.current[:pod_watch_retry_backoff_interval]) - Thread.current[:pod_watch_retry_count] += 1 - Thread.current[:pod_watch_retry_backoff_interval] *= @watch_retry_exponential_backoff_base - pod_watcher = nil - else - # Since retries failed for many times, log as errors instead - # of info and raise exceptions and trigger Fluentd to restart. - message = - "Exception encountered parsing pod watch event. The " \ - "connection might have been closed. Retried " \ - "#{@watch_retry_max_times} times yet still failing. Restarting." - log.error(message, e) - raise Fluent::UnrecoverableError.new(message) - end + else + # Since retries failed for many times, log as errors instead + # of info and raise exceptions and trigger Fluentd to restart. + message = + 'Exception encountered parsing pod watch event. The ' \ + 'connection might have been closed. Retried ' \ + "#{@watch_retry_max_times} times yet still failing. Restarting." + log.error(message, e) + raise Fluent::UnrecoverableError, message end end end def start_pod_watch get_pods_and_start_watcher - rescue => e - message = "start_pod_watch: Exception encountered setting up pod watch " \ + rescue StandardError => e + message = 'start_pod_watch: Exception encountered setting up pod watch ' \ "from Kubernetes API #{@apiVersion} endpoint " \ "#{@kubernetes_url}: #{e.message}" message += " (#{e.response})" if e.respond_to?(:response) @@ -92,7 +91,7 @@ def start_pod_watch # from that resourceVersion. 
def get_pods_and_start_watcher options = { - resource_version: '0' # Fetch from API server cache instead of etcd quorum read + resource_version: '0' # Fetch from API server cache instead of etcd quorum read } if ENV['K8S_NODE_NAME'] options[:field_selector] = 'spec.nodeName=' + ENV['K8S_NODE_NAME'] @@ -133,39 +132,39 @@ def process_pod_watcher_notices(watcher) @last_seen_resource_version = version if version case notice[:type] - when 'MODIFIED' - reset_pod_watch_retry_stats - cache_key = notice.dig(:object, :metadata, :uid) - cached = @cache[cache_key] - if cached - @cache[cache_key] = parse_pod_metadata(notice[:object]) - @stats.bump(:pod_cache_watch_updates) - elsif ENV['K8S_NODE_NAME'] == notice[:object][:spec][:nodeName] then - @cache[cache_key] = parse_pod_metadata(notice[:object]) - @stats.bump(:pod_cache_host_updates) - else - @stats.bump(:pod_cache_watch_misses) - end - when 'DELETED' - reset_pod_watch_retry_stats - # ignore and let age out for cases where pods - # deleted but still processing logs - @stats.bump(:pod_cache_watch_delete_ignored) - when 'ERROR' - if notice[:object] && notice[:object][:code] == 410 - @last_seen_resource_version = nil # requested resourceVersion was too old, need to reset - @stats.bump(:pod_watch_gone_notices) - raise GoneError - else - @stats.bump(:pod_watch_error_type_notices) - message = notice[:object][:message] if notice[:object] && notice[:object][:message] - raise "Error while watching pods: #{message}" - end + when 'MODIFIED' + reset_pod_watch_retry_stats + cache_key = notice.dig(:object, :metadata, :uid) + cached = @cache[cache_key] + if cached + @cache[cache_key] = parse_pod_metadata(notice[:object]) + @stats.bump(:pod_cache_watch_updates) + elsif ENV['K8S_NODE_NAME'] == notice[:object][:spec][:nodeName] + @cache[cache_key] = parse_pod_metadata(notice[:object]) + @stats.bump(:pod_cache_host_updates) + else + @stats.bump(:pod_cache_watch_misses) + end + when 'DELETED' + reset_pod_watch_retry_stats + # ignore and let age out for cases where pods + # deleted but still processing logs + @stats.bump(:pod_cache_watch_delete_ignored) + when 'ERROR' + if notice[:object] && notice[:object][:code] == 410 + @last_seen_resource_version = nil # requested resourceVersion was too old, need to reset + @stats.bump(:pod_watch_gone_notices) + raise GoneError else - reset_pod_watch_retry_stats - # Don't pay attention to creations, since the created pod may not - # end up on this node. - @stats.bump(:pod_cache_watch_ignored) + @stats.bump(:pod_watch_error_type_notices) + message = notice[:object][:message] if notice[:object] && notice[:object][:message] + raise "Error while watching pods: #{message}" + end + else + reset_pod_watch_retry_stats + # Don't pay attention to creations, since the created pod may not + # end up on this node. + @stats.bump(:pod_cache_watch_ignored) end end end diff --git a/test/helper.rb b/test/helper.rb index 175f37d..2a5018a 100644 --- a/test/helper.rb +++ b/test/helper.rb @@ -1,3 +1,5 @@ +# frozen_string_literal: true + # # Fluentd Kubernetes Metadata Filter Plugin - Enrich Fluentd events with # Kubernetes metadata @@ -65,16 +67,16 @@ def ipv6_enabled? 
begin TCPServer.open('::1', 0) true - rescue + rescue StandardError false end end # TEST_NAME='foo' ruby test_file.rb to run a single test case -if ENV["TEST_NAME"] +if ENV['TEST_NAME'] (class << Test::Unit::TestCase; self; end).prepend(Module.new do def test(name) - super if name == ENV["TEST_NAME"] + super if name == ENV['TEST_NAME'] end end) end diff --git a/test/plugin/test_cache_stats.rb b/test/plugin/test_cache_stats.rb index fde5884..7875a9d 100644 --- a/test/plugin/test_cache_stats.rb +++ b/test/plugin/test_cache_stats.rb @@ -1,3 +1,5 @@ +# frozen_string_literal: true + # # Fluentd Kubernetes Metadata Filter Plugin - Enrich Fluentd events with # Kubernetes metadata @@ -19,15 +21,13 @@ require_relative '../helper' class KubernetesMetadataCacheStatsTest < Test::Unit::TestCase + test 'watch stats' do + require 'lru_redux' + stats = KubernetesMetadata::Stats.new + stats.bump(:missed) + stats.bump(:deleted) + stats.bump(:deleted) - test 'watch stats' do - require 'lru_redux' - stats = KubernetesMetadata::Stats.new - stats.bump(:missed) - stats.bump(:deleted) - stats.bump(:deleted) - - assert_equal("stats - deleted: 2, missed: 1", stats.to_s) - end - + assert_equal('stats - deleted: 2, missed: 1', stats.to_s) + end end diff --git a/test/plugin/test_cache_strategy.rb b/test/plugin/test_cache_strategy.rb index 1149844..255273f 100644 --- a/test/plugin/test_cache_strategy.rb +++ b/test/plugin/test_cache_strategy.rb @@ -1,3 +1,5 @@ +# frozen_string_literal: true + # # Fluentd Kubernetes Metadata Filter Plugin - Enrich Fluentd events with # Kubernetes metadata @@ -19,175 +21,174 @@ require_relative '../helper' class TestCacheStrategy - include KubernetesMetadata::CacheStrategy - - def initialize - @stats = KubernetesMetadata::Stats.new - @cache = LruRedux::TTL::ThreadSafeCache.new(100,3600) - @id_cache = LruRedux::TTL::ThreadSafeCache.new(100,3600) - @namespace_cache = LruRedux::TTL::ThreadSafeCache.new(100,3600) - @orphaned_namespace_name = '.orphaned' - @orphaned_namespace_id = 'orphaned' - end - - attr_accessor :stats, :cache, :id_cache, :namespace_cache, :allow_orphans - - def fetch_pod_metadata(namespace_name, pod_name) - {} - end - - def fetch_namespace_metadata(namespace_name) - {} + include KubernetesMetadata::CacheStrategy + + def initialize + @stats = KubernetesMetadata::Stats.new + @cache = LruRedux::TTL::ThreadSafeCache.new(100, 3600) + @id_cache = LruRedux::TTL::ThreadSafeCache.new(100, 3600) + @namespace_cache = LruRedux::TTL::ThreadSafeCache.new(100, 3600) + @orphaned_namespace_name = '.orphaned' + @orphaned_namespace_id = 'orphaned' + end + + attr_accessor :stats, :cache, :id_cache, :namespace_cache, :allow_orphans + + def fetch_pod_metadata(_namespace_name, _pod_name) + {} + end + + def fetch_namespace_metadata(_namespace_name) + {} + end + + def log + logger = {} + def logger.trace? + true end - def log - logger = {} - def logger.trace? 
- true - end - def logger.trace(message) - end - logger + def logger.trace(message) end - + logger + end end class KubernetesMetadataCacheStrategyTest < Test::Unit::TestCase - - def setup - @strategy = TestCacheStrategy.new - @cache_key = 'some_long_container_id' - @namespace_name = 'some_namespace_name' - @namespace_uuid = 'some_namespace_uuid' - @pod_name = 'some_pod_name' - @pod_uuid = 'some_pod_uuid' - @time = Time.now - @pod_meta = {'pod_id'=> @pod_uuid, 'labels'=> {'meta'=>'pod'}} - @namespace_meta = {'namespace_id'=> @namespace_uuid, 'creation_timestamp'=>@time.to_s} + def setup + @strategy = TestCacheStrategy.new + @cache_key = 'some_long_container_id' + @namespace_name = 'some_namespace_name' + @namespace_uuid = 'some_namespace_uuid' + @pod_name = 'some_pod_name' + @pod_uuid = 'some_pod_uuid' + @time = Time.now + @pod_meta = { 'pod_id' => @pod_uuid, 'labels' => { 'meta' => 'pod' } } + @namespace_meta = { 'namespace_id' => @namespace_uuid, 'creation_timestamp' => @time.to_s } + end + + test 'when cached metadata is found' do + exp = @pod_meta.merge(@namespace_meta) + exp.delete('creation_timestamp') + @strategy.id_cache[@cache_key] = { + pod_id: @pod_uuid, + namespace_id: @namespace_uuid + } + @strategy.cache[@pod_uuid] = @pod_meta + @strategy.namespace_cache[@namespace_uuid] = @namespace_meta + assert_equal(exp, @strategy.get_pod_metadata(@cache_key, 'namespace', 'pod', @time, {})) + end + + test 'when previously processed record for pod but metadata is not cached and can not be fetched' do + exp = { + 'pod_id' => @pod_uuid, + 'namespace_id' => @namespace_uuid + } + @strategy.id_cache[@cache_key] = { + pod_id: @pod_uuid, + namespace_id: @namespace_uuid + } + @strategy.stub :fetch_pod_metadata, {} do + @strategy.stub :fetch_namespace_metadata, nil do + assert_equal(exp, @strategy.get_pod_metadata(@cache_key, 'namespace', 'pod', @time, {})) + end end - - test 'when cached metadata is found' do - exp = @pod_meta.merge(@namespace_meta) - exp.delete('creation_timestamp') - @strategy.id_cache[@cache_key] = { - pod_id: @pod_uuid, - namespace_id: @namespace_uuid - } - @strategy.cache[@pod_uuid] = @pod_meta - @strategy.namespace_cache[@namespace_uuid] = @namespace_meta - assert_equal(exp, @strategy.get_pod_metadata(@cache_key,'namespace', 'pod', @time, {})) + end + + test 'when metadata is not cached and is fetched' do + exp = @pod_meta.merge(@namespace_meta) + exp.delete('creation_timestamp') + @strategy.stub :fetch_pod_metadata, @pod_meta do + @strategy.stub :fetch_namespace_metadata, @namespace_meta do + assert_equal(exp, @strategy.get_pod_metadata(@cache_key, 'namespace', 'pod', @time, {})) + assert_true(@strategy.id_cache.key?(@cache_key)) + end end - - test 'when previously processed record for pod but metadata is not cached and can not be fetched' do - exp = { - 'pod_id'=> @pod_uuid, - 'namespace_id'=> @namespace_uuid - } - @strategy.id_cache[@cache_key] = { - pod_id: @pod_uuid, - namespace_id: @namespace_uuid - } - @strategy.stub :fetch_pod_metadata, {} do - @strategy.stub :fetch_namespace_metadata, nil do - assert_equal(exp, @strategy.get_pod_metadata(@cache_key,'namespace', 'pod', @time, {})) - end - end + end + + test 'when metadata is not cached and pod is deleted and namespace metadata is fetched' do + # this is the case for a record from a deleted pod where no other + # records were read. 
using the container hash since that is all + # we ever will have and should allow us to process all the deleted + # pod records + exp = { + 'pod_id' => @cache_key, + 'namespace_id' => @namespace_uuid + } + @strategy.stub :fetch_pod_metadata, {} do + @strategy.stub :fetch_namespace_metadata, @namespace_meta do + assert_equal(exp, @strategy.get_pod_metadata(@cache_key, 'namespace', 'pod', @time, {})) + assert_true(@strategy.id_cache.key?(@cache_key)) + end end - - test 'when metadata is not cached and is fetched' do - exp = @pod_meta.merge(@namespace_meta) - exp.delete('creation_timestamp') - @strategy.stub :fetch_pod_metadata, @pod_meta do - @strategy.stub :fetch_namespace_metadata, @namespace_meta do - assert_equal(exp, @strategy.get_pod_metadata(@cache_key,'namespace', 'pod', @time, {})) - assert_true(@strategy.id_cache.key?(@cache_key)) - end - end + end + + test 'when metadata is not cached and pod is deleted and namespace is for a different namespace with the same name' do + # this is the case for a record from a deleted pod from a deleted namespace + # where new namespace was created with the same name + exp = { + 'namespace_id' => @namespace_uuid + } + @strategy.stub :fetch_pod_metadata, {} do + @strategy.stub :fetch_namespace_metadata, @namespace_meta do + assert_equal(exp, @strategy.get_pod_metadata(@cache_key, 'namespace', 'pod', @time - 1 * 86_400, {})) + assert_true(@strategy.id_cache.key?(@cache_key)) + end end - - test 'when metadata is not cached and pod is deleted and namespace metadata is fetched' do - # this is the case for a record from a deleted pod where no other - # records were read. using the container hash since that is all - # we ever will have and should allow us to process all the deleted - # pod records - exp = { - 'pod_id'=> @cache_key, - 'namespace_id'=> @namespace_uuid - } - @strategy.stub :fetch_pod_metadata, {} do - @strategy.stub :fetch_namespace_metadata, @namespace_meta do - assert_equal(exp, @strategy.get_pod_metadata(@cache_key,'namespace', 'pod', @time, {})) - assert_true(@strategy.id_cache.key?(@cache_key)) - end - end + end + + test 'when metadata is not cached and no metadata can be fetched and not allowing orphans' do + # we should never see this since pod meta should not be retrievable + # unless the namespace exists + @strategy.stub :fetch_pod_metadata, @pod_meta do + @strategy.stub :fetch_namespace_metadata, {} do + assert_equal({}, @strategy.get_pod_metadata(@cache_key, 'namespace', 'pod', @time - 1 * 86_400, {})) + end end - - test 'when metadata is not cached and pod is deleted and namespace is for a different namespace with the same name' do - # this is the case for a record from a deleted pod from a deleted namespace - # where new namespace was created with the same name - exp = { - 'namespace_id'=> @namespace_uuid - } - @strategy.stub :fetch_pod_metadata, {} do - @strategy.stub :fetch_namespace_metadata, @namespace_meta do - assert_equal(exp, @strategy.get_pod_metadata(@cache_key,'namespace', 'pod', @time - 1*86400, {})) - assert_true(@strategy.id_cache.key?(@cache_key)) - end - end + end + + test 'when metadata is not cached and no metadata can be fetched and allowing orphans' do + # we should never see this since pod meta should not be retrievable + # unless the namespace exists + @strategy.allow_orphans = true + exp = { + 'orphaned_namespace' => 'namespace', + 'namespace_name' => '.orphaned', + 'namespace_id' => 'orphaned' + } + @strategy.stub :fetch_pod_metadata, @pod_meta do + @strategy.stub :fetch_namespace_metadata, {} do + 
assert_equal(exp, @strategy.get_pod_metadata(@cache_key, 'namespace', 'pod', @time - 1 * 86_400, {})) + end end - - test 'when metadata is not cached and no metadata can be fetched and not allowing orphans' do - # we should never see this since pod meta should not be retrievable - # unless the namespace exists - @strategy.stub :fetch_pod_metadata, @pod_meta do - @strategy.stub :fetch_namespace_metadata, {} do - assert_equal({}, @strategy.get_pod_metadata(@cache_key,'namespace', 'pod', @time - 1*86400, {})) - end - end + end + + test 'when metadata is not cached and no metadata can be fetched and not allowing orphans for multiple records' do + # processing a batch of records with no meta. ideally we only hit the api server once + batch_miss_cache = {} + @strategy.stub :fetch_pod_metadata, {} do + @strategy.stub :fetch_namespace_metadata, {} do + assert_equal({}, @strategy.get_pod_metadata(@cache_key, 'namespace', 'pod', @time, batch_miss_cache)) + end end - - test 'when metadata is not cached and no metadata can be fetched and allowing orphans' do - # we should never see this since pod meta should not be retrievable - # unless the namespace exists - @strategy.allow_orphans = true - exp = { - 'orphaned_namespace' => 'namespace', - 'namespace_name' => '.orphaned', - 'namespace_id' => 'orphaned' - } - @strategy.stub :fetch_pod_metadata, @pod_meta do - @strategy.stub :fetch_namespace_metadata, {} do - assert_equal(exp, @strategy.get_pod_metadata(@cache_key,'namespace', 'pod', @time - 1*86400, {})) - end - end - end - - test 'when metadata is not cached and no metadata can be fetched and not allowing orphans for multiple records' do - # processing a batch of records with no meta. ideally we only hit the api server once - batch_miss_cache = {} - @strategy.stub :fetch_pod_metadata, {} do - @strategy.stub :fetch_namespace_metadata, {} do - assert_equal({}, @strategy.get_pod_metadata(@cache_key,'namespace', 'pod', @time, batch_miss_cache)) - end - end - assert_equal({}, @strategy.get_pod_metadata(@cache_key,'namespace', 'pod', @time, batch_miss_cache)) - end - - test 'when metadata is not cached and no metadata can be fetched and allowing orphans for multiple records' do - # we should never see this since pod meta should not be retrievable - # unless the namespace exists - @strategy.allow_orphans = true - exp = { - 'orphaned_namespace' => 'namespace', - 'namespace_name' => '.orphaned', - 'namespace_id' => 'orphaned' - } - batch_miss_cache = {} - @strategy.stub :fetch_pod_metadata, {} do - @strategy.stub :fetch_namespace_metadata, {} do - assert_equal(exp, @strategy.get_pod_metadata(@cache_key,'namespace', 'pod', @time, batch_miss_cache)) - end - end - assert_equal(exp, @strategy.get_pod_metadata(@cache_key,'namespace', 'pod', @time, batch_miss_cache)) + assert_equal({}, @strategy.get_pod_metadata(@cache_key, 'namespace', 'pod', @time, batch_miss_cache)) + end + + test 'when metadata is not cached and no metadata can be fetched and allowing orphans for multiple records' do + # we should never see this since pod meta should not be retrievable + # unless the namespace exists + @strategy.allow_orphans = true + exp = { + 'orphaned_namespace' => 'namespace', + 'namespace_name' => '.orphaned', + 'namespace_id' => 'orphaned' + } + batch_miss_cache = {} + @strategy.stub :fetch_pod_metadata, {} do + @strategy.stub :fetch_namespace_metadata, {} do + assert_equal(exp, @strategy.get_pod_metadata(@cache_key, 'namespace', 'pod', @time, batch_miss_cache)) + end end + assert_equal(exp, 
@strategy.get_pod_metadata(@cache_key, 'namespace', 'pod', @time, batch_miss_cache)) + end end diff --git a/test/plugin/test_filter_kubernetes_metadata.rb b/test/plugin/test_filter_kubernetes_metadata.rb index d900856..3067353 100644 --- a/test/plugin/test_filter_kubernetes_metadata.rb +++ b/test/plugin/test_filter_kubernetes_metadata.rb @@ -1,3 +1,5 @@ +# frozen_string_literal: true + # # Fluentd Kubernetes Metadata Filter Plugin - Enrich Fluentd events with # Kubernetes metadata @@ -33,7 +35,6 @@ def create_driver(conf = '') end sub_test_case 'configure' do - test 'check default' do d = create_driver assert_equal(1000, d.instance.cache_size) @@ -77,85 +78,89 @@ def create_driver(conf = '') test 'service account credentials' do VCR.use_cassette('valid_kubernetes_api_server') do - begin - ENV['KUBERNETES_SERVICE_HOST'] = 'localhost' - ENV['KUBERNETES_SERVICE_PORT'] = '8443' + ENV['KUBERNETES_SERVICE_HOST'] = 'localhost' + ENV['KUBERNETES_SERVICE_PORT'] = '8443' - Dir.mktmpdir { |dir| - # Fake token file and CA crt. - expected_cert_path = File.join(dir, Plugin::KubernetesMetadataFilter::K8_POD_CA_CERT) - expected_token_path = File.join(dir, Plugin::KubernetesMetadataFilter::K8_POD_TOKEN) + Dir.mktmpdir do |dir| + # Fake token file and CA crt. + expected_cert_path = File.join(dir, Plugin::KubernetesMetadataFilter::K8_POD_CA_CERT) + expected_token_path = File.join(dir, Plugin::KubernetesMetadataFilter::K8_POD_TOKEN) - File.open(expected_cert_path, "w") {} - File.open(expected_token_path, "w") {} + File.open(expected_cert_path, 'w') + File.open(expected_token_path, 'w') - d = create_driver(" + d = create_driver(" watch false secret_dir #{dir} ") - assert_equal(d.instance.kubernetes_url, "https://localhost:8443/api") - assert_equal(d.instance.ca_file, expected_cert_path) - assert_equal(d.instance.bearer_token_file, expected_token_path) - } - ensure - ENV['KUBERNETES_SERVICE_HOST'] = nil - ENV['KUBERNETES_SERVICE_PORT'] = nil + assert_equal(d.instance.kubernetes_url, 'https://localhost:8443/api') + assert_equal(d.instance.ca_file, expected_cert_path) + assert_equal(d.instance.bearer_token_file, expected_token_path) end + ensure + ENV['KUBERNETES_SERVICE_HOST'] = nil + ENV['KUBERNETES_SERVICE_PORT'] = nil end end test 'service account credential files are tested for existence' do VCR.use_cassette('valid_kubernetes_api_server') do - begin - ENV['KUBERNETES_SERVICE_HOST'] = 'localhost' - ENV['KUBERNETES_SERVICE_PORT'] = '8443' + ENV['KUBERNETES_SERVICE_HOST'] = 'localhost' + ENV['KUBERNETES_SERVICE_PORT'] = '8443' - Dir.mktmpdir { |dir| - d = create_driver(" + Dir.mktmpdir do |dir| + d = create_driver(" watch false secret_dir #{dir} ") - assert_equal(d.instance.kubernetes_url, "https://localhost:8443/api") - assert_nil(d.instance.ca_file, nil) - assert_nil(d.instance.bearer_token_file) - } - ensure - ENV['KUBERNETES_SERVICE_HOST'] = nil - ENV['KUBERNETES_SERVICE_PORT'] = nil + assert_equal(d.instance.kubernetes_url, 'https://localhost:8443/api') + assert_nil(d.instance.ca_file, nil) + assert_nil(d.instance.bearer_token_file) end + ensure + ENV['KUBERNETES_SERVICE_HOST'] = nil + ENV['KUBERNETES_SERVICE_PORT'] = nil end end end sub_test_case 'filter' do - - def emit(msg={}, config=' + def emit(msg = {}, config = ' kubernetes_url https://localhost:8443 watch false cache_size 1 ', d: nil) d = create_driver(config) if d.nil? 
- d.run(default_tag: DEFAULT_TAG) { + d.run(default_tag: DEFAULT_TAG) do d.feed(@time, msg) - } - d.filtered.map{|e| e.last} + end + d.filtered.map(&:last) end - def emit_with_tag(tag, msg={}, config=' + def emit_with_tag(tag, msg = {}, config = ' kubernetes_url https://localhost:8443 watch false cache_size 1 ') d = create_driver(config) - d.run(default_tag: tag) { - d.feed(msg) - } - d.filtered_records + d.run(default_tag: tag) do + d.feed(@time, msg) + end + d.filtered.map(&:last) + end + + test 'nil event stream' do + # not certain how this is possible but adding test to properly + # guard against this condition we have seen - test for nil, + # empty, no empty method, not an event stream + plugin = create_driver.instance + plugin.filter_stream('tag', nil) + plugin.filter_stream('tag', Fluent::MultiEventStream.new) end test 'inability to connect to the api server handles exception and doensnt block pipeline' do - VCR.use_cassettes([{name: 'valid_kubernetes_api_server'}, {name: 'kubernetes_get_api_v1'}]) do + VCR.use_cassettes([{ name: 'valid_kubernetes_api_server' }, { name: 'kubernetes_get_api_v1' }]) do driver = create_driver(' kubernetes_url https://localhost:8443 watch false @@ -163,18 +168,18 @@ def emit_with_tag(tag, msg={}, config=' ') stub_request(:any, 'https://localhost:8443/api/v1/namespaces/default/pods/fabric8-console-controller-98rqc').to_raise(SocketError.new('error from pod fetch')) stub_request(:any, 'https://localhost:8443/api/v1/namespaces/default').to_raise(SocketError.new('socket error from namespace fetch')) - filtered = emit({'time'=>'2015-05-08T09:22:01Z'}, '', :d => driver) + filtered = emit({ 'time' => '2015-05-08T09:22:01Z' }, '', d: driver) expected_kube_metadata = { - 'time'=>'2015-05-08T09:22:01Z', + 'time' => '2015-05-08T09:22:01Z', 'docker' => { - 'container_id' => '49095a2894da899d3b327c5fde1e056a81376cc9a8f8b09a195f2a92bceed459' + 'container_id' => '49095a2894da899d3b327c5fde1e056a81376cc9a8f8b09a195f2a92bceed459' }, 'kubernetes' => { - 'pod_name' => 'fabric8-console-controller-98rqc', + 'pod_name' => 'fabric8-console-controller-98rqc', 'container_name' => 'fabric8-console-container', - "namespace_id"=>"orphaned", + 'namespace_id' => 'orphaned', 'namespace_name' => '.orphaned', - "orphaned_namespace"=>"default" + 'orphaned_namespace' => 'default' } } assert_equal(expected_kube_metadata, filtered[0]) @@ -182,7 +187,7 @@ def emit_with_tag(tag, msg={}, config=' end test 'with docker & kubernetes metadata where id cache hit and metadata miss' do - VCR.use_cassettes([{name: 'valid_kubernetes_api_server'}, {name: 'kubernetes_get_api_v1'}]) do + VCR.use_cassettes([{ name: 'valid_kubernetes_api_server' }, { name: 'kubernetes_get_api_v1' }]) do driver = create_driver(' kubernetes_url https://localhost:8443 watch false @@ -190,23 +195,23 @@ def emit_with_tag(tag, msg={}, config=' ') cache = driver.instance.instance_variable_get(:@id_cache) cache['49095a2894da899d3b327c5fde1e056a81376cc9a8f8b09a195f2a92bceed459'] = { - :pod_id =>'c76927af-f563-11e4-b32d-54ee7527188d', - :namespace_id =>'898268c8-4a36-11e5-9d81-42010af0194c' + pod_id: 'c76927af-f563-11e4-b32d-54ee7527188d', + namespace_id: '898268c8-4a36-11e5-9d81-42010af0194c' } stub_request(:any, 'https://localhost:8443/api/v1/namespaces/default/pods/fabric8-console-controller-98rqc').to_timeout stub_request(:any, 'https://localhost:8443/api/v1/namespaces/default').to_timeout - filtered = emit({'time'=>'2015-05-08T09:22:01Z'}, '', d:driver) + filtered = emit({ 'time' => '2015-05-08T09:22:01Z' }, '', d: driver) 
expected_kube_metadata = { - 'time'=>'2015-05-08T09:22:01Z', + 'time' => '2015-05-08T09:22:01Z', 'docker' => { - 'container_id' => '49095a2894da899d3b327c5fde1e056a81376cc9a8f8b09a195f2a92bceed459' + 'container_id' => '49095a2894da899d3b327c5fde1e056a81376cc9a8f8b09a195f2a92bceed459' }, 'kubernetes' => { - 'pod_name' => 'fabric8-console-controller-98rqc', - 'container_name' => 'fabric8-console-container', - 'namespace_name' => 'default', - 'namespace_id' => '898268c8-4a36-11e5-9d81-42010af0194c', - 'pod_id' => 'c76927af-f563-11e4-b32d-54ee7527188d', + 'pod_name' => 'fabric8-console-controller-98rqc', + 'container_name' => 'fabric8-console-container', + 'namespace_name' => 'default', + 'namespace_id' => '898268c8-4a36-11e5-9d81-42010af0194c', + 'pod_id' => 'c76927af-f563-11e4-b32d-54ee7527188d' } } @@ -215,7 +220,7 @@ def emit_with_tag(tag, msg={}, config=' end test 'with docker & kubernetes metadata where id cache hit and metadata is reloaded' do - VCR.use_cassettes([{name: 'valid_kubernetes_api_server'}, {name: 'kubernetes_get_api_v1'}, {name: 'kubernetes_get_pod'}, {name: 'kubernetes_get_namespace_default'}]) do + VCR.use_cassettes([{ name: 'valid_kubernetes_api_server' }, { name: 'kubernetes_get_api_v1' }, { name: 'kubernetes_get_pod' }, { name: 'kubernetes_get_namespace_default' }]) do driver = create_driver(' kubernetes_url https://localhost:8443 watch false @@ -223,25 +228,25 @@ def emit_with_tag(tag, msg={}, config=' ') cache = driver.instance.instance_variable_get(:@id_cache) cache['49095a2894da899d3b327c5fde1e056a81376cc9a8f8b09a195f2a92bceed459'] = { - :pod_id =>'c76927af-f563-11e4-b32d-54ee7527188d', - :namespace_id =>'898268c8-4a36-11e5-9d81-42010af0194c' + pod_id: 'c76927af-f563-11e4-b32d-54ee7527188d', + namespace_id: '898268c8-4a36-11e5-9d81-42010af0194c' } - filtered = emit({'time'=>'2015-05-08T09:22:01Z'}, '', d:driver) + filtered = emit({ 'time' => '2015-05-08T09:22:01Z' }, '', d: driver) expected_kube_metadata = { - 'time'=>'2015-05-08T09:22:01Z', + 'time' => '2015-05-08T09:22:01Z', 'docker' => { - 'container_id' => '49095a2894da899d3b327c5fde1e056a81376cc9a8f8b09a195f2a92bceed459' + 'container_id' => '49095a2894da899d3b327c5fde1e056a81376cc9a8f8b09a195f2a92bceed459' }, 'kubernetes' => { - 'host' => 'jimmi-redhat.localnet', - 'pod_name' => 'fabric8-console-controller-98rqc', - 'container_name' => 'fabric8-console-container', - 'container_image' => 'fabric8/hawtio-kubernetes:latest', + 'host' => 'jimmi-redhat.localnet', + 'pod_name' => 'fabric8-console-controller-98rqc', + 'container_name' => 'fabric8-console-container', + 'container_image' => 'fabric8/hawtio-kubernetes:latest', 'container_image_id' => 'docker://b2bd1a24a68356b2f30128e6e28e672c1ef92df0d9ec01ec0c7faea5d77d2303', - 'namespace_name' => 'default', - 'namespace_id' => '898268c8-4a36-11e5-9d81-42010af0194c', - 'pod_id' => 'c76927af-f563-11e4-b32d-54ee7527188d', - 'master_url' => 'https://localhost:8443', + 'namespace_name' => 'default', + 'namespace_id' => '898268c8-4a36-11e5-9d81-42010af0194c', + 'pod_id' => 'c76927af-f563-11e4-b32d-54ee7527188d', + 'master_url' => 'https://localhost:8443', 'labels' => { 'component' => 'fabric8Console' } @@ -253,23 +258,23 @@ def emit_with_tag(tag, msg={}, config=' end test 'with docker & kubernetes metadata' do - VCR.use_cassettes([{name: 'valid_kubernetes_api_server'}, {name: 'kubernetes_get_api_v1'}, {name: 'kubernetes_get_pod'}, {name: 'kubernetes_get_namespace_default'}]) do - filtered = emit({'time'=>'2015-05-08T09:22:01Z'}) + VCR.use_cassettes([{ name: 
'valid_kubernetes_api_server' }, { name: 'kubernetes_get_api_v1' }, { name: 'kubernetes_get_pod' }, { name: 'kubernetes_get_namespace_default' }]) do + filtered = emit({ 'time' => '2015-05-08T09:22:01Z' }) expected_kube_metadata = { - 'time'=>'2015-05-08T09:22:01Z', + 'time' => '2015-05-08T09:22:01Z', 'docker' => { - 'container_id' => '49095a2894da899d3b327c5fde1e056a81376cc9a8f8b09a195f2a92bceed459' + 'container_id' => '49095a2894da899d3b327c5fde1e056a81376cc9a8f8b09a195f2a92bceed459' }, 'kubernetes' => { - 'host' => 'jimmi-redhat.localnet', - 'pod_name' => 'fabric8-console-controller-98rqc', - 'container_name' => 'fabric8-console-container', - 'container_image' => 'fabric8/hawtio-kubernetes:latest', + 'host' => 'jimmi-redhat.localnet', + 'pod_name' => 'fabric8-console-controller-98rqc', + 'container_name' => 'fabric8-console-container', + 'container_image' => 'fabric8/hawtio-kubernetes:latest', 'container_image_id' => 'docker://b2bd1a24a68356b2f30128e6e28e672c1ef92df0d9ec01ec0c7faea5d77d2303', - 'namespace_name' => 'default', - 'namespace_id' => '898268c8-4a36-11e5-9d81-42010af0194c', - 'pod_id' => 'c76927af-f563-11e4-b32d-54ee7527188d', - 'master_url' => 'https://localhost:8443', + 'namespace_name' => 'default', + 'namespace_id' => '898268c8-4a36-11e5-9d81-42010af0194c', + 'pod_id' => 'c76927af-f563-11e4-b32d-54ee7527188d', + 'master_url' => 'https://localhost:8443', 'labels' => { 'component' => 'fabric8Console' } @@ -281,8 +286,8 @@ def emit_with_tag(tag, msg={}, config=' end test 'with docker & kubernetes metadata & namespace_id enabled' do - VCR.use_cassettes([{name: 'valid_kubernetes_api_server'}, {name: 'kubernetes_get_api_v1'}, {name: 'kubernetes_get_pod'}, - {name: 'kubernetes_get_namespace_default', options: {allow_playback_repeats: true}}]) do + VCR.use_cassettes([{ name: 'valid_kubernetes_api_server' }, { name: 'kubernetes_get_api_v1' }, { name: 'kubernetes_get_pod' }, + { name: 'kubernetes_get_namespace_default', options: { allow_playback_repeats: true } }]) do filtered = emit({}, ' kubernetes_url https://localhost:8443 watch false @@ -290,18 +295,18 @@ def emit_with_tag(tag, msg={}, config=' ') expected_kube_metadata = { 'docker' => { - 'container_id' => '49095a2894da899d3b327c5fde1e056a81376cc9a8f8b09a195f2a92bceed459' + 'container_id' => '49095a2894da899d3b327c5fde1e056a81376cc9a8f8b09a195f2a92bceed459' }, 'kubernetes' => { - 'host' => 'jimmi-redhat.localnet', - 'pod_name' => 'fabric8-console-controller-98rqc', - 'container_name' => 'fabric8-console-container', - 'container_image' => 'fabric8/hawtio-kubernetes:latest', + 'host' => 'jimmi-redhat.localnet', + 'pod_name' => 'fabric8-console-controller-98rqc', + 'container_name' => 'fabric8-console-container', + 'container_image' => 'fabric8/hawtio-kubernetes:latest', 'container_image_id' => 'docker://b2bd1a24a68356b2f30128e6e28e672c1ef92df0d9ec01ec0c7faea5d77d2303', - 'namespace_name' => 'default', - 'namespace_id' => '898268c8-4a36-11e5-9d81-42010af0194c', - 'pod_id' => 'c76927af-f563-11e4-b32d-54ee7527188d', - 'master_url' => 'https://localhost:8443', + 'namespace_name' => 'default', + 'namespace_id' => '898268c8-4a36-11e5-9d81-42010af0194c', + 'pod_id' => 'c76927af-f563-11e4-b32d-54ee7527188d', + 'master_url' => 'https://localhost:8443', 'labels' => { 'component' => 'fabric8Console' } @@ -312,8 +317,8 @@ def emit_with_tag(tag, msg={}, config=' end test 'with docker & kubernetes metadata using bearer token' do - VCR.use_cassettes([{name: 'valid_kubernetes_api_server_using_token'}, {name: 
'kubernetes_get_api_v1_using_token'}, - {name: 'kubernetes_get_pod_using_token'}, {name: 'kubernetes_get_namespace_default_using_token'}]) do + VCR.use_cassettes([{ name: 'valid_kubernetes_api_server_using_token' }, { name: 'kubernetes_get_api_v1_using_token' }, + { name: 'kubernetes_get_pod_using_token' }, { name: 'kubernetes_get_namespace_default_using_token' }]) do filtered = emit({}, ' kubernetes_url https://localhost:8443 verify_ssl false @@ -325,15 +330,15 @@ def emit_with_tag(tag, msg={}, config=' 'container_id' => '49095a2894da899d3b327c5fde1e056a81376cc9a8f8b09a195f2a92bceed459' }, 'kubernetes' => { - 'host' => 'jimmi-redhat.localnet', - 'pod_name' => 'fabric8-console-controller-98rqc', - 'container_name' => 'fabric8-console-container', - 'container_image' => 'fabric8/hawtio-kubernetes:latest', + 'host' => 'jimmi-redhat.localnet', + 'pod_name' => 'fabric8-console-controller-98rqc', + 'container_name' => 'fabric8-console-container', + 'container_image' => 'fabric8/hawtio-kubernetes:latest', 'container_image_id' => 'docker://b2bd1a24a68356b2f30128e6e28e672c1ef92df0d9ec01ec0c7faea5d77d2303', - 'namespace_name' => 'default', - 'namespace_id' => '898268c8-4a36-11e5-9d81-42010af0194c', - 'pod_id' => 'c76927af-f563-11e4-b32d-54ee7527188d', - 'master_url' => 'https://localhost:8443', + 'namespace_name' => 'default', + 'namespace_id' => '898268c8-4a36-11e5-9d81-42010af0194c', + 'pod_id' => 'c76927af-f563-11e4-b32d-54ee7527188d', + 'master_url' => 'https://localhost:8443', 'labels' => { 'component' => 'fabric8Console' } @@ -346,14 +351,14 @@ def emit_with_tag(tag, msg={}, config=' test 'with docker & kubernetes metadata but no configured api server' do filtered = emit({}, '') expected_kube_metadata = { - 'docker' => { - 'container_id' => '49095a2894da899d3b327c5fde1e056a81376cc9a8f8b09a195f2a92bceed459' - }, - 'kubernetes' => { - 'pod_name' => 'fabric8-console-controller-98rqc', - 'container_name' => 'fabric8-console-container', - 'namespace_name' => 'default', - } + 'docker' => { + 'container_id' => '49095a2894da899d3b327c5fde1e056a81376cc9a8f8b09a195f2a92bceed459' + }, + 'kubernetes' => { + 'pod_name' => 'fabric8-console-controller-98rqc', + 'container_name' => 'fabric8-console-container', + 'namespace_name' => 'default' + } } assert_equal(expected_kube_metadata, filtered[0]) end @@ -366,17 +371,17 @@ def emit_with_tag(tag, msg={}, config=' ) stub_request(:any, 'https://localhost:8443/api/v1/namespaces/default/pods/fabric8-console-controller-98rqc').to_timeout stub_request(:any, 'https://localhost:8443/api/v1/namespaces/default').to_timeout - filtered = emit() + filtered = emit expected_kube_metadata = { 'docker' => { 'container_id' => '49095a2894da899d3b327c5fde1e056a81376cc9a8f8b09a195f2a92bceed459' }, 'kubernetes' => { - 'pod_name' => 'fabric8-console-controller-98rqc', - 'container_name' => 'fabric8-console-container', - 'namespace_name' => '.orphaned', + 'pod_name' => 'fabric8-console-controller-98rqc', + 'container_name' => 'fabric8-console-container', + 'namespace_name' => '.orphaned', 'orphaned_namespace' => 'default', - 'namespace_id' => 'orphaned' + 'namespace_id' => 'orphaned' } } assert_equal(expected_kube_metadata, filtered[0]) @@ -395,9 +400,9 @@ def emit_with_tag(tag, msg={}, config=' 'container_id' => '49095a2894da899d3b327c5fde1e056a81376cc9a8f8b09a195f2a92bceed459' }, 'kubernetes' => { - 'pod_name' => 'fabric8-console-controller.98rqc', - 'container_name' => 'fabric8-console-container', - 'namespace_name' => 'default' + 'pod_name' => 'fabric8-console-controller.98rqc', + 
'container_name' => 'fabric8-console-container', + 'namespace_name' => 'default' } } assert_equal(expected_kube_metadata, filtered[0]) @@ -405,21 +410,21 @@ def emit_with_tag(tag, msg={}, config=' test 'with docker metadata, non-kubernetes' do filtered = emit_with_tag('non-kubernetes', {}, '') - assert_false(filtered[0].has_key?(:kubernetes)) + assert_false(filtered[0].key?(:kubernetes)) end test 'ignores invalid json in log field' do json_log = "{'foo':123}" msg = { - 'log' => json_log + 'log' => json_log } filtered = emit_with_tag('non-kubernetes', msg, '') assert_equal(msg, filtered[0]) end test 'with kubernetes dotted labels, de_dot enabled' do - VCR.use_cassettes([{name: 'valid_kubernetes_api_server'}, {name: 'kubernetes_get_api_v1'}, - {name: 'kubernetes_docker_metadata_dotted_labels'}]) do + VCR.use_cassettes([{ name: 'valid_kubernetes_api_server' }, { name: 'kubernetes_get_api_v1' }, + { name: 'kubernetes_docker_metadata_dotted_labels' }]) do filtered = emit({}, ' kubernetes_url https://localhost:8443 watch false @@ -427,21 +432,21 @@ def emit_with_tag(tag, msg={}, config=' ') expected_kube_metadata = { 'docker' => { - 'container_id' => '49095a2894da899d3b327c5fde1e056a81376cc9a8f8b09a195f2a92bceed459' + 'container_id' => '49095a2894da899d3b327c5fde1e056a81376cc9a8f8b09a195f2a92bceed459' }, 'kubernetes' => { - 'host' => 'jimmi-redhat.localnet', - 'pod_name' => 'fabric8-console-controller-98rqc', - 'container_name' => 'fabric8-console-container', - 'container_image' => 'fabric8/hawtio-kubernetes:latest', + 'host' => 'jimmi-redhat.localnet', + 'pod_name' => 'fabric8-console-controller-98rqc', + 'container_name' => 'fabric8-console-container', + 'container_image' => 'fabric8/hawtio-kubernetes:latest', 'container_image_id' => 'docker://b2bd1a24a68356b2f30128e6e28e672c1ef92df0d9ec01ec0c7faea5d77d2303', - 'namespace_id' => '898268c8-4a36-11e5-9d81-42010af0194c', - 'namespace_labels' => { + 'namespace_id' => '898268c8-4a36-11e5-9d81-42010af0194c', + 'namespace_labels' => { 'kubernetes_io/namespacetest' => 'somevalue' }, - 'namespace_name' => 'default', - 'pod_id' => 'c76927af-f563-11e4-b32d-54ee7527188d', - 'master_url' => 'https://localhost:8443', + 'namespace_name' => 'default', + 'pod_id' => 'c76927af-f563-11e4-b32d-54ee7527188d', + 'master_url' => 'https://localhost:8443', 'labels' => { 'kubernetes_io/test' => 'somevalue' } @@ -452,8 +457,8 @@ def emit_with_tag(tag, msg={}, config=' end test 'with kubernetes dotted labels, de_dot disabled' do - VCR.use_cassettes([{name: 'valid_kubernetes_api_server'}, {name: 'kubernetes_get_api_v1'}, - {name: 'kubernetes_docker_metadata_dotted_labels'}]) do + VCR.use_cassettes([{ name: 'valid_kubernetes_api_server' }, { name: 'kubernetes_get_api_v1' }, + { name: 'kubernetes_docker_metadata_dotted_labels' }]) do filtered = emit({}, ' kubernetes_url https://localhost:8443 watch false @@ -462,21 +467,21 @@ def emit_with_tag(tag, msg={}, config=' ') expected_kube_metadata = { 'docker' => { - 'container_id' => '49095a2894da899d3b327c5fde1e056a81376cc9a8f8b09a195f2a92bceed459' + 'container_id' => '49095a2894da899d3b327c5fde1e056a81376cc9a8f8b09a195f2a92bceed459' }, 'kubernetes' => { - 'host' => 'jimmi-redhat.localnet', - 'pod_name' => 'fabric8-console-controller-98rqc', - 'container_name' => 'fabric8-console-container', - 'container_image' => 'fabric8/hawtio-kubernetes:latest', + 'host' => 'jimmi-redhat.localnet', + 'pod_name' => 'fabric8-console-controller-98rqc', + 'container_name' => 'fabric8-console-container', + 'container_image' => 
'fabric8/hawtio-kubernetes:latest', 'container_image_id' => 'docker://b2bd1a24a68356b2f30128e6e28e672c1ef92df0d9ec01ec0c7faea5d77d2303', - 'namespace_id' => '898268c8-4a36-11e5-9d81-42010af0194c', - 'namespace_labels' => { + 'namespace_id' => '898268c8-4a36-11e5-9d81-42010af0194c', + 'namespace_labels' => { 'kubernetes.io/namespacetest' => 'somevalue' }, - 'namespace_name' => 'default', - 'pod_id' => 'c76927af-f563-11e4-b32d-54ee7527188d', - 'master_url' => 'https://localhost:8443', + 'namespace_name' => 'default', + 'pod_id' => 'c76927af-f563-11e4-b32d-54ee7527188d', + 'master_url' => 'https://localhost:8443', 'labels' => { 'kubernetes.io/test' => 'somevalue' } @@ -502,8 +507,8 @@ def emit_with_tag(tag, msg={}, config=' 'CONTAINER_ID_FULL' => '49095a2894da899d3b327c5fde1e056a81376cc9a8f8b09a195f2a92bceed459', 'randomfield' => 'randomvalue' } - VCR.use_cassettes([{name: 'valid_kubernetes_api_server'}, {name: 'kubernetes_get_api_v1'}, {name: 'kubernetes_get_pod'}, - {name: 'kubernetes_get_namespace_default'}]) do + VCR.use_cassettes([{ name: 'valid_kubernetes_api_server' }, { name: 'kubernetes_get_api_v1' }, { name: 'kubernetes_get_pod' }, + { name: 'kubernetes_get_namespace_default' }]) do filtered = emit_with_tag(tag, msg, ' kubernetes_url https://localhost:8443 watch false @@ -512,18 +517,18 @@ def emit_with_tag(tag, msg={}, config=' ') expected_kube_metadata = { 'docker' => { - 'container_id' => '49095a2894da899d3b327c5fde1e056a81376cc9a8f8b09a195f2a92bceed459' + 'container_id' => '49095a2894da899d3b327c5fde1e056a81376cc9a8f8b09a195f2a92bceed459' }, 'kubernetes' => { - 'host' => 'jimmi-redhat.localnet', - 'pod_name' => 'fabric8-console-controller-98rqc', - 'container_name' => 'fabric8-console-container', - 'container_image' => 'fabric8/hawtio-kubernetes:latest', + 'host' => 'jimmi-redhat.localnet', + 'pod_name' => 'fabric8-console-controller-98rqc', + 'container_name' => 'fabric8-console-container', + 'container_image' => 'fabric8/hawtio-kubernetes:latest', 'container_image_id' => 'docker://b2bd1a24a68356b2f30128e6e28e672c1ef92df0d9ec01ec0c7faea5d77d2303', - 'namespace_name' => 'default', - 'namespace_id' => '898268c8-4a36-11e5-9d81-42010af0194c', - 'pod_id' => 'c76927af-f563-11e4-b32d-54ee7527188d', - 'master_url' => 'https://localhost:8443', + 'namespace_name' => 'default', + 'namespace_id' => '898268c8-4a36-11e5-9d81-42010af0194c', + 'pod_id' => 'c76927af-f563-11e4-b32d-54ee7527188d', + 'master_url' => 'https://localhost:8443', 'labels' => { 'component' => 'fabric8Console' } @@ -541,8 +546,8 @@ def emit_with_tag(tag, msg={}, config=' 'CONTAINER_ID_FULL' => '49095a2894da899d3b327c5fde1e056a81376cc9a8f8b09a195f2a92bceed459', 'randomfield' => 'randomvalue' } - VCR.use_cassettes([{name: 'valid_kubernetes_api_server'}, {name: 'kubernetes_get_api_v1'}, {name: 'kubernetes_get_pod'}, - {name: 'kubernetes_get_namespace_default', options: {allow_playback_repeats: true}}]) do + VCR.use_cassettes([{ name: 'valid_kubernetes_api_server' }, { name: 'kubernetes_get_api_v1' }, { name: 'kubernetes_get_pod' }, + { name: 'kubernetes_get_namespace_default', options: { allow_playback_repeats: true } }]) do filtered = emit_with_tag(tag, msg, ' kubernetes_url https://localhost:8443 watch false @@ -551,18 +556,18 @@ def emit_with_tag(tag, msg={}, config=' ') expected_kube_metadata = { 'docker' => { - 'container_id' => '49095a2894da899d3b327c5fde1e056a81376cc9a8f8b09a195f2a92bceed459' + 'container_id' => '49095a2894da899d3b327c5fde1e056a81376cc9a8f8b09a195f2a92bceed459' }, 'kubernetes' => { - 'host' => 
'jimmi-redhat.localnet', - 'pod_name' => 'fabric8-console-controller-98rqc', - 'container_name' => 'fabric8-console-container', - 'container_image' => 'fabric8/hawtio-kubernetes:latest', + 'host' => 'jimmi-redhat.localnet', + 'pod_name' => 'fabric8-console-controller-98rqc', + 'container_name' => 'fabric8-console-container', + 'container_image' => 'fabric8/hawtio-kubernetes:latest', 'container_image_id' => 'docker://b2bd1a24a68356b2f30128e6e28e672c1ef92df0d9ec01ec0c7faea5d77d2303', - 'namespace_name' => 'default', - 'namespace_id' => '898268c8-4a36-11e5-9d81-42010af0194c', - 'pod_id' => 'c76927af-f563-11e4-b32d-54ee7527188d', - 'master_url' => 'https://localhost:8443', + 'namespace_name' => 'default', + 'namespace_id' => '898268c8-4a36-11e5-9d81-42010af0194c', + 'pod_id' => 'c76927af-f563-11e4-b32d-54ee7527188d', + 'master_url' => 'https://localhost:8443', 'labels' => { 'component' => 'fabric8Console' } @@ -583,12 +588,12 @@ def emit_with_tag(tag, msg={}, config=' 'pod_name' => 'k8s-pod-name', 'container_name' => 'k8s-container-name' }, - 'docker' => {'container_id' => 'e463bc0d3ae38f5c89d92dca49b30e049e899799920b79d4d5f705acbe82ba95'}, + 'docker' => { 'container_id' => 'e463bc0d3ae38f5c89d92dca49b30e049e899799920b79d4d5f705acbe82ba95' }, 'randomfield' => 'randomvalue' } - VCR.use_cassettes([{name: 'valid_kubernetes_api_server'}, {name: 'kubernetes_get_api_v1'}, {name: 'kubernetes_get_pod'}, - {name: 'kubernetes_get_namespace_default'}, - {name: 'metadata_from_tag_journald_and_kubernetes_fields'}]) do + VCR.use_cassettes([{ name: 'valid_kubernetes_api_server' }, { name: 'kubernetes_get_api_v1' }, { name: 'kubernetes_get_pod' }, + { name: 'kubernetes_get_namespace_default' }, + { name: 'metadata_from_tag_journald_and_kubernetes_fields' }]) do es = emit_with_tag(tag, msg, ' kubernetes_url https://localhost:8443 watch false @@ -596,23 +601,23 @@ def emit_with_tag(tag, msg={}, config=' ') expected_kube_metadata = { 'docker' => { - 'container_id' => 'e463bc0d3ae38f5c89d92dca49b30e049e899799920b79d4d5f705acbe82ba95' + 'container_id' => 'e463bc0d3ae38f5c89d92dca49b30e049e899799920b79d4d5f705acbe82ba95' }, 'kubernetes' => { - 'host' => 'jimmi-redhat.localnet', - 'pod_name' => 'k8s-pod-name', - 'container_name' => 'k8s-container-name', - 'container_image' => 'k8s-container-image:latest', + 'host' => 'jimmi-redhat.localnet', + 'pod_name' => 'k8s-pod-name', + 'container_name' => 'k8s-container-name', + 'container_image' => 'k8s-container-image:latest', 'container_image_id' => 'docker://d78c5217c41e9af08d37d9ae2cb070afa1fe3da6bc77bfb18879a8b4bfdf8a34', - 'namespace_name' => 'k8s-namespace-name', - 'namespace_id' => '8e0dc8fc-59f2-49f7-a3e2-ed0913e19d9f', - 'pod_id' => 'ebabf749-5fcd-4750-a3f0-aedd89476da8', - 'master_url' => 'https://localhost:8443', + 'namespace_name' => 'k8s-namespace-name', + 'namespace_id' => '8e0dc8fc-59f2-49f7-a3e2-ed0913e19d9f', + 'pod_id' => 'ebabf749-5fcd-4750-a3f0-aedd89476da8', + 'master_url' => 'https://localhost:8443', 'labels' => { 'component' => 'k8s-test' } } - }.merge(msg) {|key,oldval,newval| ((key == 'kubernetes') || (key == 'docker')) ? oldval : newval} + }.merge(msg) { |key, oldval, newval| (key == 'kubernetes') || (key == 'docker') ? 
oldval : newval } assert_equal(expected_kube_metadata, es[0]) end end @@ -628,12 +633,12 @@ def emit_with_tag(tag, msg={}, config=' 'pod_name' => 'k8s-pod-name', 'container_name' => 'k8s-container-name' }, - 'docker' => {'container_id' => 'e463bc0d3ae38f5c89d92dca49b30e049e899799920b79d4d5f705acbe82ba95'}, + 'docker' => { 'container_id' => 'e463bc0d3ae38f5c89d92dca49b30e049e899799920b79d4d5f705acbe82ba95' }, 'randomfield' => 'randomvalue' } - VCR.use_cassettes([{name: 'valid_kubernetes_api_server'}, {name: 'kubernetes_get_api_v1'}, {name: 'kubernetes_get_pod'}, - {name: 'kubernetes_get_namespace_default', options: {allow_playback_repeats: true}}, - {name: 'metadata_from_tag_and_journald_fields'}]) do + VCR.use_cassettes([{ name: 'valid_kubernetes_api_server' }, { name: 'kubernetes_get_api_v1' }, { name: 'kubernetes_get_pod' }, + { name: 'kubernetes_get_namespace_default', options: { allow_playback_repeats: true } }, + { name: 'metadata_from_tag_and_journald_fields' }]) do es = emit_with_tag(tag, msg, ' kubernetes_url https://localhost:8443 watch false @@ -642,23 +647,23 @@ def emit_with_tag(tag, msg={}, config=' ') expected_kube_metadata = { 'docker' => { - 'container_id' => '838350c64bacba968d39a30a50789b2795291fceca6ccbff55298671d46b0e3b' + 'container_id' => '838350c64bacba968d39a30a50789b2795291fceca6ccbff55298671d46b0e3b' }, 'kubernetes' => { - 'host' => 'jimmi-redhat.localnet', - 'pod_name' => 'journald-pod-name', - 'container_name' => 'journald-container-name', - 'container_image' => 'journald-container-image:latest', + 'host' => 'jimmi-redhat.localnet', + 'pod_name' => 'journald-pod-name', + 'container_name' => 'journald-container-name', + 'container_image' => 'journald-container-image:latest', 'container_image_id' => 'docker://dda4c95705fb7050b701b10a7fe928ca5bc971a1dcb521ae3c339194cbf36b47', - 'namespace_name' => 'journald-namespace-name', - 'namespace_id' => '8282888f-733f-4f23-a3d3-1fdfa3bdacf2', - 'pod_id' => '5e1c1e27-b637-4e81-80b6-6d8a8c404d3b', - 'master_url' => 'https://localhost:8443', + 'namespace_name' => 'journald-namespace-name', + 'namespace_id' => '8282888f-733f-4f23-a3d3-1fdfa3bdacf2', + 'pod_id' => '5e1c1e27-b637-4e81-80b6-6d8a8c404d3b', + 'master_url' => 'https://localhost:8443', 'labels' => { 'component' => 'journald-test' } } - }.merge(msg) {|key,oldval,newval| ((key == 'kubernetes') || (key == 'docker')) ? oldval : newval} + }.merge(msg) { |key, oldval, newval| (key == 'kubernetes') || (key == 'docker') ? 
oldval : newval } assert_equal(expected_kube_metadata, es[0]) end end @@ -674,12 +679,12 @@ def emit_with_tag(tag, msg={}, config=' 'pod_name' => 'k8s-pod-name', 'container_name' => 'k8s-container-name' }, - 'docker' => {'container_id' => 'e463bc0d3ae38f5c89d92dca49b30e049e899799920b79d4d5f705acbe82ba95'}, + 'docker' => { 'container_id' => 'e463bc0d3ae38f5c89d92dca49b30e049e899799920b79d4d5f705acbe82ba95' }, 'randomfield' => 'randomvalue' } - VCR.use_cassettes([{name: 'valid_kubernetes_api_server'}, {name: 'kubernetes_get_api_v1'}, {name: 'kubernetes_get_pod'}, - {name: 'kubernetes_get_namespace_default', options: {allow_playback_repeats: true}}, - {name: 'metadata_from_tag_and_journald_fields'}]) do + VCR.use_cassettes([{ name: 'valid_kubernetes_api_server' }, { name: 'kubernetes_get_api_v1' }, { name: 'kubernetes_get_pod' }, + { name: 'kubernetes_get_namespace_default', options: { allow_playback_repeats: true } }, + { name: 'metadata_from_tag_and_journald_fields' }]) do es = emit_with_tag(tag, msg, ' kubernetes_url https://localhost:8443 watch false @@ -689,59 +694,59 @@ def emit_with_tag(tag, msg={}, config=' ') expected_kube_metadata = { 'docker' => { - 'container_id' => '49095a2894da899d3b327c5fde1e056a81376cc9a8f8b09a195f2a92bceed459' + 'container_id' => '49095a2894da899d3b327c5fde1e056a81376cc9a8f8b09a195f2a92bceed459' }, 'kubernetes' => { - 'host' => 'jimmi-redhat.localnet', - 'pod_name' => 'fabric8-console-controller-98rqc', - 'container_name' => 'fabric8-console-container', - 'container_image' => 'fabric8/hawtio-kubernetes:latest', + 'host' => 'jimmi-redhat.localnet', + 'pod_name' => 'fabric8-console-controller-98rqc', + 'container_name' => 'fabric8-console-container', + 'container_image' => 'fabric8/hawtio-kubernetes:latest', 'container_image_id' => 'docker://b2bd1a24a68356b2f30128e6e28e672c1ef92df0d9ec01ec0c7faea5d77d2303', - 'namespace_name' => 'default', - 'namespace_id' => '898268c8-4a36-11e5-9d81-42010af0194c', - 'pod_id' => 'c76927af-f563-11e4-b32d-54ee7527188d', - 'master_url' => 'https://localhost:8443', + 'namespace_name' => 'default', + 'namespace_id' => '898268c8-4a36-11e5-9d81-42010af0194c', + 'pod_id' => 'c76927af-f563-11e4-b32d-54ee7527188d', + 'master_url' => 'https://localhost:8443', 'labels' => { 'component' => 'fabric8Console' } } - }.merge(msg) {|key,oldval,newval| ((key == 'kubernetes') || (key == 'docker')) ? oldval : newval} + }.merge(msg) { |key, oldval, newval| (key == 'kubernetes') || (key == 'docker') ? 
oldval : newval } assert_equal(expected_kube_metadata, es[0]) end end test 'with kubernetes annotations' do - VCR.use_cassettes([{name: 'valid_kubernetes_api_server'}, {name: 'kubernetes_get_api_v1'}, - {name: 'kubernetes_docker_metadata_annotations'}, - {name: 'kubernetes_get_namespace_default'}]) do - filtered = emit({},' + VCR.use_cassettes([{ name: 'valid_kubernetes_api_server' }, { name: 'kubernetes_get_api_v1' }, + { name: 'kubernetes_docker_metadata_annotations' }, + { name: 'kubernetes_get_namespace_default' }]) do + filtered = emit({}, ' kubernetes_url https://localhost:8443 watch false cache_size 1 annotation_match [ "^custom.+", "two"] ') expected_kube_metadata = { - 'docker' => { - 'container_id' => '49095a2894da899d3b327c5fde1e056a81376cc9a8f8b09a195f2a92bceed459' + 'docker' => { + 'container_id' => '49095a2894da899d3b327c5fde1e056a81376cc9a8f8b09a195f2a92bceed459' + }, + 'kubernetes' => { + 'host' => 'jimmi-redhat.localnet', + 'pod_name' => 'fabric8-console-controller-98rqc', + 'container_name' => 'fabric8-console-container', + 'container_image' => 'fabric8/hawtio-kubernetes:latest', + 'container_image_id' => 'docker://b2bd1a24a68356b2f30128e6e28e672c1ef92df0d9ec01ec0c7faea5d77d2303', + 'namespace_name' => 'default', + 'namespace_id' => '898268c8-4a36-11e5-9d81-42010af0194c', + 'pod_id' => 'c76927af-f563-11e4-b32d-54ee7527188d', + 'master_url' => 'https://localhost:8443', + 'labels' => { + 'component' => 'fabric8Console' }, - 'kubernetes' => { - 'host' => 'jimmi-redhat.localnet', - 'pod_name' => 'fabric8-console-controller-98rqc', - 'container_name' => 'fabric8-console-container', - 'container_image' => 'fabric8/hawtio-kubernetes:latest', - 'container_image_id' => 'docker://b2bd1a24a68356b2f30128e6e28e672c1ef92df0d9ec01ec0c7faea5d77d2303', - 'namespace_name' => 'default', - 'namespace_id' => '898268c8-4a36-11e5-9d81-42010af0194c', - 'pod_id' => 'c76927af-f563-11e4-b32d-54ee7527188d', - 'master_url' => 'https://localhost:8443', - 'labels' => { - 'component' => 'fabric8Console' - }, - 'annotations' => { - 'custom_field1' => 'hello_kitty', - 'field_two' => 'value' - } + 'annotations' => { + 'custom_field1' => 'hello_kitty', + 'field_two' => 'value' } + } } assert_equal(expected_kube_metadata, filtered[0]) end @@ -756,12 +761,12 @@ def emit_with_tag(tag, msg={}, config=' 'randomfield' => 'randomvalue' } VCR.use_cassettes([ - {name: 'valid_kubernetes_api_server'}, - {name: 'kubernetes_get_api_v1'}, - {name: 'kubernetes_get_pod'}, - {name: 'kubernetes_get_namespace_default'}, - {name: 'metadata_from_tag_and_journald_fields'} - ]) do + { name: 'valid_kubernetes_api_server' }, + { name: 'kubernetes_get_api_v1' }, + { name: 'kubernetes_get_pod' }, + { name: 'kubernetes_get_namespace_default' }, + { name: 'metadata_from_tag_and_journald_fields' } + ]) do filtered = emit_with_tag(tag, msg, ' kubernetes_url https://localhost:8443 watch false @@ -770,18 +775,18 @@ def emit_with_tag(tag, msg={}, config=' ') expected_kube_metadata = { 'docker' => { - 'container_id' => '49095a2894da899d3b327c5fde1e056a81376cc9a8f8b09a195f2a92bceed459' + 'container_id' => '49095a2894da899d3b327c5fde1e056a81376cc9a8f8b09a195f2a92bceed459' }, 'kubernetes' => { - 'host' => 'jimmi-redhat.localnet', - 'pod_name' => 'fabric8-console-controller-98rqc', - 'container_name' => 'fabric8-console-container', - 'container_image' => 'fabric8/hawtio-kubernetes:latest', + 'host' => 'jimmi-redhat.localnet', + 'pod_name' => 'fabric8-console-controller-98rqc', + 'container_name' => 'fabric8-console-container', + 
'container_image' => 'fabric8/hawtio-kubernetes:latest', 'container_image_id' => 'docker://b2bd1a24a68356b2f30128e6e28e672c1ef92df0d9ec01ec0c7faea5d77d2303', - 'namespace_name' => 'default', - 'namespace_id' => '898268c8-4a36-11e5-9d81-42010af0194c', - 'pod_id' => 'c76927af-f563-11e4-b32d-54ee7527188d', - 'master_url' => 'https://localhost:8443', + 'namespace_name' => 'default', + 'namespace_id' => '898268c8-4a36-11e5-9d81-42010af0194c', + 'pod_id' => 'c76927af-f563-11e4-b32d-54ee7527188d', + 'master_url' => 'https://localhost:8443', 'labels' => { 'component' => 'fabric8Console' } @@ -792,73 +797,73 @@ def emit_with_tag(tag, msg={}, config=' end test 'with kubernetes namespace annotations' do - VCR.use_cassettes([{name: 'valid_kubernetes_api_server'}, {name: 'kubernetes_get_api_v1'}, - {name: 'kubernetes_docker_metadata_annotations'}, - {name: 'kubernetes_get_namespace_default'}]) do - filtered = emit({},' + VCR.use_cassettes([{ name: 'valid_kubernetes_api_server' }, { name: 'kubernetes_get_api_v1' }, + { name: 'kubernetes_docker_metadata_annotations' }, + { name: 'kubernetes_get_namespace_default' }]) do + filtered = emit({}, ' kubernetes_url https://localhost:8443 watch false cache_size 1 annotation_match [ "^custom.+", "two", "workspace*"] ') expected_kube_metadata = { - 'docker' => { - 'container_id' => '49095a2894da899d3b327c5fde1e056a81376cc9a8f8b09a195f2a92bceed459' + 'docker' => { + 'container_id' => '49095a2894da899d3b327c5fde1e056a81376cc9a8f8b09a195f2a92bceed459' + }, + 'kubernetes' => { + 'host' => 'jimmi-redhat.localnet', + 'pod_name' => 'fabric8-console-controller-98rqc', + 'container_name' => 'fabric8-console-container', + 'namespace_id' => '898268c8-4a36-11e5-9d81-42010af0194c', + 'namespace_name' => 'default', + 'container_image' => 'fabric8/hawtio-kubernetes:latest', + 'container_image_id' => 'docker://b2bd1a24a68356b2f30128e6e28e672c1ef92df0d9ec01ec0c7faea5d77d2303', + 'pod_id' => 'c76927af-f563-11e4-b32d-54ee7527188d', + 'master_url' => 'https://localhost:8443', + 'labels' => { + 'component' => 'fabric8Console' + }, + 'annotations' => { + 'custom_field1' => 'hello_kitty', + 'field_two' => 'value' }, - 'kubernetes' => { - 'host' => 'jimmi-redhat.localnet', - 'pod_name' => 'fabric8-console-controller-98rqc', - 'container_name' => 'fabric8-console-container', - 'namespace_id' => '898268c8-4a36-11e5-9d81-42010af0194c', - 'namespace_name' => 'default', - 'container_image' => 'fabric8/hawtio-kubernetes:latest', - 'container_image_id' => 'docker://b2bd1a24a68356b2f30128e6e28e672c1ef92df0d9ec01ec0c7faea5d77d2303', - 'pod_id' => 'c76927af-f563-11e4-b32d-54ee7527188d', - 'master_url' => 'https://localhost:8443', - 'labels' => { - 'component' => 'fabric8Console' - }, - 'annotations' => { - 'custom_field1' => 'hello_kitty', - 'field_two' => 'value' - }, - 'namespace_annotations' => { - 'workspaceId' => 'myWorkspaceName' - } + 'namespace_annotations' => { + 'workspaceId' => 'myWorkspaceName' } + } } assert_equal(expected_kube_metadata, filtered[0]) end end test 'with kubernetes namespace annotations no match' do - VCR.use_cassettes([{name: 'valid_kubernetes_api_server'}, {name: 'kubernetes_get_api_v1'}, - {name: 'kubernetes_docker_metadata_annotations'}, - {name: 'kubernetes_get_namespace_default'}]) do - filtered = emit({},' + VCR.use_cassettes([{ name: 'valid_kubernetes_api_server' }, { name: 'kubernetes_get_api_v1' }, + { name: 'kubernetes_docker_metadata_annotations' }, + { name: 'kubernetes_get_namespace_default' }]) do + filtered = emit({}, ' kubernetes_url 
https://localhost:8443 watch false cache_size 1 annotation_match [ "noMatch*"] ') expected_kube_metadata = { - 'docker' => { - 'container_id' => '49095a2894da899d3b327c5fde1e056a81376cc9a8f8b09a195f2a92bceed459' - }, - 'kubernetes' => { - 'host' => 'jimmi-redhat.localnet', - 'pod_name' => 'fabric8-console-controller-98rqc', - 'container_name' => 'fabric8-console-container', - 'container_image' => 'fabric8/hawtio-kubernetes:latest', - 'container_image_id' => 'docker://b2bd1a24a68356b2f30128e6e28e672c1ef92df0d9ec01ec0c7faea5d77d2303', - 'namespace_id' => '898268c8-4a36-11e5-9d81-42010af0194c', - 'namespace_name' => 'default', - 'pod_id' => 'c76927af-f563-11e4-b32d-54ee7527188d', - 'master_url' => 'https://localhost:8443', - 'labels' => { - 'component' => 'fabric8Console' - } + 'docker' => { + 'container_id' => '49095a2894da899d3b327c5fde1e056a81376cc9a8f8b09a195f2a92bceed459' + }, + 'kubernetes' => { + 'host' => 'jimmi-redhat.localnet', + 'pod_name' => 'fabric8-console-controller-98rqc', + 'container_name' => 'fabric8-console-container', + 'container_image' => 'fabric8/hawtio-kubernetes:latest', + 'container_image_id' => 'docker://b2bd1a24a68356b2f30128e6e28e672c1ef92df0d9ec01ec0c7faea5d77d2303', + 'namespace_id' => '898268c8-4a36-11e5-9d81-42010af0194c', + 'namespace_name' => 'default', + 'pod_id' => 'c76927af-f563-11e4-b32d-54ee7527188d', + 'master_url' => 'https://localhost:8443', + 'labels' => { + 'component' => 'fabric8Console' } + } } assert_equal(expected_kube_metadata, filtered[0]) end @@ -871,9 +876,9 @@ def emit_with_tag(tag, msg={}, config=' 'CONTAINER_ID_FULL' => '49095a2894da899d3b327c5fde1e056a81376cc9a8f8b09a195f2a92bceed459', 'randomfield' => 'randomvalue' } - VCR.use_cassettes([{name: 'valid_kubernetes_api_server'}, {name: 'kubernetes_get_api_v1'}, - {name: 'kubernetes_docker_metadata_annotations'}, - {name: 'kubernetes_get_namespace_default'}]) do + VCR.use_cassettes([{ name: 'valid_kubernetes_api_server' }, { name: 'kubernetes_get_api_v1' }, + { name: 'kubernetes_docker_metadata_annotations' }, + { name: 'kubernetes_get_namespace_default' }]) do filtered = emit_with_tag(tag, msg, ' kubernetes_url https://localhost:8443 watch false @@ -896,9 +901,9 @@ def emit_with_tag(tag, msg={}, config=' 'CONTAINER_ID_FULL' => '49095a2894da899d3b327c5fde1e056a81376cc9a8f8b09a195f2a92bceed459', 'randomfield' => 'randomvalue' } - VCR.use_cassettes([{name: 'valid_kubernetes_api_server'}, {name: 'kubernetes_get_api_v1'}, - {name: 'kubernetes_docker_metadata_annotations'}, - {name: 'kubernetes_get_namespace_default'}]) do + VCR.use_cassettes([{ name: 'valid_kubernetes_api_server' }, { name: 'kubernetes_get_api_v1' }, + { name: 'kubernetes_docker_metadata_annotations' }, + { name: 'kubernetes_get_namespace_default' }]) do filtered = emit_with_tag(tag, msg, ' kubernetes_url https://localhost:8443 watch false @@ -915,10 +920,10 @@ def emit_with_tag(tag, msg={}, config=' end test 'processes all events when reading from MessagePackEventStream' do - VCR.use_cassettes([{name: 'valid_kubernetes_api_server'}, {name: 'kubernetes_get_api_v1'}, - {name: 'kubernetes_get_pod'}, - {name: 'kubernetes_get_namespace_default'}]) do - entries = [[@time, {'time'=>'2015-05-08T09:22:01Z'}], [@time, {'time'=>'2015-05-08T09:22:01Z'}]] + VCR.use_cassettes([{ name: 'valid_kubernetes_api_server' }, { name: 'kubernetes_get_api_v1' }, + { name: 'kubernetes_get_pod' }, + { name: 'kubernetes_get_namespace_default' }]) do + entries = [[@time, { 'time' => '2015-05-08T09:22:01Z' }], [@time, { 'time' => '2015-05-08T09:22:01Z' 
}]] array_stream = Fluent::ArrayEventStream.new(entries) msgpack_stream = Fluent::MessagePackEventStream.new(array_stream.to_msgpack_stream) @@ -927,30 +932,30 @@ def emit_with_tag(tag, msg={}, config=' watch false cache_size 1 ') - d.run { + d.run do d.feed(DEFAULT_TAG, msgpack_stream) - } - filtered = d.filtered.map{|e| e.last} + end + filtered = d.filtered.map(&:last) expected_kube_metadata = { - 'time'=>'2015-05-08T09:22:01Z', - 'docker' => { - 'container_id' => '49095a2894da899d3b327c5fde1e056a81376cc9a8f8b09a195f2a92bceed459' - }, - 'kubernetes' => { - 'host' => 'jimmi-redhat.localnet', - 'pod_name' => 'fabric8-console-controller-98rqc', - 'container_name' => 'fabric8-console-container', - 'container_image' => 'fabric8/hawtio-kubernetes:latest', - 'container_image_id' => 'docker://b2bd1a24a68356b2f30128e6e28e672c1ef92df0d9ec01ec0c7faea5d77d2303', - 'namespace_name' => 'default', - 'namespace_id' => '898268c8-4a36-11e5-9d81-42010af0194c', - 'pod_id' => 'c76927af-f563-11e4-b32d-54ee7527188d', - 'master_url' => 'https://localhost:8443', - 'labels' => { - 'component' => 'fabric8Console' - } + 'time' => '2015-05-08T09:22:01Z', + 'docker' => { + 'container_id' => '49095a2894da899d3b327c5fde1e056a81376cc9a8f8b09a195f2a92bceed459' + }, + 'kubernetes' => { + 'host' => 'jimmi-redhat.localnet', + 'pod_name' => 'fabric8-console-controller-98rqc', + 'container_name' => 'fabric8-console-container', + 'container_image' => 'fabric8/hawtio-kubernetes:latest', + 'container_image_id' => 'docker://b2bd1a24a68356b2f30128e6e28e672c1ef92df0d9ec01ec0c7faea5d77d2303', + 'namespace_name' => 'default', + 'namespace_id' => '898268c8-4a36-11e5-9d81-42010af0194c', + 'pod_id' => 'c76927af-f563-11e4-b32d-54ee7527188d', + 'master_url' => 'https://localhost:8443', + 'labels' => { + 'component' => 'fabric8Console' } + } } assert_equal(expected_kube_metadata, filtered[0]) @@ -959,9 +964,9 @@ def emit_with_tag(tag, msg={}, config=' end test 'with docker & kubernetes metadata using skip config params' do - VCR.use_cassettes([{name: 'valid_kubernetes_api_server'}, {name: 'kubernetes_get_api_v1'}, {name: 'kubernetes_get_pod'}, - {name: 'kubernetes_get_namespace_default'}]) do - filtered = emit({},' + VCR.use_cassettes([{ name: 'valid_kubernetes_api_server' }, { name: 'kubernetes_get_api_v1' }, { name: 'kubernetes_get_pod' }, + { name: 'kubernetes_get_namespace_default' }]) do + filtered = emit({}, ' kubernetes_url https://localhost:8443 watch false cache_size 1 @@ -971,16 +976,16 @@ def emit_with_tag(tag, msg={}, config=' skip_namespace_metadata true ') expected_kube_metadata = { - 'docker' => { - 'container_id' => '49095a2894da899d3b327c5fde1e056a81376cc9a8f8b09a195f2a92bceed459' - }, - 'kubernetes' => { - 'host' => 'jimmi-redhat.localnet', - 'pod_name' => 'fabric8-console-controller-98rqc', - 'container_name' => 'fabric8-console-container', - 'namespace_name' => 'default', - 'pod_id' => 'c76927af-f563-11e4-b32d-54ee7527188d' - } + 'docker' => { + 'container_id' => '49095a2894da899d3b327c5fde1e056a81376cc9a8f8b09a195f2a92bceed459' + }, + 'kubernetes' => { + 'host' => 'jimmi-redhat.localnet', + 'pod_name' => 'fabric8-console-controller-98rqc', + 'container_name' => 'fabric8-console-container', + 'namespace_name' => 'default', + 'pod_id' => 'c76927af-f563-11e4-b32d-54ee7527188d' + } } assert_equal(expected_kube_metadata, filtered[0]) diff --git a/test/plugin/test_watch_namespaces.rb b/test/plugin/test_watch_namespaces.rb index a71b79d..e7fd406 100644 --- a/test/plugin/test_watch_namespaces.rb +++ 
b/test/plugin/test_watch_namespaces.rb @@ -1,3 +1,5 @@ +# frozen_string_literal: true + # # Fluentd Kubernetes Metadata Filter Plugin - Enrich Fluentd events with # Kubernetes metadata @@ -20,225 +22,224 @@ require_relative 'watch_test' class WatchNamespacesTestTest < WatchTest + include KubernetesMetadata::WatchNamespaces + + setup do + @initial = { + kind: 'NamespaceList', + metadata: { resourceVersion: '123' }, + items: [ + { + metadata: { + name: 'initial', + uid: 'initial_uid' + } + }, + { + metadata: { + name: 'modified', + uid: 'modified_uid' + } + } + ] + } + + @created = { + type: 'CREATED', + object: { + metadata: { + name: 'created', + uid: 'created_uid' + } + } + } + @modified = { + type: 'MODIFIED', + object: { + metadata: { + name: 'foo', + uid: 'modified_uid' + } + } + } + @deleted = { + type: 'DELETED', + object: { + metadata: { + name: 'deleteme', + uid: 'deleted_uid' + } + } + } + @error = { + type: 'ERROR', + object: { + message: 'some error message' + } + } + @gone = { + type: 'ERROR', + object: { + code: 410, + kind: 'Status', + message: 'too old resource version: 123 (391079)', + metadata: { + name: 'gone', + namespace: 'gone', + uid: 'gone_uid' + }, + reason: 'Gone' + } + } + end + + test 'namespace list caches namespaces' do + @client.stub :get_namespaces, @initial do + process_namespace_watcher_notices(start_namespace_watch) + assert_equal(true, @namespace_cache.key?('initial_uid')) + assert_equal(true, @namespace_cache.key?('modified_uid')) + assert_equal(2, @stats[:namespace_cache_host_updates]) + end + end - include KubernetesMetadata::WatchNamespaces - - setup do - @initial = { - kind: 'NamespaceList', - metadata: {resourceVersion: '123'}, - items: [ - { - metadata: { - name: 'initial', - uid: 'initial_uid' - } - }, - { - metadata: { - name: 'modified', - uid: 'modified_uid' - } - } - ] - } - - @created = { - type: 'CREATED', - object: { - metadata: { - name: 'created', - uid: 'created_uid' - } - } - } - @modified = { - type: 'MODIFIED', - object: { - metadata: { - name: 'foo', - uid: 'modified_uid' - } - } - } - @deleted = { - type: 'DELETED', - object: { - metadata: { - name: 'deleteme', - uid: 'deleted_uid' - } - } - } - @error = { - type: 'ERROR', - object: { - message: 'some error message' - } - } - @gone = { - type: 'ERROR', - object: { - code: 410, - kind: 'Status', - message: 'too old resource version: 123 (391079)', - metadata: { - name: 'gone', - namespace: 'gone', - uid: 'gone_uid' - }, - reason: 'Gone' - } - } - end - - test 'namespace list caches namespaces' do - @client.stub :get_namespaces, @initial do + test 'namespace list caches namespaces and watch updates' do + orig_env_val = ENV['K8S_NODE_NAME'] + ENV['K8S_NODE_NAME'] = 'aNodeName' + @client.stub :get_namespaces, @initial do + @client.stub :watch_namespaces, [@modified] do process_namespace_watcher_notices(start_namespace_watch) - assert_equal(true, @namespace_cache.key?('initial_uid')) - assert_equal(true, @namespace_cache.key?('modified_uid')) assert_equal(2, @stats[:namespace_cache_host_updates]) + assert_equal(1, @stats[:namespace_cache_watch_updates]) end end - - test 'namespace list caches namespaces and watch updates' do - orig_env_val = ENV['K8S_NODE_NAME'] - ENV['K8S_NODE_NAME'] = 'aNodeName' - @client.stub :get_namespaces, @initial do - @client.stub :watch_namespaces, [@modified] do - process_namespace_watcher_notices(start_namespace_watch) - assert_equal(2, @stats[:namespace_cache_host_updates]) - assert_equal(1, @stats[:namespace_cache_watch_updates]) - end - end - 
ENV['K8S_NODE_NAME'] = orig_env_val - end - - test 'namespace watch ignores CREATED' do - @client.stub :watch_namespaces, [@created] do - process_namespace_watcher_notices(start_namespace_watch) - assert_equal(false, @namespace_cache.key?('created_uid')) - assert_equal(1, @stats[:namespace_cache_watch_ignored]) - end + ENV['K8S_NODE_NAME'] = orig_env_val + end + + test 'namespace watch ignores CREATED' do + @client.stub :watch_namespaces, [@created] do + process_namespace_watcher_notices(start_namespace_watch) + assert_equal(false, @namespace_cache.key?('created_uid')) + assert_equal(1, @stats[:namespace_cache_watch_ignored]) end + end - test 'namespace watch ignores MODIFIED when info not in cache' do - @client.stub :watch_namespaces, [@modified] do - process_namespace_watcher_notices(start_namespace_watch) - assert_equal(false, @namespace_cache.key?('modified_uid')) - assert_equal(1, @stats[:namespace_cache_watch_misses]) - end + test 'namespace watch ignores MODIFIED when info not in cache' do + @client.stub :watch_namespaces, [@modified] do + process_namespace_watcher_notices(start_namespace_watch) + assert_equal(false, @namespace_cache.key?('modified_uid')) + assert_equal(1, @stats[:namespace_cache_watch_misses]) end - - test 'namespace watch updates cache when MODIFIED is received and info is cached' do - @namespace_cache['modified_uid'] = {} - @client.stub :watch_namespaces, [@modified] do - process_namespace_watcher_notices(start_namespace_watch) - assert_equal(true, @namespace_cache.key?('modified_uid')) - assert_equal(1, @stats[:namespace_cache_watch_updates]) - end + end + + test 'namespace watch updates cache when MODIFIED is received and info is cached' do + @namespace_cache['modified_uid'] = {} + @client.stub :watch_namespaces, [@modified] do + process_namespace_watcher_notices(start_namespace_watch) + assert_equal(true, @namespace_cache.key?('modified_uid')) + assert_equal(1, @stats[:namespace_cache_watch_updates]) end - - test 'namespace watch ignores DELETED' do - @namespace_cache['deleted_uid'] = {} - @client.stub :watch_namespaces, [@deleted] do - process_namespace_watcher_notices(start_namespace_watch) - assert_equal(true, @namespace_cache.key?('deleted_uid')) - assert_equal(1, @stats[:namespace_cache_watch_deletes_ignored]) - end + end + + test 'namespace watch ignores DELETED' do + @namespace_cache['deleted_uid'] = {} + @client.stub :watch_namespaces, [@deleted] do + process_namespace_watcher_notices(start_namespace_watch) + assert_equal(true, @namespace_cache.key?('deleted_uid')) + assert_equal(1, @stats[:namespace_cache_watch_deletes_ignored]) end - - test 'namespace watch raises Fluent::UnrecoverableError when cannot re-establish connection to k8s API server' do - # Stub start_namespace_watch to simulate initial successful connection to API server - stub(self).start_namespace_watch - # Stub watch_namespaces to simluate not being able to set up watch connection to API server - stub(@client).watch_namespaces { raise } - @client.stub :get_namespaces, @initial do - assert_raise Fluent::UnrecoverableError do - set_up_namespace_thread - end + end + + test 'namespace watch raises Fluent::UnrecoverableError when cannot re-establish connection to k8s API server' do + # Stub start_namespace_watch to simulate initial successful connection to API server + stub(self).start_namespace_watch + # Stub watch_namespaces to simluate not being able to set up watch connection to API server + stub(@client).watch_namespaces { raise } + @client.stub :get_namespaces, @initial do + 
assert_raise Fluent::UnrecoverableError do + set_up_namespace_thread end - assert_equal(3, @stats[:namespace_watch_failures]) - assert_equal(2, Thread.current[:namespace_watch_retry_count]) - assert_equal(4, Thread.current[:namespace_watch_retry_backoff_interval]) - assert_nil(@stats[:namespace_watch_error_type_notices]) end - - test 'namespace watch resets watch retry count when exceptions are encountered and connection to k8s API server is re-established' do - @client.stub :get_namespaces, @initial do - @client.stub :watch_namespaces, [[@created, @exception_raised]] do - # Force the infinite watch loop to exit after 3 seconds. Verifies that - # no unrecoverable error was thrown during this period of time. - assert_raise Timeout::Error.new('execution expired') do - Timeout.timeout(3) do - set_up_namespace_thread - end + assert_equal(3, @stats[:namespace_watch_failures]) + assert_equal(2, Thread.current[:namespace_watch_retry_count]) + assert_equal(4, Thread.current[:namespace_watch_retry_backoff_interval]) + assert_nil(@stats[:namespace_watch_error_type_notices]) + end + + test 'namespace watch resets watch retry count when exceptions are encountered and connection to k8s API server is re-established' do + @client.stub :get_namespaces, @initial do + @client.stub :watch_namespaces, [[@created, @exception_raised]] do + # Force the infinite watch loop to exit after 3 seconds. Verifies that + # no unrecoverable error was thrown during this period of time. + assert_raise Timeout::Error.new('execution expired') do + Timeout.timeout(3) do + set_up_namespace_thread end - assert_operator(@stats[:namespace_watch_failures], :>=, 3) - assert_operator(Thread.current[:namespace_watch_retry_count], :<=, 1) - assert_operator(Thread.current[:namespace_watch_retry_backoff_interval], :<=, 1) end + assert_operator(@stats[:namespace_watch_failures], :>=, 3) + assert_operator(Thread.current[:namespace_watch_retry_count], :<=, 1) + assert_operator(Thread.current[:namespace_watch_retry_backoff_interval], :<=, 1) end end - - test 'namespace watch resets watch retry count when error is received and connection to k8s API server is re-established' do - @client.stub :get_namespaces, @initial do - @client.stub :watch_namespaces, [@error] do - # Force the infinite watch loop to exit after 3 seconds. Verifies that - # no unrecoverable error was thrown during this period of time. - assert_raise Timeout::Error.new('execution expired') do - Timeout.timeout(3) do - set_up_namespace_thread - end + end + + test 'namespace watch resets watch retry count when error is received and connection to k8s API server is re-established' do + @client.stub :get_namespaces, @initial do + @client.stub :watch_namespaces, [@error] do + # Force the infinite watch loop to exit after 3 seconds. Verifies that + # no unrecoverable error was thrown during this period of time. 
+ assert_raise Timeout::Error.new('execution expired') do + Timeout.timeout(3) do + set_up_namespace_thread end - assert_operator(@stats[:namespace_watch_failures], :>=, 3) - assert_operator(Thread.current[:namespace_watch_retry_count], :<=, 1) - assert_operator(Thread.current[:namespace_watch_retry_backoff_interval], :<=, 1) end + assert_operator(@stats[:namespace_watch_failures], :>=, 3) + assert_operator(Thread.current[:namespace_watch_retry_count], :<=, 1) + assert_operator(Thread.current[:namespace_watch_retry_backoff_interval], :<=, 1) end end - - test 'namespace watch continues after retries succeed' do - @client.stub :get_namespaces, @initial do - @client.stub :watch_namespaces, [@modified, @error, @modified] do - # Force the infinite watch loop to exit after 3 seconds. Verifies that - # no unrecoverable error was thrown during this period of time. - assert_raise Timeout::Error.new('execution expired') do - Timeout.timeout(3) do - set_up_namespace_thread - end + end + + test 'namespace watch continues after retries succeed' do + @client.stub :get_namespaces, @initial do + @client.stub :watch_namespaces, [@modified, @error, @modified] do + # Force the infinite watch loop to exit after 3 seconds. Verifies that + # no unrecoverable error was thrown during this period of time. + assert_raise Timeout::Error.new('execution expired') do + Timeout.timeout(3) do + set_up_namespace_thread end - assert_operator(@stats[:namespace_watch_failures], :>=, 3) - assert_operator(Thread.current[:namespace_watch_retry_count], :<=, 1) - assert_operator(Thread.current[:namespace_watch_retry_backoff_interval], :<=, 1) - assert_operator(@stats[:namespace_watch_error_type_notices], :>=, 3) end + assert_operator(@stats[:namespace_watch_failures], :>=, 3) + assert_operator(Thread.current[:namespace_watch_retry_count], :<=, 1) + assert_operator(Thread.current[:namespace_watch_retry_backoff_interval], :<=, 1) + assert_operator(@stats[:namespace_watch_error_type_notices], :>=, 3) end end + end - test 'namespace watch raises a GoneError when a 410 Gone error is received' do - @cache['gone_uid'] = {} - @client.stub :watch_namespaces, [@gone] do - assert_raise KubernetesMetadata::Common::GoneError do - process_namespace_watcher_notices(start_namespace_watch) - end - assert_equal(1, @stats[:namespace_watch_gone_notices]) + test 'namespace watch raises a GoneError when a 410 Gone error is received' do + @cache['gone_uid'] = {} + @client.stub :watch_namespaces, [@gone] do + assert_raise KubernetesMetadata::Common::GoneError do + process_namespace_watcher_notices(start_namespace_watch) end + assert_equal(1, @stats[:namespace_watch_gone_notices]) end - - test 'namespace watch retries when 410 Gone errors are encountered' do - @client.stub :get_namespaces, @initial do - @client.stub :watch_namespaces, [@created, @gone, @modified] do - # Force the infinite watch loop to exit after 3 seconds. Verifies that - # no unrecoverable error was thrown during this period of time. - assert_raise Timeout::Error.new('execution expired') do - Timeout.timeout(3) do - set_up_namespace_thread - end + end + + test 'namespace watch retries when 410 Gone errors are encountered' do + @client.stub :get_namespaces, @initial do + @client.stub :watch_namespaces, [@created, @gone, @modified] do + # Force the infinite watch loop to exit after 3 seconds. Verifies that + # no unrecoverable error was thrown during this period of time. 
+ assert_raise Timeout::Error.new('execution expired') do + Timeout.timeout(3) do + set_up_namespace_thread end - assert_operator(@stats[:namespace_watch_gone_errors], :>=, 3) - assert_operator(@stats[:namespace_watch_gone_notices], :>=, 3) end + assert_operator(@stats[:namespace_watch_gone_errors], :>=, 3) + assert_operator(@stats[:namespace_watch_gone_notices], :>=, 3) end end + end end diff --git a/test/plugin/test_watch_pods.rb b/test/plugin/test_watch_pods.rb index c24203f..ba070df 100644 --- a/test/plugin/test_watch_pods.rb +++ b/test/plugin/test_watch_pods.rb @@ -1,3 +1,5 @@ +# frozen_string_literal: true + # # Fluentd Kubernetes Metadata Filter Plugin - Enrich Fluentd events with # Kubernetes metadata @@ -20,314 +22,313 @@ require_relative 'watch_test' class DefaultPodWatchStrategyTest < WatchTest + include KubernetesMetadata::WatchPods - include KubernetesMetadata::WatchPods + setup do + @initial = { + kind: 'PodList', + metadata: { resourceVersion: '123' }, + items: [ + { + metadata: { + name: 'initial', + namespace: 'initial_ns', + uid: 'initial_uid', + labels: {} + }, + spec: { + nodeName: 'aNodeName', + containers: [{ + name: 'foo', + image: 'bar' + }, { + name: 'bar', + image: 'foo' + }] + } + }, + { + metadata: { + name: 'modified', + namespace: 'create', + uid: 'modified_uid', + labels: {} + }, + spec: { + nodeName: 'aNodeName', + containers: [{ + name: 'foo', + image: 'bar' + }, { + name: 'bar', + image: 'foo' + }] + } + } + ] + } + @created = { + type: 'CREATED', + object: { + metadata: { + name: 'created', + namespace: 'create', + uid: 'created_uid', + resourceVersion: '122', + labels: {} + }, + spec: { + nodeName: 'aNodeName', + containers: [{ + name: 'foo', + image: 'bar' + }, { + name: 'bar', + image: 'foo' + }] + } + } + } + @modified = { + type: 'MODIFIED', + object: { + metadata: { + name: 'foo', + namespace: 'modified', + uid: 'modified_uid', + resourceVersion: '123', + labels: {} + }, + spec: { + nodeName: 'aNodeName', + containers: [{ + name: 'foo', + image: 'bar' + }, { + name: 'bar', + image: 'foo' + }] + }, + status: { + containerStatuses: [ + { + name: 'fabric8-console-container', + state: { + running: { + startedAt: '2015-05-08T09:22:44Z' + } + }, + lastState: {}, + ready: true, + restartCount: 0, + image: 'fabric8/hawtio-kubernetes:latest', + imageID: 'docker://b2bd1a24a68356b2f30128e6e28e672c1ef92df0d9ec01ec0c7faea5d77d2303', + containerID: 'docker://49095a2894da899d3b327c5fde1e056a81376cc9a8f8b09a195f2a92bceed459' + } + ] + } + } + } + @deleted = { + type: 'DELETED', + object: { + metadata: { + name: 'deleteme', + namespace: 'deleted', + uid: 'deleted_uid', + resourceVersion: '124' + } + } + } + @error = { + type: 'ERROR', + object: { + message: 'some error message' + } + } + @gone = { + type: 'ERROR', + object: { + code: 410, + kind: 'Status', + message: 'too old resource version: 123 (391079)', + metadata: { + name: 'gone', + namespace: 'gone', + uid: 'gone_uid' + }, + reason: 'Gone' + } + } + end - setup do - @initial = { - kind: 'PodList', - metadata: {resourceVersion: '123'}, - items: [ - { - metadata: { - name: 'initial', - namespace: 'initial_ns', - uid: 'initial_uid', - labels: {}, - }, - spec: { - nodeName: 'aNodeName', - containers: [{ - name: 'foo', - image: 'bar', - }, { - name: 'bar', - image: 'foo', - }] - } - }, - { - metadata: { - name: 'modified', - namespace: 'create', - uid: 'modified_uid', - labels: {}, - }, - spec: { - nodeName: 'aNodeName', - containers: [{ - name: 'foo', - image: 'bar', - }, { - name: 'bar', - image: 'foo', - }] - 
} - } - ] - } - @created = { - type: 'CREATED', - object: { - metadata: { - name: 'created', - namespace: 'create', - uid: 'created_uid', - resourceVersion: '122', - labels: {}, - }, - spec: { - nodeName: 'aNodeName', - containers: [{ - name: 'foo', - image: 'bar', - }, { - name: 'bar', - image: 'foo', - }] - } - } - } - @modified = { - type: 'MODIFIED', - object: { - metadata: { - name: 'foo', - namespace: 'modified', - uid: 'modified_uid', - resourceVersion: '123', - labels: {}, - }, - spec: { - nodeName: 'aNodeName', - containers: [{ - name: 'foo', - image: 'bar', - }, { - name: 'bar', - image: 'foo', - }] - }, - status: { - containerStatuses: [ - { - name: 'fabric8-console-container', - state: { - running: { - startedAt: '2015-05-08T09:22:44Z' - } - }, - lastState: {}, - ready: true, - restartCount: 0, - image: 'fabric8/hawtio-kubernetes:latest', - imageID: 'docker://b2bd1a24a68356b2f30128e6e28e672c1ef92df0d9ec01ec0c7faea5d77d2303', - containerID: 'docker://49095a2894da899d3b327c5fde1e056a81376cc9a8f8b09a195f2a92bceed459' - } - ] - } - } - } - @deleted = { - type: 'DELETED', - object: { - metadata: { - name: 'deleteme', - namespace: 'deleted', - uid: 'deleted_uid', - resourceVersion: '124' - } - } - } - @error = { - type: 'ERROR', - object: { - message: 'some error message' - } - } - @gone = { - type: 'ERROR', - object: { - code: 410, - kind: 'Status', - message: 'too old resource version: 123 (391079)', - metadata: { - name: 'gone', - namespace: 'gone', - uid: 'gone_uid' - }, - reason: 'Gone' - } - } - end + test 'pod list caches pods' do + orig_env_val = ENV['K8S_NODE_NAME'] + ENV['K8S_NODE_NAME'] = 'aNodeName' + @client.stub :get_pods, @initial do + process_pod_watcher_notices(start_pod_watch) + assert_equal(true, @cache.key?('initial_uid')) + assert_equal(true, @cache.key?('modified_uid')) + assert_equal(2, @stats[:pod_cache_host_updates]) + end + ENV['K8S_NODE_NAME'] = orig_env_val + end - test 'pod list caches pods' do - orig_env_val = ENV['K8S_NODE_NAME'] - ENV['K8S_NODE_NAME'] = 'aNodeName' - @client.stub :get_pods, @initial do + test 'pod list caches pods and watch updates' do + orig_env_val = ENV['K8S_NODE_NAME'] + ENV['K8S_NODE_NAME'] = 'aNodeName' + @client.stub :get_pods, @initial do + @client.stub :watch_pods, [@modified] do process_pod_watcher_notices(start_pod_watch) - assert_equal(true, @cache.key?('initial_uid')) - assert_equal(true, @cache.key?('modified_uid')) assert_equal(2, @stats[:pod_cache_host_updates]) + assert_equal(1, @stats[:pod_cache_watch_updates]) end - ENV['K8S_NODE_NAME'] = orig_env_val - end - - test 'pod list caches pods and watch updates' do - orig_env_val = ENV['K8S_NODE_NAME'] - ENV['K8S_NODE_NAME'] = 'aNodeName' - @client.stub :get_pods, @initial do - @client.stub :watch_pods, [@modified] do - process_pod_watcher_notices(start_pod_watch) - assert_equal(2, @stats[:pod_cache_host_updates]) - assert_equal(1, @stats[:pod_cache_watch_updates]) - end - end - ENV['K8S_NODE_NAME'] = orig_env_val - assert_equal('123', @last_seen_resource_version) # from @modified end + ENV['K8S_NODE_NAME'] = orig_env_val + assert_equal('123', @last_seen_resource_version) # from @modified + end - test 'pod watch notice ignores CREATED' do - @client.stub :get_pods, @initial do - @client.stub :watch_pods, [@created] do - process_pod_watcher_notices(start_pod_watch) - assert_equal(false, @cache.key?('created_uid')) - assert_equal(1, @stats[:pod_cache_watch_ignored]) - end + test 'pod watch notice ignores CREATED' do + @client.stub :get_pods, @initial do + @client.stub 
:watch_pods, [@created] do
+        process_pod_watcher_notices(start_pod_watch)
+        assert_equal(false, @cache.key?('created_uid'))
+        assert_equal(1, @stats[:pod_cache_watch_ignored])
       end
     end
+  end
 
-    test 'pod watch notice is ignored when info not cached and MODIFIED is received' do
-      @client.stub :watch_pods, [@modified] do
-        process_pod_watcher_notices(start_pod_watch)
-        assert_equal(false, @cache.key?('modified_uid'))
-        assert_equal(1, @stats[:pod_cache_watch_misses])
-      end
+  test 'pod watch notice is ignored when info not cached and MODIFIED is received' do
+    @client.stub :watch_pods, [@modified] do
+      process_pod_watcher_notices(start_pod_watch)
+      assert_equal(false, @cache.key?('modified_uid'))
+      assert_equal(1, @stats[:pod_cache_watch_misses])
     end
+  end
 
-    test 'pod MODIFIED cached when hostname matches' do
-      orig_env_val = ENV['K8S_NODE_NAME']
-      ENV['K8S_NODE_NAME'] = 'aNodeName'
-      @client.stub :watch_pods, [@modified] do
-        process_pod_watcher_notices(start_pod_watch)
-        assert_equal(true, @cache.key?('modified_uid'))
-        assert_equal(1, @stats[:pod_cache_host_updates])
-      end
-      ENV['K8S_NODE_NAME'] = orig_env_val
+  test 'pod MODIFIED cached when hostname matches' do
+    orig_env_val = ENV['K8S_NODE_NAME']
+    ENV['K8S_NODE_NAME'] = 'aNodeName'
+    @client.stub :watch_pods, [@modified] do
+      process_pod_watcher_notices(start_pod_watch)
+      assert_equal(true, @cache.key?('modified_uid'))
+      assert_equal(1, @stats[:pod_cache_host_updates])
     end
+    ENV['K8S_NODE_NAME'] = orig_env_val
+  end
 
-    test 'pod watch notice is updated when MODIFIED is received' do
-      @cache['modified_uid'] = {}
-      @client.stub :watch_pods, [@modified] do
-        process_pod_watcher_notices(start_pod_watch)
-        assert_equal(true, @cache.key?('modified_uid'))
-        assert_equal(1, @stats[:pod_cache_watch_updates])
-      end
+  test 'pod watch notice is updated when MODIFIED is received' do
+    @cache['modified_uid'] = {}
+    @client.stub :watch_pods, [@modified] do
+      process_pod_watcher_notices(start_pod_watch)
+      assert_equal(true, @cache.key?('modified_uid'))
+      assert_equal(1, @stats[:pod_cache_watch_updates])
     end
+  end
 
-    test 'pod watch notice is ignored when delete is received' do
-      @cache['deleted_uid'] = {}
-      @client.stub :watch_pods, [@deleted] do
-        process_pod_watcher_notices(start_pod_watch)
-        assert_equal(true, @cache.key?('deleted_uid'))
-        assert_equal(1, @stats[:pod_cache_watch_delete_ignored])
-      end
+  test 'pod watch notice is ignored when delete is received' do
+    @cache['deleted_uid'] = {}
+    @client.stub :watch_pods, [@deleted] do
+      process_pod_watcher_notices(start_pod_watch)
+      assert_equal(true, @cache.key?('deleted_uid'))
+      assert_equal(1, @stats[:pod_cache_watch_delete_ignored])
     end
+  end
 
-    test 'pod watch raises Fluent::UnrecoverableError when cannot re-establish connection to k8s API server' do
-      # Stub start_pod_watch to simulate initial successful connection to API server
-      stub(self).start_pod_watch
-      # Stub watch_pods to simluate not being able to set up watch connection to API server
-      stub(@client).watch_pods { raise }
-      @client.stub :get_pods, @initial do
-        assert_raise Fluent::UnrecoverableError do
-          set_up_pod_thread
-        end
+  test 'pod watch raises Fluent::UnrecoverableError when cannot re-establish connection to k8s API server' do
+    # Stub start_pod_watch to simulate initial successful connection to API server
+    stub(self).start_pod_watch
+    # Stub watch_pods to simulate not being able to set up watch connection to API server
+    stub(@client).watch_pods { raise }
+    @client.stub :get_pods, @initial do
+      assert_raise 
Fluent::UnrecoverableError do + set_up_pod_thread end - assert_equal(3, @stats[:pod_watch_failures]) - assert_equal(2, Thread.current[:pod_watch_retry_count]) - assert_equal(4, Thread.current[:pod_watch_retry_backoff_interval]) - assert_nil(@stats[:pod_watch_error_type_notices]) end + assert_equal(3, @stats[:pod_watch_failures]) + assert_equal(2, Thread.current[:pod_watch_retry_count]) + assert_equal(4, Thread.current[:pod_watch_retry_backoff_interval]) + assert_nil(@stats[:pod_watch_error_type_notices]) + end - test 'pod watch resets watch retry count when exceptions are encountered and connection to k8s API server is re-established' do - @client.stub :get_pods, @initial do - @client.stub :watch_pods, [[@created, @exception_raised]] do - # Force the infinite watch loop to exit after 3 seconds. Verifies that - # no unrecoverable error was thrown during this period of time. - assert_raise Timeout::Error.new('execution expired') do - Timeout.timeout(3) do - set_up_pod_thread - end - end - assert_operator(@stats[:pod_watch_failures], :>=, 3) - assert_operator(Thread.current[:pod_watch_retry_count], :<=, 1) - assert_operator(Thread.current[:pod_watch_retry_backoff_interval], :<=, 1) + test 'pod watch resets watch retry count when exceptions are encountered and connection to k8s API server is re-established' do + @client.stub :get_pods, @initial do + @client.stub :watch_pods, [[@created, @exception_raised]] do + # Force the infinite watch loop to exit after 3 seconds. Verifies that + # no unrecoverable error was thrown during this period of time. + assert_raise Timeout::Error.new('execution expired') do + Timeout.timeout(3) do + set_up_pod_thread + end end + assert_operator(@stats[:pod_watch_failures], :>=, 3) + assert_operator(Thread.current[:pod_watch_retry_count], :<=, 1) + assert_operator(Thread.current[:pod_watch_retry_backoff_interval], :<=, 1) end end + end - test 'pod watch resets watch retry count when error is received and connection to k8s API server is re-established' do - @client.stub :get_pods, @initial do - @client.stub :watch_pods, [@error] do - # Force the infinite watch loop to exit after 3 seconds. Verifies that - # no unrecoverable error was thrown during this period of time. - assert_raise Timeout::Error.new('execution expired') do - Timeout.timeout(3) do - set_up_pod_thread - end + test 'pod watch resets watch retry count when error is received and connection to k8s API server is re-established' do + @client.stub :get_pods, @initial do + @client.stub :watch_pods, [@error] do + # Force the infinite watch loop to exit after 3 seconds. Verifies that + # no unrecoverable error was thrown during this period of time. + assert_raise Timeout::Error.new('execution expired') do + Timeout.timeout(3) do + set_up_pod_thread end - assert_operator(@stats[:pod_watch_failures], :>=, 3) - assert_operator(Thread.current[:pod_watch_retry_count], :<=, 1) - assert_operator(Thread.current[:pod_watch_retry_backoff_interval], :<=, 1) - assert_operator(@stats[:pod_watch_error_type_notices], :>=, 3) end + assert_operator(@stats[:pod_watch_failures], :>=, 3) + assert_operator(Thread.current[:pod_watch_retry_count], :<=, 1) + assert_operator(Thread.current[:pod_watch_retry_backoff_interval], :<=, 1) + assert_operator(@stats[:pod_watch_error_type_notices], :>=, 3) end end + end - test 'pod watch continues after retries succeed' do - @client.stub :get_pods, @initial do - @client.stub :watch_pods, [@modified, @error, @modified] do - # Force the infinite watch loop to exit after 3 seconds. 
Verifies that - # no unrecoverable error was thrown during this period of time. - assert_raise Timeout::Error.new('execution expired') do - Timeout.timeout(3) do - set_up_pod_thread - end + test 'pod watch continues after retries succeed' do + @client.stub :get_pods, @initial do + @client.stub :watch_pods, [@modified, @error, @modified] do + # Force the infinite watch loop to exit after 3 seconds. Verifies that + # no unrecoverable error was thrown during this period of time. + assert_raise Timeout::Error.new('execution expired') do + Timeout.timeout(3) do + set_up_pod_thread end - assert_operator(@stats[:pod_watch_failures], :>=, 3) - assert_operator(Thread.current[:pod_watch_retry_count], :<=, 1) - assert_operator(Thread.current[:pod_watch_retry_backoff_interval], :<=, 1) - assert_operator(@stats[:pod_watch_error_type_notices], :>=, 3) end + assert_operator(@stats[:pod_watch_failures], :>=, 3) + assert_operator(Thread.current[:pod_watch_retry_count], :<=, 1) + assert_operator(Thread.current[:pod_watch_retry_backoff_interval], :<=, 1) + assert_operator(@stats[:pod_watch_error_type_notices], :>=, 3) end end + end - test 'pod watch raises a GoneError when a 410 Gone error is received' do - @cache['gone_uid'] = {} - @client.stub :watch_pods, [@gone] do - @last_seen_resource_version = '100' - assert_raise KubernetesMetadata::Common::GoneError do - process_pod_watcher_notices(start_pod_watch) - end - assert_equal(1, @stats[:pod_watch_gone_notices]) - assert_nil @last_seen_resource_version # forced restart + test 'pod watch raises a GoneError when a 410 Gone error is received' do + @cache['gone_uid'] = {} + @client.stub :watch_pods, [@gone] do + @last_seen_resource_version = '100' + assert_raise KubernetesMetadata::Common::GoneError do + process_pod_watcher_notices(start_pod_watch) end + assert_equal(1, @stats[:pod_watch_gone_notices]) + assert_nil @last_seen_resource_version # forced restart end + end - test 'pod watch retries when 410 Gone errors are encountered' do - @client.stub :get_pods, @initial do - @client.stub :watch_pods, [@created, @gone, @modified] do - # Force the infinite watch loop to exit after 3 seconds because the code sleeps 3 times. - # Verifies that no unrecoverable error was thrown during this period of time. - assert_raise Timeout::Error.new('execution expired') do - Timeout.timeout(3) do - set_up_pod_thread - end + test 'pod watch retries when 410 Gone errors are encountered' do + @client.stub :get_pods, @initial do + @client.stub :watch_pods, [@created, @gone, @modified] do + # Force the infinite watch loop to exit after 3 seconds because the code sleeps 3 times. + # Verifies that no unrecoverable error was thrown during this period of time. + assert_raise Timeout::Error.new('execution expired') do + Timeout.timeout(3) do + set_up_pod_thread end - assert_operator(@stats[:pod_watch_gone_errors], :>=, 3) - assert_operator(@stats[:pod_watch_gone_notices], :>=, 3) end + assert_operator(@stats[:pod_watch_gone_errors], :>=, 3) + assert_operator(@stats[:pod_watch_gone_notices], :>=, 3) end end + end end diff --git a/test/plugin/watch_test.rb b/test/plugin/watch_test.rb index 2a50a12..ee3b97c 100644 --- a/test/plugin/watch_test.rb +++ b/test/plugin/watch_test.rb @@ -1,3 +1,5 @@ +# frozen_string_literal: true + # # Fluentd Kubernetes Metadata Filter Plugin - Enrich Fluentd events with # Kubernetes metadata @@ -19,7 +21,6 @@ require_relative '../helper' class WatchTest < Test::Unit::TestCase - def thread_current_running? true end @@ -36,17 +37,20 @@ def thread_current_running? 
Thread.current[:namespace_watch_retry_count] = 0 @client = OpenStruct.new - def @client.watch_pods(options = {}) + def @client.watch_pods(_options = {}) [] end - def @client.watch_namespaces(options = {}) + + def @client.watch_namespaces(_options = {}) [] end - def @client.get_namespaces(options = {}) - {items: [], metadata: {resourceVersion: '12345'}} + + def @client.get_namespaces(_options = {}) + { items: [], metadata: { resourceVersion: '12345' } } end - def @client.get_pods(options = {}) - {items: [], metadata: {resourceVersion: '12345'}} + + def @client.get_pods(_options = {}) + { items: [], metadata: { resourceVersion: '12345' } } end @exception_raised = :blow_up_when_used @@ -59,8 +63,10 @@ def log logger = {} def logger.debug(message) end + def logger.info(message, error) end + def logger.error(message, error) end logger