Skip to content

Commit

Permalink
Refactor auth with layers
Browse files Browse the repository at this point in the history
  • Loading branch information
kazk committed Jun 3, 2021
1 parent bb7ca97 commit 336d5f5
Show file tree
Hide file tree
Showing 5 changed files with 105 additions and 50 deletions.
30 changes: 5 additions & 25 deletions kube/src/client/mod.rs
Expand Up @@ -13,7 +13,7 @@ use std::convert::TryFrom;
use bytes::Bytes;
use either::{Either, Left, Right};
use futures::{self, Stream, StreamExt, TryStream, TryStreamExt};
use http::{self, HeaderValue, Request, Response, StatusCode};
use http::{self, Request, Response, StatusCode};
use hyper::{client::HttpConnector, Body};
use hyper_timeout::TimeoutConnector;
use k8s_openapi::apimachinery::pkg::apis::meta::v1 as k8s_meta_v1;
Expand All @@ -33,8 +33,8 @@ use tower_http::{

use crate::{
api::WatchEvent,
error::{ConfigError, ErrorResponse},
service::{AuthLayer, Authentication, SetBaseUriLayer, SetHeadersLayer},
error::ErrorResponse,
service::{Authentication, SetBaseUriLayer, SetHeadersLayer},
Config, Error, Result,
};

Expand Down Expand Up @@ -406,30 +406,10 @@ impl TryFrom<Config> for Client {
use tracing::Span;

let cluster_url = config.cluster_url.clone();
let mut default_headers = config.headers.clone();
let default_headers = config.headers.clone();
let timeout = config.timeout;
let default_ns = config.default_ns.clone();

// AuthLayer is not necessary unless `RefreshableToken`
let maybe_auth = match Authentication::try_from(&config.auth_info)? {
Authentication::None => None,
Authentication::Basic(s) => {
let mut value =
HeaderValue::try_from(format!("Basic {}", &s)).map_err(ConfigError::InvalidBasicAuth)?;
value.set_sensitive(true);
default_headers.insert(http::header::AUTHORIZATION, value);
None
}
Authentication::Token(s) => {
let mut value = HeaderValue::try_from(format!("Bearer {}", &s))
.map_err(ConfigError::InvalidBearerToken)?;
value.set_sensitive(true);
default_headers.insert(http::header::AUTHORIZATION, value);
None
}
Authentication::RefreshableToken(r) => Some(AuthLayer::new(r)),
};

let common = ServiceBuilder::new()
.layer(SetBaseUriLayer::new(cluster_url))
.layer(SetHeadersLayer::new(default_headers))
Expand Down Expand Up @@ -472,7 +452,7 @@ impl TryFrom<Config> for Client {

let inner = ServiceBuilder::new()
.layer(common)
.option_layer(maybe_auth)
.option_layer(Authentication::try_from(&config.auth_info)?.into_layer())
.layer(
// Attribute names follow [Semantic Conventions].
// [Semantic Conventions]: https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/semantic_conventions/http.md
Expand Down
63 changes: 63 additions & 0 deletions kube/src/service/auth/add_authorization.rs
@@ -0,0 +1,63 @@
// Borrowing from https://github.com/tower-rs/tower-http/pull/95
// TODO Use `tower-http`'s version once released
use std::task::{Context, Poll};

use http::{HeaderValue, Request};
use tower::{layer::Layer, Service};

#[derive(Debug, Clone)]
pub struct AddAuthorizationLayer {
value: HeaderValue,
}

impl AddAuthorizationLayer {
pub fn basic(username: &str, password: &str) -> Self {
let encoded = base64::encode(format!("{}:{}", username, password));
let mut value = HeaderValue::from_str(&format!("Basic {}", encoded)).unwrap();
value.set_sensitive(true);
Self { value }
}

pub fn bearer(token: &str) -> Self {
let mut value =
HeaderValue::from_str(&format!("Bearer {}", token)).expect("token is not valid header");
value.set_sensitive(true);
Self { value }
}
}

impl<S> Layer<S> for AddAuthorizationLayer {
type Service = AddAuthorization<S>;

fn layer(&self, inner: S) -> Self::Service {
AddAuthorization {
inner,
value: self.value.clone(),
}
}
}

#[derive(Debug, Clone)]
pub struct AddAuthorization<S> {
inner: S,
value: HeaderValue,
}

impl<S, ReqBody> Service<Request<ReqBody>> for AddAuthorization<S>
where
S: Service<Request<ReqBody>>,
{
type Error = S::Error;
type Future = S::Future;
type Response = S::Response;

fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.inner.poll_ready(cx)
}

fn call(&mut self, mut req: Request<ReqBody>) -> Self::Future {
req.headers_mut()
.insert(http::header::AUTHORIZATION, self.value.clone());
self.inner.call(req)
}
}
24 changes: 19 additions & 5 deletions kube/src/service/auth/mod.rs
Expand Up @@ -5,6 +5,7 @@ use http::HeaderValue;
use jsonpath_lib::select as jsonpath_select;
use serde::{Deserialize, Serialize};
use tokio::sync::Mutex;
use tower::util::Either;

use crate::{
config::{read_file_to_string, AuthInfo, AuthProviderConfig, ExecConfig},
Expand All @@ -14,18 +15,31 @@ use crate::{

#[cfg(feature = "oauth")] mod oauth;

mod layer;
pub(crate) use layer::AuthLayer;
mod add_authorization;
mod refreshing_token;
pub(crate) use add_authorization::AddAuthorizationLayer;
pub(crate) use refreshing_token::RefreshingTokenLayer;

#[derive(Debug, Clone)]
#[allow(clippy::large_enum_variant)]
pub(crate) enum Authentication {
None,
Basic(String),
Basic(String, String),
Token(String),
RefreshableToken(RefreshableToken),
}

impl Authentication {
pub(crate) fn into_layer(self) -> Option<Either<AddAuthorizationLayer, RefreshingTokenLayer>> {
match self {
Authentication::None => None,
Authentication::Basic(user, pass) => Some(Either::A(AddAuthorizationLayer::basic(&user, &pass))),
Authentication::Token(token) => Some(Either::A(AddAuthorizationLayer::bearer(&token))),
Authentication::RefreshableToken(r) => Some(Either::B(RefreshingTokenLayer::new(r))),
}
}
}

// See https://github.com/kubernetes/kubernetes/tree/master/staging/src/k8s.io/client-go/plugin/pkg/client/auth
// for the list of auth-plugins supported by client-go.
// We currently support the following:
Expand All @@ -48,7 +62,7 @@ impl RefreshableToken {
// conditions where the token expires while we are refreshing
if Utc::now() + Duration::seconds(60) >= locked_data.1 {
match Authentication::try_from(&locked_data.2)? {
Authentication::None | Authentication::Basic(_) | Authentication::Token(_) => {
Authentication::None | Authentication::Basic(_, _) | Authentication::Token(_) => {
return Err(ConfigError::UnrefreshableTokenResponse).map_err(Error::from);
}

Expand Down Expand Up @@ -124,7 +138,7 @@ impl TryFrom<&AuthInfo> for Authentication {
}

if let (Some(u), Some(p)) = (&auth_info.username, &auth_info.password) {
return Ok(Authentication::Basic(base64::encode(&format!("{}:{}", u, p))));
return Ok(Authentication::Basic(u.to_owned(), p.to_owned()));
}

let (raw_token, expiration) = match &auth_info.token {
Expand Down
Expand Up @@ -11,34 +11,35 @@ use tower::{layer::Layer, BoxError, Service};
use super::RefreshableToken;
use crate::Result;

/// `Layer` to decorate the request with `Authorization` header.
pub struct AuthLayer {
auth: RefreshableToken,
// TODO Come up with a better name
/// `Layer` to decorate the request with `Authorization` header with token refreshing automatically.
pub struct RefreshingTokenLayer {
refreshable: RefreshableToken,
}

impl AuthLayer {
pub(crate) fn new(auth: RefreshableToken) -> Self {
Self { auth }
impl RefreshingTokenLayer {
pub(crate) fn new(refreshable: RefreshableToken) -> Self {
Self { refreshable }
}
}

impl<S> Layer<S> for AuthLayer {
type Service = AuthService<S>;
impl<S> Layer<S> for RefreshingTokenLayer {
type Service = RefreshingToken<S>;

fn layer(&self, service: S) -> Self::Service {
AuthService {
auth: self.auth.clone(),
RefreshingToken {
refreshable: self.refreshable.clone(),
service,
}
}
}

pub struct AuthService<S> {
auth: RefreshableToken,
pub struct RefreshingToken<S> {
refreshable: RefreshableToken,
service: S,
}

impl<S, ReqB, ResB> Service<Request<ReqB>> for AuthService<S>
impl<S, ReqB, ResB> Service<Request<ReqB>> for RefreshingToken<S>
where
S: Service<Request<ReqB>, Response = Response<ResB>> + Clone,
S::Error: Into<BoxError>,
Expand All @@ -62,7 +63,7 @@ where
let service = self.service.clone();
let service = std::mem::replace(&mut self.service, service);

let auth = self.auth.clone();
let auth = self.refreshable.clone();
let request = async move {
auth.to_header().await.map_err(BoxError::from).map(|value| {
req.headers_mut().insert(AUTHORIZATION, value);
Expand Down Expand Up @@ -147,7 +148,7 @@ mod tests {
const TOKEN: &str = "test";
let auth = test_token(TOKEN.into());
let (mut service, handle): (_, Handle<Request<hyper::Body>, Response<hyper::Body>>) =
mock::spawn_layer(AuthLayer::new(auth));
mock::spawn_layer(RefreshingTokenLayer::new(auth));

let spawned = tokio::spawn(async move {
// Receive the requests and respond
Expand All @@ -173,7 +174,7 @@ mod tests {
const TOKEN: &str = "\n";
let auth = test_token(TOKEN.into());
let (mut service, _handle) =
mock::spawn_layer::<Request<Body>, Response<Body>, _>(AuthLayer::new(auth));
mock::spawn_layer::<Request<Body>, Response<Body>, _>(RefreshingTokenLayer::new(auth));
let err = service
.call(Request::builder().uri("/").body(Body::empty()).unwrap())
.await
Expand Down
5 changes: 1 addition & 4 deletions kube/src/service/mod.rs
Expand Up @@ -4,8 +4,5 @@ mod auth;
mod base_uri;
mod headers;

pub(crate) use self::{
auth::{AuthLayer, Authentication},
headers::SetHeadersLayer,
};
pub(crate) use self::{auth::Authentication, headers::SetHeadersLayer};
pub use base_uri::{SetBaseUri, SetBaseUriLayer};

0 comments on commit 336d5f5

Please sign in to comment.