Skip to content

Commit

Permalink
Address PR feedback
Browse files Browse the repository at this point in the history
Signed-off-by: Wesley Pettit <wppttt@amazon.com>
  • Loading branch information
PettitWesley committed Jun 22, 2022
1 parent c12217b commit 7f16705
Show file tree
Hide file tree
Showing 3 changed files with 86 additions and 29 deletions.
47 changes: 27 additions & 20 deletions lib/fluent/plugin/filter_kubernetes_metadata.rb
Expand Up @@ -118,17 +118,20 @@ def fetch_pod_metadata(namespace_name, pod_name)
@stats.bump(:pod_cache_api_updates)
log.trace("parsed metadata for #{namespace_name}/#{pod_name}: #{metadata}")
@cache[metadata['pod_id']] = metadata
rescue StandardError => e
@stats.bump(:pod_cache_api_nil_error)
log.debug "Exception '#{e}' encountered fetching pod metadata from Kubernetes API #{@apiVersion} endpoint #{@kubernetes_url}"
if e.message == "Unauthorized"
@client = nil
# recreate client to refresh token
log.info("Re-creating Kubernetes API Client to refresh auth bearer token.")
create_client()
rescue KubeException => e
if e.error_code == 401
# recreate client to refresh token
log.info("Encountered '401 Unauthorized' exception, recreating client to refresh token")
create_client()
else
log.error "Exception '#{e}' encountered fetching pod metadata from Kubernetes API #{@apiVersion} endpoint #{@kubernetes_url}"
@stats.bump(:pod_cache_api_nil_error)
end
rescue StandardError => e
@stats.bump(:pod_cache_api_nil_error)
log.error "Exception '#{e}' encountered fetching pod metadata from Kubernetes API #{@apiVersion} endpoint #{@kubernetes_url}"
{}
end
{}
end

def dump_stats
@curr_time = Time.now
Expand Down Expand Up @@ -156,16 +159,19 @@ def fetch_namespace_metadata(namespace_name)
@stats.bump(:namespace_cache_api_updates)
log.trace("parsed metadata for #{namespace_name}: #{metadata}")
@namespace_cache[metadata['namespace_id']] = metadata
rescue StandardError => e
@stats.bump(:namespace_cache_api_nil_error)
log.debug "Exception '#{e}' encountered fetching namespace metadata from Kubernetes API #{@apiVersion} endpoint #{@kubernetes_url}"
if e.message == "Unauthorized"
@client = nil
# recreate client to refresh token
log.info("Re-creating Kubernetes API Client to refresh auth bearer token.")
create_client()
end
{}
rescue KubeException => e
if e.error_code == 401
# recreate client to refresh token
log.info("Encountered '401 Unauthorized' exception, recreating client to refresh token")
create_client()
else
log.error "Exception '#{e}' encountered fetching namespace metadata from Kubernetes API #{@apiVersion} endpoint #{@kubernetes_url}"
@stats.bump(:namespace_cache_api_nil_error)
end
rescue StandardError => e
@stats.bump(:namespace_cache_api_nil_error)
log.error "Exception '#{e}' encountered fetching namespace metadata from Kubernetes API #{@apiVersion} endpoint #{@kubernetes_url}"
{}
end

def initialize
Expand Down Expand Up @@ -311,6 +317,7 @@ def configure(conf)

