Refresh k8s client on 'Unauthorized' exceptions #337

Merged 2 commits on Jun 24, 2022
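
This change makes the filter recover from expired service-account tokens: the API lookups in fetch_pod_metadata and fetch_namespace_metadata and the pod/namespace watch threads now rescue KubeException, and on a 401 Unauthorized they rebuild the Kubeclient::Client through a new create_client helper that reuses the @ssl_options and @auth_options captured in configure. Below is a minimal standalone sketch of that pattern, assuming kubeclient's KubeException#error_code and the constructor arguments used in the diff; the class and method names are illustrative, not the plugin's own.

    require 'kubeclient'

    # Illustrative wrapper, not the plugin's actual class.
    class RefreshingMetadataClient
      def initialize(url, api_version, ssl_options, auth_options)
        @url = url
        @api_version = api_version
        @ssl_options = ssl_options
        @auth_options = auth_options # e.g. { bearer_token_file: '/path/to/token' }
        create_client
      end

      # Rebuild the client; a client built here authenticates with the token currently on disk.
      def create_client
        @client = Kubeclient::Client.new(
          @url,
          @api_version,
          ssl_options: @ssl_options,
          auth_options: @auth_options,
          as: :parsed_symbolized
        )
      end

      # Returns pod metadata, or nil after refreshing the client on a 401.
      def fetch_pod(namespace, name)
        @client.get_pod(name, namespace)
      rescue KubeException => e
        raise unless e.error_code == 401
        create_client # token likely expired: rebuild the client, caller retries later
        nil
      end
    end
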
70 changes: 47 additions & 23 deletions lib/fluent/plugin/filter_kubernetes_metadata.rb
@@ -118,11 +118,21 @@ def fetch_pod_metadata(namespace_name, pod_name)
@stats.bump(:pod_cache_api_updates)
log.trace("parsed metadata for #{namespace_name}/#{pod_name}: #{metadata}")
@cache[metadata['pod_id']] = metadata
rescue StandardError => e
@stats.bump(:pod_cache_api_nil_error)
log.debug "Exception '#{e}' encountered fetching pod metadata from Kubernetes API #{@apiVersion} endpoint #{@kubernetes_url}"
{}
end
rescue KubeException => e
if e.error_code == 401
# recreate client to refresh token
log.info("Encountered '401 Unauthorized' exception, recreating client to refresh token")
create_client()
else
log.error "Exception '#{e}' encountered fetching pod metadata from Kubernetes API #{@apiVersion} endpoint #{@kubernetes_url}"
@stats.bump(:pod_cache_api_nil_error)
end
{}
rescue StandardError => e
@stats.bump(:pod_cache_api_nil_error)
log.error "Exception '#{e}' encountered fetching pod metadata from Kubernetes API #{@apiVersion} endpoint #{@kubernetes_url}"
{}
end

def dump_stats
@curr_time = Time.now
@@ -150,15 +160,27 @@ def fetch_namespace_metadata(namespace_name)
@stats.bump(:namespace_cache_api_updates)
log.trace("parsed metadata for #{namespace_name}: #{metadata}")
@namespace_cache[metadata['namespace_id']] = metadata
rescue StandardError => e
@stats.bump(:namespace_cache_api_nil_error)
log.debug "Exception '#{e}' encountered fetching namespace metadata from Kubernetes API #{@apiVersion} endpoint #{@kubernetes_url}"
{}
rescue KubeException => e
if e.error_code == 401
# recreate client to refresh token
log.info("Encountered '401 Unauthorized' exception, recreating client to refresh token")
create_client()
else
log.error "Exception '#{e}' encountered fetching namespace metadata from Kubernetes API #{@apiVersion} endpoint #{@kubernetes_url}"
@stats.bump(:namespace_cache_api_nil_error)
end
{}
rescue StandardError => e
@stats.bump(:namespace_cache_api_nil_error)
log.error "Exception '#{e}' encountered fetching namespace metadata from Kubernetes API #{@apiVersion} endpoint #{@kubernetes_url}"
{}
end

def initialize
super
@prev_time = Time.now
@ssl_options = {}
@auth_options = {}
end

def configure(conf)
Expand Down Expand Up @@ -230,7 +252,7 @@ def configure(conf)
end

if present?(@kubernetes_url)
ssl_options = {
@ssl_options = {
client_cert: present?(@client_cert) ? OpenSSL::X509::Certificate.new(File.read(@client_cert)) : nil,
client_key: present?(@client_key) ? OpenSSL::PKey::RSA.new(File.read(@client_key)) : nil,
ca_file: @ca_file,
@@ -249,24 +271,14 @@ def configure(conf)
0x80000
end
ssl_store.flags = OpenSSL::X509::V_FLAG_CRL_CHECK_ALL | flagval
ssl_options[:cert_store] = ssl_store
@ssl_options[:cert_store] = ssl_store
end

auth_options = {}

if present?(@bearer_token_file)
bearer_token = File.read(@bearer_token_file)
auth_options[:bearer_token] = bearer_token
@auth_options[:bearer_token_file] = @bearer_token_file
end

log.debug 'Creating K8S client'
@client = Kubeclient::Client.new(
@kubernetes_url,
@apiVersion,
ssl_options: ssl_options,
auth_options: auth_options,
as: :parsed_symbolized
)
create_client()

if @test_api_adapter
log.info "Extending client with test api adaper #{@test_api_adapter}"
@@ -305,6 +317,18 @@ def configure(conf)
end
end

def create_client()
log.debug 'Creating K8S client'
@client = nil
@client = Kubeclient::Client.new(
@kubernetes_url,
@apiVersion,
ssl_options: @ssl_options,
auth_options: @auth_options,
as: :parsed_symbolized
)
end

def get_metadata_for_record(namespace_name, pod_name, container_name, cache_key, create_time, batch_miss_cache, docker_id)
metadata = {
'docker' => { 'container_id' => "" },
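
Note on the auth change in configure above: the token is no longer read once at startup into auth_options[:bearer_token]; only the file path is stored in @auth_options[:bearer_token_file], so every client built by create_client authenticates with whatever token is currently on disk rather than a value captured at configure time. That is what makes the 401 handling effective: recreating the client picks up a rotated service-account token without restarting Fluentd. The relevant before/after, condensed from the hunk above:

    # before: token contents captured once, at configure time
    auth_options[:bearer_token] = File.read(@bearer_token_file)

    # after: only the path is passed; kubeclient resolves the token from the file
    @auth_options[:bearer_token_file] = @bearer_token_file
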
33 changes: 33 additions & 0 deletions lib/fluent/plugin/kubernetes_metadata_watch_namespaces.rb
@@ -46,6 +46,39 @@ def set_up_namespace_thread
@stats.bump(:namespace_watch_gone_errors)
log.info('410 Gone encountered. Restarting namespace watch to reset resource versions.', e)
namespace_watcher = nil
rescue KubeException => e
if e.error_code == 401
# recreate client to refresh token
log.info("Encountered '401 Unauthorized' exception in watch, recreating client to refresh token")
create_client()
namespace_watcher = nil
else
# treat all other errors the same as StandardError, log, swallow and reset
@stats.bump(:namespace_watch_failures)
if Thread.current[:namespace_watch_retry_count] < @watch_retry_max_times
# Instead of raising exceptions and crashing Fluentd, swallow
# the exception and reset the watcher.
log.info(
'Exception encountered parsing namespace watch event. ' \
'The connection might have been closed. Sleeping for ' \
"#{Thread.current[:namespace_watch_retry_backoff_interval]} " \
'seconds and resetting the namespace watcher.', e
)
sleep(Thread.current[:namespace_watch_retry_backoff_interval])
Thread.current[:namespace_watch_retry_count] += 1
Thread.current[:namespace_watch_retry_backoff_interval] *= @watch_retry_exponential_backoff_base
namespace_watcher = nil
else
# Since retries failed for many times, log as errors instead
# of info and raise exceptions and trigger Fluentd to restart.
message =
'Exception encountered parsing namespace watch event. The ' \
'connection might have been closed. Retried ' \
"#{@watch_retry_max_times} times yet still failing. Restarting."
log.error(message, e)
raise Fluent::UnrecoverableError, message
end
end
rescue StandardError => e
@stats.bump(:namespace_watch_failures)
if Thread.current[:namespace_watch_retry_count] < @watch_retry_max_times
33 changes: 33 additions & 0 deletions lib/fluent/plugin/kubernetes_metadata_watch_pods.rb
@@ -47,6 +47,39 @@ def set_up_pod_thread
@stats.bump(:pod_watch_gone_errors)
log.info('410 Gone encountered. Restarting pod watch to reset resource versions.', e)
pod_watcher = nil
rescue KubeException => e
if e.error_code == 401
# recreate client to refresh token
log.info("Encountered '401 Unauthorized' exception in watch, recreating client to refresh token")
create_client()
pod_watcher = nil
else
# treat all other errors the same as StandardError, log, swallow and reset
@stats.bump(:pod_watch_failures)
if Thread.current[:pod_watch_retry_count] < @watch_retry_max_times
# Instead of raising exceptions and crashing Fluentd, swallow
# the exception and reset the watcher.
log.info(
'Exception encountered parsing pod watch event. The ' \
'connection might have been closed. Sleeping for ' \
"#{Thread.current[:pod_watch_retry_backoff_interval]} " \
'seconds and resetting the pod watcher.', e
)
sleep(Thread.current[:pod_watch_retry_backoff_interval])
Thread.current[:pod_watch_retry_count] += 1
Thread.current[:pod_watch_retry_backoff_interval] *= @watch_retry_exponential_backoff_base
pod_watcher = nil
else
# Since retries failed for many times, log as errors instead
# of info and raise exceptions and trigger Fluentd to restart.
message =
'Exception encountered parsing pod watch event. The ' \
'connection might have been closed. Retried ' \
"#{@watch_retry_max_times} times yet still failing. Restarting."
log.error(message, e)
raise Fluent::UnrecoverableError, message
end
end
rescue StandardError => e
@stats.bump(:pod_watch_failures)
if Thread.current[:pod_watch_retry_count] < @watch_retry_max_times
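
The watch threads in the two files above get the same treatment: a 401 raised inside the watch loop refreshes the client and resets the watcher instead of consuming the retry/backoff budget, while other KubeExceptions still go through the existing sleep-and-retry path. A condensed sketch of that loop shape, assuming kubeclient's watch_pods and WatchStream#each; the handler name and the simplified loop structure are illustrative:

    # Illustrative watch loop: refresh the client on 401 and restart the watch.
    def watch_pods_with_refresh
      watcher = nil
      loop do
        watcher ||= @client.watch_pods
        watcher.each do |notice|
          handle_pod_event(notice) # hypothetical event handler
        end
      rescue KubeException => e
        raise unless e.error_code == 401 # the plugin retries non-401 errors with backoff; here we just re-raise
        create_client # rebuild the client so the rotated token is used
        watcher = nil # force a fresh watch connection on the next iteration
      end
    end
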