diff --git a/kube/src/client/mod.rs b/kube/src/client/mod.rs index f87670d99..55c2270ee 100644 --- a/kube/src/client/mod.rs +++ b/kube/src/client/mod.rs @@ -13,7 +13,7 @@ use std::convert::TryFrom; use bytes::Bytes; use either::{Either, Left, Right}; use futures::{self, Stream, StreamExt, TryStream, TryStreamExt}; -use http::{self, HeaderValue, Request, Response, StatusCode}; +use http::{self, Request, Response, StatusCode}; use hyper::{client::HttpConnector, Body}; use hyper_timeout::TimeoutConnector; use k8s_openapi::apimachinery::pkg::apis::meta::v1 as k8s_meta_v1; @@ -33,8 +33,8 @@ use tower_http::{ use crate::{ api::WatchEvent, - error::{ConfigError, ErrorResponse}, - service::{AuthLayer, Authentication, SetBaseUriLayer, SetHeadersLayer}, + error::ErrorResponse, + service::{Authentication, SetBaseUriLayer, SetHeadersLayer}, Config, Error, Result, }; @@ -406,30 +406,10 @@ impl TryFrom for Client { use tracing::Span; let cluster_url = config.cluster_url.clone(); - let mut default_headers = config.headers.clone(); + let default_headers = config.headers.clone(); let timeout = config.timeout; let default_ns = config.default_ns.clone(); - // AuthLayer is not necessary unless `RefreshableToken` - let maybe_auth = match Authentication::try_from(&config.auth_info)? { - Authentication::None => None, - Authentication::Basic(s) => { - let mut value = - HeaderValue::try_from(format!("Basic {}", &s)).map_err(ConfigError::InvalidBasicAuth)?; - value.set_sensitive(true); - default_headers.insert(http::header::AUTHORIZATION, value); - None - } - Authentication::Token(s) => { - let mut value = HeaderValue::try_from(format!("Bearer {}", &s)) - .map_err(ConfigError::InvalidBearerToken)?; - value.set_sensitive(true); - default_headers.insert(http::header::AUTHORIZATION, value); - None - } - Authentication::RefreshableToken(r) => Some(AuthLayer::new(r)), - }; - let common = ServiceBuilder::new() .layer(SetBaseUriLayer::new(cluster_url)) .layer(SetHeadersLayer::new(default_headers)) @@ -472,7 +452,7 @@ impl TryFrom for Client { let inner = ServiceBuilder::new() .layer(common) - .option_layer(maybe_auth) + .option_layer(Authentication::try_from(&config.auth_info)?.into_layer()) .layer( // Attribute names follow [Semantic Conventions]. // [Semantic Conventions]: https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/semantic_conventions/http.md diff --git a/kube/src/service/auth/add_authorization.rs b/kube/src/service/auth/add_authorization.rs new file mode 100644 index 000000000..6c3e2428b --- /dev/null +++ b/kube/src/service/auth/add_authorization.rs @@ -0,0 +1,63 @@ +// Borrowing from https://github.com/tower-rs/tower-http/pull/95 +// TODO Use `tower-http`'s version once released +use std::task::{Context, Poll}; + +use http::{HeaderValue, Request}; +use tower::{layer::Layer, Service}; + +#[derive(Debug, Clone)] +pub struct AddAuthorizationLayer { + value: HeaderValue, +} + +impl AddAuthorizationLayer { + pub fn basic(username: &str, password: &str) -> Self { + let encoded = base64::encode(format!("{}:{}", username, password)); + let mut value = HeaderValue::from_str(&format!("Basic {}", encoded)).unwrap(); + value.set_sensitive(true); + Self { value } + } + + pub fn bearer(token: &str) -> Self { + let mut value = + HeaderValue::from_str(&format!("Bearer {}", token)).expect("token is not valid header"); + value.set_sensitive(true); + Self { value } + } +} + +impl Layer for AddAuthorizationLayer { + type Service = AddAuthorization; + + fn layer(&self, inner: S) -> Self::Service { + AddAuthorization { + inner, + value: self.value.clone(), + } + } +} + +#[derive(Debug, Clone)] +pub struct AddAuthorization { + inner: S, + value: HeaderValue, +} + +impl Service> for AddAuthorization +where + S: Service>, +{ + type Error = S::Error; + type Future = S::Future; + type Response = S::Response; + + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + self.inner.poll_ready(cx) + } + + fn call(&mut self, mut req: Request) -> Self::Future { + req.headers_mut() + .insert(http::header::AUTHORIZATION, self.value.clone()); + self.inner.call(req) + } +} diff --git a/kube/src/service/auth/mod.rs b/kube/src/service/auth/mod.rs index 1d70b095e..c677548ce 100644 --- a/kube/src/service/auth/mod.rs +++ b/kube/src/service/auth/mod.rs @@ -5,6 +5,7 @@ use http::HeaderValue; use jsonpath_lib::select as jsonpath_select; use serde::{Deserialize, Serialize}; use tokio::sync::Mutex; +use tower::util::Either; use crate::{ config::{read_file_to_string, AuthInfo, AuthProviderConfig, ExecConfig}, @@ -14,18 +15,31 @@ use crate::{ #[cfg(feature = "oauth")] mod oauth; -mod layer; -pub(crate) use layer::AuthLayer; +mod add_authorization; +mod refreshing_token; +pub(crate) use add_authorization::AddAuthorizationLayer; +pub(crate) use refreshing_token::RefreshingTokenLayer; #[derive(Debug, Clone)] #[allow(clippy::large_enum_variant)] pub(crate) enum Authentication { None, - Basic(String), + Basic(String, String), Token(String), RefreshableToken(RefreshableToken), } +impl Authentication { + pub(crate) fn into_layer(self) -> Option> { + match self { + Authentication::None => None, + Authentication::Basic(user, pass) => Some(Either::A(AddAuthorizationLayer::basic(&user, &pass))), + Authentication::Token(token) => Some(Either::A(AddAuthorizationLayer::bearer(&token))), + Authentication::RefreshableToken(r) => Some(Either::B(RefreshingTokenLayer::new(r))), + } + } +} + // See https://github.com/kubernetes/kubernetes/tree/master/staging/src/k8s.io/client-go/plugin/pkg/client/auth // for the list of auth-plugins supported by client-go. // We currently support the following: @@ -48,7 +62,7 @@ impl RefreshableToken { // conditions where the token expires while we are refreshing if Utc::now() + Duration::seconds(60) >= locked_data.1 { match Authentication::try_from(&locked_data.2)? { - Authentication::None | Authentication::Basic(_) | Authentication::Token(_) => { + Authentication::None | Authentication::Basic(_, _) | Authentication::Token(_) => { return Err(ConfigError::UnrefreshableTokenResponse).map_err(Error::from); } @@ -124,7 +138,7 @@ impl TryFrom<&AuthInfo> for Authentication { } if let (Some(u), Some(p)) = (&auth_info.username, &auth_info.password) { - return Ok(Authentication::Basic(base64::encode(&format!("{}:{}", u, p)))); + return Ok(Authentication::Basic(u.to_owned(), p.to_owned())); } let (raw_token, expiration) = match &auth_info.token { diff --git a/kube/src/service/auth/layer.rs b/kube/src/service/auth/refreshing_token.rs similarity index 87% rename from kube/src/service/auth/layer.rs rename to kube/src/service/auth/refreshing_token.rs index 07a7b07d6..ff5e204b7 100644 --- a/kube/src/service/auth/layer.rs +++ b/kube/src/service/auth/refreshing_token.rs @@ -11,34 +11,35 @@ use tower::{layer::Layer, BoxError, Service}; use super::RefreshableToken; use crate::Result; -/// `Layer` to decorate the request with `Authorization` header. -pub struct AuthLayer { - auth: RefreshableToken, +// TODO Come up with a better name +/// `Layer` to decorate the request with `Authorization` header with token refreshing automatically. +pub struct RefreshingTokenLayer { + refreshable: RefreshableToken, } -impl AuthLayer { - pub(crate) fn new(auth: RefreshableToken) -> Self { - Self { auth } +impl RefreshingTokenLayer { + pub(crate) fn new(refreshable: RefreshableToken) -> Self { + Self { refreshable } } } -impl Layer for AuthLayer { - type Service = AuthService; +impl Layer for RefreshingTokenLayer { + type Service = RefreshingToken; fn layer(&self, service: S) -> Self::Service { - AuthService { - auth: self.auth.clone(), + RefreshingToken { + refreshable: self.refreshable.clone(), service, } } } -pub struct AuthService { - auth: RefreshableToken, +pub struct RefreshingToken { + refreshable: RefreshableToken, service: S, } -impl Service> for AuthService +impl Service> for RefreshingToken where S: Service, Response = Response> + Clone, S::Error: Into, @@ -62,7 +63,7 @@ where let service = self.service.clone(); let service = std::mem::replace(&mut self.service, service); - let auth = self.auth.clone(); + let auth = self.refreshable.clone(); let request = async move { auth.to_header().await.map_err(BoxError::from).map(|value| { req.headers_mut().insert(AUTHORIZATION, value); @@ -147,7 +148,7 @@ mod tests { const TOKEN: &str = "test"; let auth = test_token(TOKEN.into()); let (mut service, handle): (_, Handle, Response>) = - mock::spawn_layer(AuthLayer::new(auth)); + mock::spawn_layer(RefreshingTokenLayer::new(auth)); let spawned = tokio::spawn(async move { // Receive the requests and respond @@ -173,7 +174,7 @@ mod tests { const TOKEN: &str = "\n"; let auth = test_token(TOKEN.into()); let (mut service, _handle) = - mock::spawn_layer::, Response, _>(AuthLayer::new(auth)); + mock::spawn_layer::, Response, _>(RefreshingTokenLayer::new(auth)); let err = service .call(Request::builder().uri("/").body(Body::empty()).unwrap()) .await diff --git a/kube/src/service/mod.rs b/kube/src/service/mod.rs index f9883276d..db7dda50f 100644 --- a/kube/src/service/mod.rs +++ b/kube/src/service/mod.rs @@ -4,8 +4,5 @@ mod auth; mod base_uri; mod headers; -pub(crate) use self::{ - auth::{AuthLayer, Authentication}, - headers::SetHeadersLayer, -}; +pub(crate) use self::{auth::Authentication, headers::SetHeadersLayer}; pub use base_uri::{SetBaseUri, SetBaseUriLayer};