diff --git a/lib/fluent/plugin/filter_kubernetes_metadata.rb b/lib/fluent/plugin/filter_kubernetes_metadata.rb
index e6e29a1..71db3ba 100644
--- a/lib/fluent/plugin/filter_kubernetes_metadata.rb
+++ b/lib/fluent/plugin/filter_kubernetes_metadata.rb
@@ -118,17 +118,21 @@ def fetch_pod_metadata(namespace_name, pod_name)
       @stats.bump(:pod_cache_api_updates)
       log.trace("parsed metadata for #{namespace_name}/#{pod_name}: #{metadata}")
       @cache[metadata['pod_id']] = metadata
-    rescue StandardError => e
-      @stats.bump(:pod_cache_api_nil_error)
-      log.debug "Exception '#{e}' encountered fetching pod metadata from Kubernetes API #{@apiVersion} endpoint #{@kubernetes_url}"
-      if e.message == "Unauthorized"
-        @client = nil
-        # recreate client to refresh token
-        log.info("Re-creating Kubernetes API Client to refresh auth bearer token.")
-        create_client()
+    rescue KubeException => e
+      if e.error_code == 401
+        # recreate client to refresh token
+        log.info("Encountered '401 Unauthorized' exception, recreating client to refresh token")
+        create_client()
+      else
+        log.error "Exception '#{e}' encountered fetching pod metadata from Kubernetes API #{@apiVersion} endpoint #{@kubernetes_url}"
+        @stats.bump(:pod_cache_api_nil_error)
+      end
+      {}
+    rescue StandardError => e
+      @stats.bump(:pod_cache_api_nil_error)
+      log.error "Exception '#{e}' encountered fetching pod metadata from Kubernetes API #{@apiVersion} endpoint #{@kubernetes_url}"
+      {}
     end
-      {}
-    end
 
     def dump_stats
       @curr_time = Time.now
@@ -156,16 +160,20 @@ def fetch_namespace_metadata(namespace_name)
       @stats.bump(:namespace_cache_api_updates)
       log.trace("parsed metadata for #{namespace_name}: #{metadata}")
       @namespace_cache[metadata['namespace_id']] = metadata
-    rescue StandardError => e
-      @stats.bump(:namespace_cache_api_nil_error)
-      log.debug "Exception '#{e}' encountered fetching namespace metadata from Kubernetes API #{@apiVersion} endpoint #{@kubernetes_url}"
-      if e.message == "Unauthorized"
-        @client = nil
-        # recreate client to refresh token
-        log.info("Re-creating Kubernetes API Client to refresh auth bearer token.")
-        create_client()
-      end
-      {}
+    rescue KubeException => e
+      if e.error_code == 401
+        # recreate client to refresh token
+        log.info("Encountered '401 Unauthorized' exception, recreating client to refresh token")
+        create_client()
+      else
+        log.error "Exception '#{e}' encountered fetching namespace metadata from Kubernetes API #{@apiVersion} endpoint #{@kubernetes_url}"
+        @stats.bump(:namespace_cache_api_nil_error)
+      end
+      {}
+    rescue StandardError => e
+      @stats.bump(:namespace_cache_api_nil_error)
+      log.error "Exception '#{e}' encountered fetching namespace metadata from Kubernetes API #{@apiVersion} endpoint #{@kubernetes_url}"
+      {}
     end
 
     def initialize
@@ -311,6 +319,7 @@ def configure(conf)
 
     def create_client()
       log.debug 'Creating K8S client'
