Skip to content

Commit

Permalink
add moving with_default_namespace Client setter - fixes #543 (#544)
Browse files Browse the repository at this point in the history
* add moving with_default_ns Client setter - fixes #543

* rename Config::default_ns to Config::default_namespace

* remove Client::new_with_default_ns in favour of setter

also document

* default_namespace as required arg to avoid errors

* Update kube/src/client/mod.rs

Co-authored-by: kazk <kazk.dev@gmail.com>

Co-authored-by: kazk <kazk.dev@gmail.com>
  • Loading branch information
clux and kazk committed Jun 7, 2021
1 parent 0339b85 commit 79e7e10
Show file tree
Hide file tree
Showing 5 changed files with 80 additions and 96 deletions.
24 changes: 11 additions & 13 deletions examples/custom_client.rs
@@ -1,30 +1,28 @@
// Minimal custom client example.
use k8s_openapi::api::core::v1::ConfigMap;
use tower::ServiceBuilder;
use k8s_openapi::api::core::v1::Pod;

use kube::{
api::{Api, ListParams},
Api, ResourceExt,
client::ConfigExt,
Client, Config,
};

#[tokio::main]
async fn main() -> anyhow::Result<()> {
std::env::set_var("RUST_LOG", "info,kube=debug");
std::env::set_var("RUST_LOG", "info,kube=trace");
tracing_subscriber::fmt::init();

let config = Config::infer().await?;
let https = config.native_tls_https_connector()?;
let client = Client::new(
ServiceBuilder::new()
.layer(config.base_uri_layer())
.option_layer(config.auth_layer()?)
.service(hyper::Client::builder().build(https)),
);
let service = tower::ServiceBuilder::new()
.layer(config.base_uri_layer())
.option_layer(config.auth_layer()?)
.service(hyper::Client::builder().build(https));
let client = Client::new(service, config.default_namespace);

let cms: Api<ConfigMap> = Api::namespaced(client, "default");
for cm in cms.list(&ListParams::default()).await? {
println!("{:?}", cm);
let pods: Api<Pod> = Api::default_namespaced(client);
for p in pods.list(&Default::default()).await? {
println!("{}", p.name());
}

Ok(())
Expand Down
28 changes: 13 additions & 15 deletions examples/custom_client_tls.rs
@@ -1,11 +1,11 @@
// Custom client supporting both native-tls and rustls-tls
// Must enable `rustls-tls` feature to run this.
// Run with `USE_RUSTLS=1` to pick rustls.
use k8s_openapi::api::core::v1::ConfigMap;
use k8s_openapi::api::core::v1::Pod;
use tower::ServiceBuilder;

use kube::{
api::{Api, ListParams},
Api, ResourceExt,
client::ConfigExt,
Client, Config,
};
Expand All @@ -21,23 +21,21 @@ async fn main() -> anyhow::Result<()> {
let use_rustls = std::env::var("USE_RUSTLS").map(|s| s == "1").unwrap_or(false);
let client = if use_rustls {
let https = config.rustls_https_connector()?;
Client::new(
ServiceBuilder::new()
.layer(config.base_uri_layer())
.service(hyper::Client::builder().build(https)),
)
let service = ServiceBuilder::new()
.layer(config.base_uri_layer())
.service(hyper::Client::builder().build(https));
Client::new(service, config.default_namespace)
} else {
let https = config.native_tls_https_connector()?;
Client::new(
ServiceBuilder::new()
.layer(config.base_uri_layer())
.service(hyper::Client::builder().build(https)),
)
let service = ServiceBuilder::new()
.layer(config.base_uri_layer())
.service(hyper::Client::builder().build(https));
Client::new(service, config.default_namespace)
};

let cms: Api<ConfigMap> = Api::namespaced(client, "default");
for cm in cms.list(&ListParams::default()).await? {
println!("{:?}", cm);
let pods: Api<Pod> = Api::default_namespaced(client);
for p in pods.list(&Default::default()).await? {
println!("{}", p.name());
}

Ok(())
Expand Down
78 changes: 39 additions & 39 deletions examples/custom_client_trace.rs
Expand Up @@ -3,13 +3,13 @@ use std::time::Duration;

use http::{Request, Response};
use hyper::Body;
use k8s_openapi::api::core::v1::ConfigMap;
use k8s_openapi::api::core::v1::Pod;
use tower::ServiceBuilder;
use tower_http::{decompression::DecompressionLayer, trace::TraceLayer};
use tracing::Span;

use kube::{
api::{Api, ListParams},
Api, ResourceExt,
client::ConfigExt,
Client, Config,
};
Expand All @@ -21,44 +21,44 @@ async fn main() -> anyhow::Result<()> {

let config = Config::infer().await?;
let https = config.native_tls_https_connector()?;
let client = Client::new(
ServiceBuilder::new()
.layer(config.base_uri_layer())
// Add `DecompressionLayer` to make request headers interesting.
.layer(DecompressionLayer::new())
.layer(
// Attribute names follow [Semantic Conventions].
// [Semantic Conventions]: https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/semantic_conventions/http.md#http-client
TraceLayer::new_for_http()
.make_span_with(|request: &Request<Body>| {
tracing::debug_span!(
"HTTP",
http.method = %request.method(),
http.url = %request.uri(),
http.status_code = tracing::field::Empty,
otel.name = %format!("HTTP {}", request.method()),
otel.kind = "client",
otel.status_code = tracing::field::Empty,
)
})
.on_request(|request: &Request<Body>, _span: &Span| {
tracing::debug!("payload: {:?} headers: {:?}", request.body(), request.headers())
})
.on_response(|response: &Response<Body>, latency: Duration, span: &Span| {
let status = response.status();
span.record("http.status_code", &status.as_u16());
if status.is_client_error() || status.is_server_error() {
span.record("otel.status_code", &"ERROR");
}
tracing::debug!("finished in {}ms", latency.as_millis())
}),
)
.service(hyper::Client::builder().build(https)),
);
let service = ServiceBuilder::new()
.layer(config.base_uri_layer())
// Add `DecompressionLayer` to make request headers interesting.
.layer(DecompressionLayer::new())
.layer(
// Attribute names follow [Semantic Conventions].
// [Semantic Conventions]: https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/semantic_conventions/http.md#http-client
TraceLayer::new_for_http()
.make_span_with(|request: &Request<Body>| {
tracing::debug_span!(
"HTTP",
http.method = %request.method(),
http.url = %request.uri(),
http.status_code = tracing::field::Empty,
otel.name = %format!("HTTP {}", request.method()),
otel.kind = "client",
otel.status_code = tracing::field::Empty,
)
})
.on_request(|request: &Request<Body>, _span: &Span| {
tracing::debug!("payload: {:?} headers: {:?}", request.body(), request.headers())
})
.on_response(|response: &Response<Body>, latency: Duration, span: &Span| {
let status = response.status();
span.record("http.status_code", &status.as_u16());
if status.is_client_error() || status.is_server_error() {
span.record("otel.status_code", &"ERROR");
}
tracing::debug!("finished in {}ms", latency.as_millis())
}),
)
.service(hyper::Client::builder().build(https));

let cms: Api<ConfigMap> = Api::namespaced(client, "default");
for cm in cms.list(&ListParams::default()).await? {
println!("{:?}", cm);
let client = Client::new(service, config.default_namespace);

let pods: Api<Pod> = Api::default_namespaced(client);
for p in pods.list(&Default::default()).await? {
println!("{}", p.name());
}

Ok(())
Expand Down
34 changes: 11 additions & 23 deletions kube/src/client/mod.rs
Expand Up @@ -81,42 +81,30 @@ impl Client {
/// use tower::ServiceBuilder;
///
/// let config = Config::infer().await?;
/// let client = Client::new(
/// ServiceBuilder::new()
/// .layer(config.base_uri_layer())
/// .option_layer(config.auth_layer()?)
/// .service(hyper::Client::new()),
/// );
/// let service = ServiceBuilder::new()
/// .layer(config.base_uri_layer())
/// .option_layer(config.auth_layer()?)
/// .service(hyper::Client::new());
/// let client = Client::new(service, config.default_namespace);
/// # Ok(())
/// # }
/// ```
pub fn new<S, B>(service: S) -> Self
where
S: Service<Request<Body>, Response = Response<B>> + Send + 'static,
S::Future: Send + 'static,
S::Error: Into<BoxError>,
B: http_body::Body<Data = bytes::Bytes> + Send + 'static,
B::Error: std::error::Error + Send + Sync + 'static,
{
Self::new_with_default_ns(service, "default")
}

/// Create and initialize a [`Client`] using the given `Service` and the default namespace.
fn new_with_default_ns<S, B, T: Into<String>>(service: S, default_ns: T) -> Self
pub fn new<S, B, T>(service: S, default_namespace: T) -> Self
where
S: Service<Request<Body>, Response = Response<B>> + Send + 'static,
S::Future: Send + 'static,
S::Error: Into<BoxError>,
B: http_body::Body<Data = bytes::Bytes> + Send + 'static,
B::Error: std::error::Error + Send + Sync + 'static,
T: Into<String>
{
// Transform response body to `hyper::Body` and use type erased error to avoid type parameters.
let service = MapResponseBodyLayer::new(|b: B| Body::wrap_stream(b.into_stream()))
.layer(service)
.map_err(|e| e.into());
Self {
inner: Buffer::new(BoxService::new(service), 1024),
default_ns: default_ns.into(),
default_ns: default_namespace.into(),
}
}

Expand Down Expand Up @@ -432,7 +420,7 @@ impl TryFrom<Config> for Client {
use tracing::Span;

let timeout = config.timeout;
let default_ns = config.default_ns.clone();
let default_ns = config.default_namespace.clone();

let client: hyper::Client<_, Body> = {
let mut connector = HttpConnector::new();
Expand Down Expand Up @@ -518,7 +506,7 @@ impl TryFrom<Config> for Client {
}),
)
.service(client);
Ok(Self::new_with_default_ns(service, default_ns))
Ok(Self::new(service, default_ns))
}
}

Expand Down Expand Up @@ -616,7 +604,7 @@ mod tests {
);
});

let pods: Api<Pod> = Api::namespaced(Client::new(mock_service), "default");
let pods: Api<Pod> = Api::default_namespaced(Client::new(mock_service, "default"));
let pod = pods.get("test").await.unwrap();
assert_eq!(pod.metadata.annotations.unwrap().get("kube-rs").unwrap(), "test");
spawned.await.unwrap();
Expand Down
12 changes: 6 additions & 6 deletions kube/src/config/mod.rs
Expand Up @@ -23,7 +23,7 @@ pub struct Config {
/// The configured cluster url
pub cluster_url: http::Uri,
/// The configured default namespace
pub default_ns: String,
pub default_namespace: String,
/// The configured root certificate
pub root_cert: Option<Vec<Vec<u8>>>,
/// Timeout for calls to the Kubernetes API.
Expand Down Expand Up @@ -51,7 +51,7 @@ impl Config {
pub fn new(cluster_url: http::Uri) -> Self {
Self {
cluster_url,
default_ns: String::from("default"),
default_namespace: String::from("default"),
root_cert: None,
timeout: Some(DEFAULT_TIMEOUT),
accept_invalid_certs: false,
Expand Down Expand Up @@ -97,7 +97,7 @@ impl Config {
})?;
let cluster_url = cluster_url.parse::<http::Uri>()?;

let default_ns = incluster_config::load_default_ns()
let default_namespace = incluster_config::load_default_ns()
.map_err(Box::new)
.map_err(ConfigError::InvalidInClusterNamespace)?;

Expand All @@ -109,7 +109,7 @@ impl Config {

Ok(Self {
cluster_url,
default_ns,
default_namespace,
root_cert: Some(root_cert),
timeout: Some(DEFAULT_TIMEOUT),
accept_invalid_certs: false,
Expand Down Expand Up @@ -144,7 +144,7 @@ impl Config {
async fn new_from_loader(loader: ConfigLoader) -> Result<Self> {
let cluster_url = loader.cluster.server.parse::<http::Uri>()?;

let default_ns = loader
let default_namespace = loader
.current_context
.namespace
.clone()
Expand Down Expand Up @@ -175,7 +175,7 @@ impl Config {

Ok(Self {
cluster_url,
default_ns,
default_namespace,
root_cert,
timeout: Some(DEFAULT_TIMEOUT),
accept_invalid_certs,
Expand Down

0 comments on commit 79e7e10

Please sign in to comment.