def create_client()
log.debug 'Creating K8S client'
@client = nil
@client = Kubeclient::Client.new(
@kubernetes_url,
@apiVersion,
Expand Down
33 changes: 29 additions & 4 deletions lib/fluent/plugin/kubernetes_metadata_watch_namespaces.rb
Expand Up @@ -46,14 +46,14 @@ def set_up_namespace_thread
@stats.bump(:namespace_watch_gone_errors)
log.info('410 Gone encountered. Restarting namespace watch to reset resource versions.', e)
namespace_watcher = nil
rescue StandardError => e
if e.message == "Unauthorized"
@client = nil
rescue KubeException => e
if e.error_code == 401
# recreate client to refresh token
log.info("Encountered 'Unauthorized' exception in watch, recreating client to refresh token")
log.info("Encountered '401 Unauthorized' exception in watch, recreating client to refresh token")
create_client()
namespace_watcher = nil
else
# treat all other errors the same as StandardError, log, swallow and reset
@stats.bump(:namespace_watch_failures)
if Thread.current[:namespace_watch_retry_count] < @watch_retry_max_times
# Instead of raising exceptions and crashing Fluentd, swallow
Expand All @@ -79,6 +79,31 @@ def set_up_namespace_thread
raise Fluent::UnrecoverableError, message
end
end
rescue StandardError => e
@stats.bump(:namespace_watch_failures)
if Thread.current[:namespace_watch_retry_count] < @watch_retry_max_times
# Instead of raising exceptions and crashing Fluentd, swallow
# the exception and reset the watcher.
log.info(
'Exception encountered parsing namespace watch event. ' \
'The connection might have been closed. Sleeping for ' \
"#{Thread.current[:namespace_watch_retry_backoff_interval]} " \
'seconds and resetting the namespace watcher.', e
)
sleep(Thread.current[:namespace_watch_retry_backoff_interval])
Thread.current[:namespace_watch_retry_count] += 1
Thread.current[:namespace_watch_retry_backoff_interval] *= @watch_retry_exponential_backoff_base
namespace_watcher = nil
else
# Since retries failed for many times, log as errors instead
# of info and raise exceptions and trigger Fluentd to restart.
message =
'Exception encountered parsing namespace watch event. The ' \
'connection might have been closed. Retried ' \
"#{@watch_retry_max_times} times yet still failing. Restarting."
log.error(message, e)
raise Fluent::UnrecoverableError, message
end
end
end

Expand Down
35 changes: 30 additions & 5 deletions lib/fluent/plugin/kubernetes_metadata_watch_pods.rb
Expand Up @@ -47,14 +47,14 @@ def set_up_pod_thread
@stats.bump(:pod_watch_gone_errors)
log.info('410 Gone encountered. Restarting pod watch to reset resource versions.', e)
pod_watcher = nil
rescue StandardError => e
if e.message == "Unauthorized"
@client = nil
rescue KubeException => e
if e.error_code == 401
# recreate client to refresh token
log.info("Encountered 'Unauthorized' exception in watch, recreating client to refresh token")
log.info("Encountered '401 Unauthorized' exception in watch, recreating client to refresh token")
create_client()
pod_watcher = nil
namespace_watcher = nil
else
# treat all other errors the same as StandardError, log, swallow and reset
@stats.bump(:pod_watch_failures)
if Thread.current[:pod_watch_retry_count] < @watch_retry_max_times
# Instead of raising exceptions and crashing Fluentd, swallow
Expand All @@ -80,6 +80,31 @@ def set_up_pod_thread
raise Fluent::UnrecoverableError, message
end
end
rescue StandardError => e
@stats.bump(:pod_watch_failures)
if Thread.current[:pod_watch_retry_count] < @watch_retry_max_times
# Instead of raising exceptions and crashing Fluentd, swallow
# the exception and reset the watcher.
log.info(
'Exception encountered parsing pod watch event. The ' \
'connection might have been closed. Sleeping for ' \
"#{Thread.current[:pod_watch_retry_backoff_interval]} " \
'seconds and resetting the pod watcher.', e
)
sleep(Thread.current[:pod_watch_retry_backoff_interval])
Thread.current[:pod_watch_retry_count] += 1
Thread.current[:pod_watch_retry_backoff_interval] *= @watch_retry_exponential_backoff_base
pod_watcher = nil
else
# Since retries failed for many times, log as errors instead
# of info and raise exceptions and trigger Fluentd to restart.
message =
'Exception encountered parsing pod watch event. The ' \
'connection might have been closed. Retried ' \
"#{@watch_retry_max_times} times yet still failing. Restarting."
log.error(message, e)
raise Fluent::UnrecoverableError, message
end
end
end

Expand Down

0 comments on commit 7f16705

Please sign in to comment.