+      @client = nil
       @client = Kubeclient::Client.new(
         @kubernetes_url,
         @apiVersion,
diff --git a/lib/fluent/plugin/kubernetes_metadata_watch_namespaces.rb b/lib/fluent/plugin/kubernetes_metadata_watch_namespaces.rb
index 3b6a726..9fa6888 100644
--- a/lib/fluent/plugin/kubernetes_metadata_watch_namespaces.rb
+++ b/lib/fluent/plugin/kubernetes_metadata_watch_namespaces.rb
@@ -46,14 +46,14 @@ def set_up_namespace_thread
         @stats.bump(:namespace_watch_gone_errors)
         log.info('410 Gone encountered. Restarting namespace watch to reset resource versions.', e)
         namespace_watcher = nil
-      rescue StandardError => e
-        if e.message == "Unauthorized"
-          @client = nil
+      rescue KubeException => e
+        if e.error_code == 401
           # recreate client to refresh token
-          log.info("Encountered 'Unauthorized' exception in watch, recreating client to refresh token")
+          log.info("Encountered '401 Unauthorized' exception in watch, recreating client to refresh token")
           create_client()
           namespace_watcher = nil
         else
+          # treat all other errors the same as StandardError, log, swallow and reset
           @stats.bump(:namespace_watch_failures)
           if Thread.current[:namespace_watch_retry_count] < @watch_retry_max_times
             # Instead of raising exceptions and crashing Fluentd, swallow
@@ -79,6 +79,31 @@ def set_up_namespace_thread
             raise Fluent::UnrecoverableError, message
           end
         end
+      rescue StandardError => e
+        @stats.bump(:namespace_watch_failures)
+        if Thread.current[:namespace_watch_retry_count] < @watch_retry_max_times
+          # Instead of raising exceptions and crashing Fluentd, swallow
+          # the exception and reset the watcher.
+          log.info(
+            'Exception encountered parsing namespace watch event. ' \
+            'The connection might have been closed. Sleeping for ' \
+            "#{Thread.current[:namespace_watch_retry_backoff_interval]} " \
+            'seconds and resetting the namespace watcher.', e
+          )
+          sleep(Thread.current[:namespace_watch_retry_backoff_interval])
+          Thread.current[:namespace_watch_retry_count] += 1
+          Thread.current[:namespace_watch_retry_backoff_interval] *= @watch_retry_exponential_backoff_base
+          namespace_watcher = nil
+        else
+          # Since retries failed for many times, log as errors instead
+          # of info and raise exceptions and trigger Fluentd to restart.
+          message =
+            'Exception encountered parsing namespace watch event. The ' \
+            'connection might have been closed. Retried ' \
+            "#{@watch_retry_max_times} times yet still failing. Restarting."
+          log.error(message, e)
+          raise Fluent::UnrecoverableError, message
+        end
       end
     end
 
diff --git a/lib/fluent/plugin/kubernetes_metadata_watch_pods.rb b/lib/fluent/plugin/kubernetes_metadata_watch_pods.rb
index c75b82a..9fe0d92 100644
--- a/lib/fluent/plugin/kubernetes_metadata_watch_pods.rb
+++ b/lib/fluent/plugin/kubernetes_metadata_watch_pods.rb
@@ -47,14 +47,14 @@ def set_up_pod_thread
         @stats.bump(:pod_watch_gone_errors)
         log.info('410 Gone encountered. Restarting pod watch to reset resource versions.', e)
         pod_watcher = nil
-      rescue StandardError => e
-        if e.message == "Unauthorized"
-          @client = nil
+      rescue KubeException => e
+        if e.error_code == 401
           # recreate client to refresh token
-          log.info("Encountered 'Unauthorized' exception in watch, recreating client to refresh token")
+          log.info("Encountered '401 Unauthorized' exception in watch, recreating client to refresh token")
           create_client()
           pod_watcher = nil
         else
+          # treat all other errors the same as StandardError, log, swallow and reset
           @stats.bump(:pod_watch_failures)
           if Thread.current[:pod_watch_retry_count] < @watch_retry_max_times
             # Instead of raising exceptions and crashing Fluentd, swallow
@@ -80,6 +80,31 @@ def set_up_pod_thread
             raise Fluent::UnrecoverableError, message
           end
         end
+      rescue StandardError => e
+        @stats.bump(:pod_watch_failures)
+        if Thread.current[:pod_watch_retry_count] < @watch_retry_max_times
+          # Instead of raising exceptions and crashing Fluentd, swallow
+          # the exception and reset the watcher.
+          log.info(
+            'Exception encountered parsing pod watch event. The ' \
+            'connection might have been closed. Sleeping for ' \
+            "#{Thread.current[:pod_watch_retry_backoff_interval]} " \
+            'seconds and resetting the pod watcher.', e
+          )
+          sleep(Thread.current[:pod_watch_retry_backoff_interval])
+          Thread.current[:pod_watch_retry_count] += 1
+          Thread.current[:pod_watch_retry_backoff_interval] *= @watch_retry_exponential_backoff_base
+          pod_watcher = nil
+        else
+          # Since retries failed for many times, log as errors instead
+          # of info and raise exceptions and trigger Fluentd to restart.
+          message =
+            'Exception encountered parsing pod watch event. The ' \
+            'connection might have been closed. Retried ' \
+            "#{@watch_retry_max_times} times yet still failing. Restarting."
+          log.error(message, e)
+          raise Fluent::UnrecoverableError, message
+        end
       end
     end
 