
Refresh k8s client on 'Unauthorized' exceptions #337

Merged (2 commits) on Jun 24, 2022
43 changes: 29 additions & 14 deletions lib/fluent/plugin/filter_kubernetes_metadata.rb
@@ -121,6 +121,12 @@ def fetch_pod_metadata(namespace_name, pod_name)
rescue StandardError => e
@stats.bump(:pod_cache_api_nil_error)
log.debug "Exception '#{e}' encountered fetching pod metadata from Kubernetes API #{@apiVersion} endpoint #{@kubernetes_url}"
if e.message == "Unauthorized"
Contributor:
Is there a transport error of some kind that is thrown, where we could evaluate a response code (e.g. 401)? That would seem more consistent.

Contributor Author:
Let me explain how I chose this option. But first, a disclaimer: to be honest, I am not a Ruby dev. I mostly work on Fluent Bit, but I was asked to work on this since it's important to our customers, so I largely don't know my way around this codebase.

I had the same thought as you, though: what's the most canonical way to match this specific error?

So I ran this code and recorded what it output: https://github.com/PettitWesley/fluent-plugin-kubernetes_metadata_filter/blob/attempt_2/lib/fluent/plugin/kubernetes_metadata_watch_pods.rb#L53

And the result is this screenshot. Basically, the string representation of the exception, which is what gets printed, contains an HTTP code (but not in the message or full_message fields, which is interesting). It felt wrong to match on that full string, and "Unauthorized" was easy to match on, so I picked it. There didn't seem to be an actual field on the exception object that would give me the code, but maybe I just didn't know how to find it.

(Screenshot: exception string output, 2022-06-06)
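The behavior described above is reproducible with an exception class shaped like kubeclient's HttpError, which keeps the status and message in its own fields and overrides #to_s. FakeHttpError below is a hypothetical stand-in, not kubeclient's actual class, so the sketch runs without the gem:

```ruby
# Minimal reproduction of the observed behavior: the status code shows up in
# the printed string (#to_s) but not in #message.
class FakeHttpError < StandardError
  attr_reader :error_code, :message

  def initialize(error_code, message)
    @error_code = error_code
    @message = message # shadows Exception#message with the raw message
    super(message)
  end

  # Printing the exception goes through #to_s, which embeds the status code.
  def to_s
    "HTTP status code #{@error_code}, #{@message}"
  end
end

e = FakeHttpError.new(401, 'Unauthorized')
puts e.message # => "Unauthorized"  (no status code)
puts e         # => "HTTP status code 401, Unauthorized"
```

This explains why the code is visible when the exception is printed yet absent from the message field the plugin was matching on.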

Contributor:

I do believe using the 401 error_code to decide the correct path is the proper option here. You'll probably be able to use e.error_code instead of e.message.

Side note: this is most likely a temporary fix, as the underlying work should be done in the kubeclient library. The bigger issue is that no backport is coming to 4.9.4+; they are going straight to 5.x, which in turn means that all developers using this library will have to update to 5.x to get this feature.
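The error_code suggestion can be sketched as follows. StubHttpError is a hypothetical stand-in for Kubeclient::HttpError (assumed here to expose the HTTP status via #error_code, as the review suggests), so the example runs without the gem installed:

```ruby
# Stand-in for Kubeclient::HttpError, which carries the HTTP status in
# #error_code (the field the review suggests matching on).
class StubHttpError < StandardError
  attr_reader :error_code

  def initialize(error_code, message)
    @error_code = error_code
    super(message)
  end
end

# Matching the numeric status is sturdier than comparing the message string,
# whose wording can vary across kubeclient versions.
def refresh_client?(error)
  error.respond_to?(:error_code) && error.error_code == 401
end

begin
  raise StubHttpError.new(401, 'Unauthorized')
rescue StandardError => e
  puts refresh_client?(e) # => true: rebuild the client to refresh the token
end
```

A plain StandardError without an error_code falls through to the existing retry path, since refresh_client? guards with respond_to?.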

Contributor:

@larivierec are you confirming this fix will be part of 5.x? Is there any reason we could not consume those changes in this library in lieu of making these larger changes to this plugin?

Reply:

No, since I'm not a contributor, I cannot guarantee that the fix will be in the library. However, based on master, you'll see that this support is already on the master branch.

Look at the following issue to be sure.
ManageIQ/kubeclient#561

Contributor Author:

My understanding was that the maintainers of that repo have not given any timeline for a release. Hence, it's not something we can use to get a fix out to users ASAP, which is what AWS Kubernetes users have requested and why I am working on this.

If there is any way you can think of to reach out to them to speed up that release, or any way this project can consume that code from master, that's definitely preferable to my change.

Reply:

I don't see a problem with the PR at all. IMO, I would probably add this fix using the error_code rather than the message. It's never a bad thing to rely solely on underlying libraries.

Contributor Author:

Cool!

> it's never a bad thing to rely solely on underlying libraries.

Wait, I think you meant: it is a bad thing to rely solely on the underlying library; we should have protections for basic things like unauthorized exceptions in this codebase as well? That's what I was thinking too after I gave it some more thought. I also just updated this PR with the error_code change, and I'm testing it now.

@client = nil
# recreate client to refresh token
log.info("Re-creating Kubernetes API Client to refresh auth bearer token.")
create_client()
end
{}
end

@@ -153,12 +159,20 @@ def fetch_namespace_metadata(namespace_name)
rescue StandardError => e
@stats.bump(:namespace_cache_api_nil_error)
log.debug "Exception '#{e}' encountered fetching namespace metadata from Kubernetes API #{@apiVersion} endpoint #{@kubernetes_url}"
if e.message == "Unauthorized"
@client = nil
Contributor:

Maybe this should be in "create_client"
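The suggestion amounts to folding the reset into the helper, so no rescue site can forget it. A minimal sketch, with MetadataFilter and FakeClient as hypothetical stand-ins for the plugin class and Kubeclient::Client:

```ruby
# FakeClient stands in for Kubeclient::Client; the real helper builds one
# from @kubernetes_url, @ssl_options, and @auth_options.
FakeClient = Struct.new(:created_at)

class MetadataFilter
  attr_reader :client

  # The reviewer's point: create_client should own the reset, so every
  # caller gets a fresh, re-authenticated client from a single call.
  def create_client
    @client = nil # drop the stale client before rebuilding
    @client = FakeClient.new(Time.now)
  end

  # Unauthorized handling then collapses to one call, with no separate
  # "@client = nil" step duplicated at each rescue site.
  def handle_unauthorized
    create_client
  end
end

f = MetadataFilter.new
first = f.create_client
f.handle_unauthorized
puts f.client.equal?(first) # => false: a fresh client replaced the old one
```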

# recreate client to refresh token
log.info("Re-creating Kubernetes API Client to refresh auth bearer token.")
create_client()
end
{}
end

def initialize
super
@prev_time = Time.now
@ssl_options = {}
@auth_options = {}
end

def configure(conf)
@@ -230,7 +244,7 @@ def configure(conf)
end

if present?(@kubernetes_url)
ssl_options = {
@ssl_options = {
client_cert: present?(@client_cert) ? OpenSSL::X509::Certificate.new(File.read(@client_cert)) : nil,
client_key: present?(@client_key) ? OpenSSL::PKey::RSA.new(File.read(@client_key)) : nil,
ca_file: @ca_file,
@@ -249,24 +263,14 @@ def configure(conf)
0x80000
end
ssl_store.flags = OpenSSL::X509::V_FLAG_CRL_CHECK_ALL | flagval
ssl_options[:cert_store] = ssl_store
@ssl_options[:cert_store] = ssl_store
end

auth_options = {}

if present?(@bearer_token_file)
bearer_token = File.read(@bearer_token_file)
auth_options[:bearer_token] = bearer_token
@auth_options[:bearer_token_file] = @bearer_token_file
end

log.debug 'Creating K8S client'
@client = Kubeclient::Client.new(
@kubernetes_url,
@apiVersion,
ssl_options: ssl_options,
auth_options: auth_options,
as: :parsed_symbolized
)
create_client()

if @test_api_adapter
log.info "Extending client with test api adaper #{@test_api_adapter}"
@@ -305,6 +309,17 @@ def configure(conf)
end
end

def create_client()
log.debug 'Creating K8S client'
@client = Kubeclient::Client.new(
@kubernetes_url,
@apiVersion,
ssl_options: @ssl_options,
auth_options: @auth_options,
as: :parsed_symbolized
)
end

def get_metadata_for_record(namespace_name, pod_name, container_name, cache_key, create_time, batch_miss_cache, docker_id)
metadata = {
'docker' => { 'container_id' => "" },
50 changes: 29 additions & 21 deletions lib/fluent/plugin/kubernetes_metadata_watch_namespaces.rb
@@ -47,29 +47,37 @@ def set_up_namespace_thread
log.info('410 Gone encountered. Restarting namespace watch to reset resource versions.', e)
namespace_watcher = nil
rescue StandardError => e
@stats.bump(:namespace_watch_failures)
if Thread.current[:namespace_watch_retry_count] < @watch_retry_max_times
# Instead of raising exceptions and crashing Fluentd, swallow
# the exception and reset the watcher.
log.info(
'Exception encountered parsing namespace watch event. ' \
'The connection might have been closed. Sleeping for ' \
"#{Thread.current[:namespace_watch_retry_backoff_interval]} " \
'seconds and resetting the namespace watcher.', e
)
sleep(Thread.current[:namespace_watch_retry_backoff_interval])
Thread.current[:namespace_watch_retry_count] += 1
Thread.current[:namespace_watch_retry_backoff_interval] *= @watch_retry_exponential_backoff_base
if e.message == "Unauthorized"
Contributor:

Same question here regarding evaluating against a known code instead of a string.

@client = nil
# recreate client to refresh token
log.info("Encountered 'Unauthorized' exception in watch, recreating client to refresh token")
create_client()
namespace_watcher = nil
else
# Since retries failed for many times, log as errors instead
# of info and raise exceptions and trigger Fluentd to restart.
message =
'Exception encountered parsing namespace watch event. The ' \
'connection might have been closed. Retried ' \
"#{@watch_retry_max_times} times yet still failing. Restarting."
log.error(message, e)
raise Fluent::UnrecoverableError, message
@stats.bump(:namespace_watch_failures)
if Thread.current[:namespace_watch_retry_count] < @watch_retry_max_times
# Instead of raising exceptions and crashing Fluentd, swallow
# the exception and reset the watcher.
log.info(
'Exception encountered parsing namespace watch event. ' \
'The connection might have been closed. Sleeping for ' \
"#{Thread.current[:namespace_watch_retry_backoff_interval]} " \
'seconds and resetting the namespace watcher.', e
)
sleep(Thread.current[:namespace_watch_retry_backoff_interval])
Thread.current[:namespace_watch_retry_count] += 1
Thread.current[:namespace_watch_retry_backoff_interval] *= @watch_retry_exponential_backoff_base
namespace_watcher = nil
else
# Since retries failed for many times, log as errors instead
# of info and raise exceptions and trigger Fluentd to restart.
message =
'Exception encountered parsing namespace watch event. The ' \
'connection might have been closed. Retried ' \
"#{@watch_retry_max_times} times yet still failing. Restarting."
log.error(message, e)
raise Fluent::UnrecoverableError, message
end
end
end
end
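The retry bookkeeping in the hunk above (counter, exponential backoff, hard cap) can be sketched in isolation; backoff_intervals is an illustrative helper, not part of the plugin:

```ruby
# Isolated sketch of the watch retry arithmetic: each failure multiplies the
# sleep interval by the exponential base until the retry budget
# (@watch_retry_max_times in the plugin) is exhausted, after which the plugin
# raises Fluent::UnrecoverableError instead of sleeping again.
def backoff_intervals(initial:, base:, max_times:)
  interval = initial
  intervals = []
  max_times.times do
    intervals << interval
    interval *= base
  end
  intervals
end

# With a 1s starting interval and base 2, five retries sleep 1, 2, 4, 8, 16s.
puts backoff_intervals(initial: 1, base: 2, max_times: 5).inspect
# => [1, 2, 4, 8, 16]
```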
50 changes: 29 additions & 21 deletions lib/fluent/plugin/kubernetes_metadata_watch_pods.rb
@@ -48,29 +48,37 @@ def set_up_pod_thread
log.info('410 Gone encountered. Restarting pod watch to reset resource versions.', e)
pod_watcher = nil
rescue StandardError => e
@stats.bump(:pod_watch_failures)
if Thread.current[:pod_watch_retry_count] < @watch_retry_max_times
# Instead of raising exceptions and crashing Fluentd, swallow
# the exception and reset the watcher.
log.info(
'Exception encountered parsing pod watch event. The ' \
'connection might have been closed. Sleeping for ' \
"#{Thread.current[:pod_watch_retry_backoff_interval]} " \
'seconds and resetting the pod watcher.', e
)
sleep(Thread.current[:pod_watch_retry_backoff_interval])
Thread.current[:pod_watch_retry_count] += 1
Thread.current[:pod_watch_retry_backoff_interval] *= @watch_retry_exponential_backoff_base
if e.message == "Unauthorized"
@client = nil
# recreate client to refresh token
log.info("Encountered 'Unauthorized' exception in watch, recreating client to refresh token")
create_client()
pod_watcher = nil
else
# Since retries failed for many times, log as errors instead
# of info and raise exceptions and trigger Fluentd to restart.
message =
'Exception encountered parsing pod watch event. The ' \
'connection might have been closed. Retried ' \
"#{@watch_retry_max_times} times yet still failing. Restarting."
log.error(message, e)
raise Fluent::UnrecoverableError, message
@stats.bump(:pod_watch_failures)
if Thread.current[:pod_watch_retry_count] < @watch_retry_max_times
# Instead of raising exceptions and crashing Fluentd, swallow
# the exception and reset the watcher.
log.info(
'Exception encountered parsing pod watch event. The ' \
'connection might have been closed. Sleeping for ' \
"#{Thread.current[:pod_watch_retry_backoff_interval]} " \
'seconds and resetting the pod watcher.', e
)
sleep(Thread.current[:pod_watch_retry_backoff_interval])
Thread.current[:pod_watch_retry_count] += 1
Thread.current[:pod_watch_retry_backoff_interval] *= @watch_retry_exponential_backoff_base
pod_watcher = nil
else
# Since retries failed for many times, log as errors instead
# of info and raise exceptions and trigger Fluentd to restart.
message =
'Exception encountered parsing pod watch event. The ' \
'connection might have been closed. Retried ' \
"#{@watch_retry_max_times} times yet still failing. Restarting."
log.error(message, e)
raise Fluent::UnrecoverableError, message
end
end
end
end