Refresh k8s client on 'Unauthorized' exceptions (#337)
* Refresh k8s client on 'Unauthorized' exceptions

Signed-off-by: Wesley Pettit <wppttt@amazon.com>

* Address PR feedback

Signed-off-by: Wesley Pettit <wppttt@amazon.com>
PettitWesley committed Jun 24, 2022
1 parent d2cfed1 commit b80d65e
Showing 3 changed files with 113 additions and 23 deletions.
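In short, each Kubernetes API call and watch loop now rescues KubeException and, when the server answers 401 Unauthorized, rebuilds the kubeclient so the service-account bearer token is re-read from the token file. A minimal standalone sketch of that pattern follows; the begin/rescue wrapper, the get_pod call, and the example token path are illustrative assumptions, while create_client, @ssl_options, @auth_options, @kubernetes_url, and @apiVersion mirror the diff below.

require 'kubeclient'

# Rebuild the client from the stored options; the fresh client picks up the
# current token from auth_options[:bearer_token_file].
def create_client
  @client = Kubeclient::Client.new(
    @kubernetes_url,
    @apiVersion,
    ssl_options: @ssl_options,
    auth_options: @auth_options, # e.g. { bearer_token_file: '/var/run/secrets/kubernetes.io/serviceaccount/token' }
    as: :parsed_symbolized
  )
end

begin
  pod = @client.get_pod(pod_name, namespace_name)
rescue KubeException => e
  if e.error_code == 401
    # Token expired: recreate the client to refresh credentials, then return empty metadata.
    create_client
    {}
  else
    raise
  end
end
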
70 changes: 47 additions & 23 deletions lib/fluent/plugin/filter_kubernetes_metadata.rb
@@ -118,11 +118,21 @@ def fetch_pod_metadata(namespace_name, pod_name)
@stats.bump(:pod_cache_api_updates)
log.trace("parsed metadata for #{namespace_name}/#{pod_name}: #{metadata}")
@cache[metadata['pod_id']] = metadata
rescue StandardError => e
@stats.bump(:pod_cache_api_nil_error)
log.debug "Exception '#{e}' encountered fetching pod metadata from Kubernetes API #{@apiVersion} endpoint #{@kubernetes_url}"
{}
end
rescue KubeException => e
if e.error_code == 401
# recreate client to refresh token
log.info("Encountered '401 Unauthorized' exception, recreating client to refresh token")
create_client()
else
log.error "Exception '#{e}' encountered fetching pod metadata from Kubernetes API #{@apiVersion} endpoint #{@kubernetes_url}"
@stats.bump(:pod_cache_api_nil_error)
end
{}
rescue StandardError => e
@stats.bump(:pod_cache_api_nil_error)
log.error "Exception '#{e}' encountered fetching pod metadata from Kubernetes API #{@apiVersion} endpoint #{@kubernetes_url}"
{}
end

def dump_stats
@curr_time = Time.now
@@ -150,15 +160,27 @@ def fetch_namespace_metadata(namespace_name)
@stats.bump(:namespace_cache_api_updates)
log.trace("parsed metadata for #{namespace_name}: #{metadata}")
@namespace_cache[metadata['namespace_id']] = metadata
rescue StandardError => e
@stats.bump(:namespace_cache_api_nil_error)
log.debug "Exception '#{e}' encountered fetching namespace metadata from Kubernetes API #{@apiVersion} endpoint #{@kubernetes_url}"
{}
rescue KubeException => e
if e.error_code == 401
# recreate client to refresh token
log.info("Encountered '401 Unauthorized' exception, recreating client to refresh token")
create_client()
else
log.error "Exception '#{e}' encountered fetching namespace metadata from Kubernetes API #{@apiVersion} endpoint #{@kubernetes_url}"
@stats.bump(:namespace_cache_api_nil_error)
end
{}
rescue StandardError => e
@stats.bump(:namespace_cache_api_nil_error)
log.error "Exception '#{e}' encountered fetching namespace metadata from Kubernetes API #{@apiVersion} endpoint #{@kubernetes_url}"
{}
end

def initialize
super
@prev_time = Time.now
@ssl_options = {}
@auth_options = {}
end

def configure(conf)
@@ -230,7 +252,7 @@ def configure(conf)
end

if present?(@kubernetes_url)
ssl_options = {
@ssl_options = {
client_cert: present?(@client_cert) ? OpenSSL::X509::Certificate.new(File.read(@client_cert)) : nil,
client_key: present?(@client_key) ? OpenSSL::PKey::RSA.new(File.read(@client_key)) : nil,
ca_file: @ca_file,
@@ -249,24 +271,14 @@ def configure(conf)
0x80000
end
ssl_store.flags = OpenSSL::X509::V_FLAG_CRL_CHECK_ALL | flagval
ssl_options[:cert_store] = ssl_store
@ssl_options[:cert_store] = ssl_store
end

auth_options = {}

if present?(@bearer_token_file)
bearer_token = File.read(@bearer_token_file)
auth_options[:bearer_token] = bearer_token
@auth_options[:bearer_token_file] = @bearer_token_file
end

log.debug 'Creating K8S client'
@client = Kubeclient::Client.new(
@kubernetes_url,
@apiVersion,
ssl_options: ssl_options,
auth_options: auth_options,
as: :parsed_symbolized
)
create_client()

if @test_api_adapter
log.info "Extending client with test api adaper #{@test_api_adapter}"
@@ -305,6 +317,18 @@ def configure(conf)
end
end

def create_client()
log.debug 'Creating K8S client'
@client = nil
@client = Kubeclient::Client.new(
@kubernetes_url,
@apiVersion,
ssl_options: @ssl_options,
auth_options: @auth_options,
as: :parsed_symbolized
)
end

def get_metadata_for_record(namespace_name, pod_name, container_name, cache_key, create_time, batch_miss_cache, docker_id)
metadata = {
'docker' => { 'container_id' => "" },
33 changes: 33 additions & 0 deletions lib/fluent/plugin/kubernetes_metadata_watch_namespaces.rb
@@ -46,6 +46,39 @@ def set_up_namespace_thread
@stats.bump(:namespace_watch_gone_errors)
log.info('410 Gone encountered. Restarting namespace watch to reset resource versions.', e)
namespace_watcher = nil
rescue KubeException => e
if e.error_code == 401
# recreate client to refresh token
log.info("Encountered '401 Unauthorized' exception in watch, recreating client to refresh token")
create_client()
namespace_watcher = nil
else
# treat all other errors the same as StandardError, log, swallow and reset
@stats.bump(:namespace_watch_failures)
if Thread.current[:namespace_watch_retry_count] < @watch_retry_max_times
# Instead of raising exceptions and crashing Fluentd, swallow
# the exception and reset the watcher.
log.info(
'Exception encountered parsing namespace watch event. ' \
'The connection might have been closed. Sleeping for ' \
"#{Thread.current[:namespace_watch_retry_backoff_interval]} " \
'seconds and resetting the namespace watcher.', e
)
sleep(Thread.current[:namespace_watch_retry_backoff_interval])
Thread.current[:namespace_watch_retry_count] += 1
Thread.current[:namespace_watch_retry_backoff_interval] *= @watch_retry_exponential_backoff_base
namespace_watcher = nil
else
# Since retries failed for many times, log as errors instead
# of info and raise exceptions and trigger Fluentd to restart.
message =
'Exception encountered parsing namespace watch event. The ' \
'connection might have been closed. Retried ' \
"#{@watch_retry_max_times} times yet still failing. Restarting."
log.error(message, e)
raise Fluent::UnrecoverableError, message
end
end
rescue StandardError => e
@stats.bump(:namespace_watch_failures)
if Thread.current[:namespace_watch_retry_count] < @watch_retry_max_times
33 changes: 33 additions & 0 deletions lib/fluent/plugin/kubernetes_metadata_watch_pods.rb
@@ -47,6 +47,39 @@ def set_up_pod_thread
@stats.bump(:pod_watch_gone_errors)
log.info('410 Gone encountered. Restarting pod watch to reset resource versions.', e)
pod_watcher = nil
rescue KubeException => e
if e.error_code == 401
# recreate client to refresh token
log.info("Encountered '401 Unauthorized' exception in watch, recreating client to refresh token")
create_client()
pod_watcher = nil
else
# treat all other errors the same as StandardError, log, swallow and reset
@stats.bump(:pod_watch_failures)
if Thread.current[:pod_watch_retry_count] < @watch_retry_max_times
# Instead of raising exceptions and crashing Fluentd, swallow
# the exception and reset the watcher.
log.info(
'Exception encountered parsing pod watch event. The ' \
'connection might have been closed. Sleeping for ' \
"#{Thread.current[:pod_watch_retry_backoff_interval]} " \
'seconds and resetting the pod watcher.', e
)
sleep(Thread.current[:pod_watch_retry_backoff_interval])
Thread.current[:pod_watch_retry_count] += 1
Thread.current[:pod_watch_retry_backoff_interval] *= @watch_retry_exponential_backoff_base
pod_watcher = nil
else
# Since retries failed for many times, log as errors instead
# of info and raise exceptions and trigger Fluentd to restart.
message =
'Exception encountered parsing pod watch event. The ' \
'connection might have been closed. Retried ' \
"#{@watch_retry_max_times} times yet still failing. Restarting."
log.error(message, e)
raise Fluent::UnrecoverableError, message
end
end
rescue StandardError => e
@stats.bump(:pod_watch_failures)
if Thread.current[:pod_watch_retry_count] < @watch_retry_max_times
