Skip to content

Commit

Permalink
Merge pull request #540 from kazk/polish-tls-support
Browse files Browse the repository at this point in the history
  • Loading branch information
kazk committed Jun 5, 2021
2 parents 463205c + 38782b5 commit afd032d
Show file tree
Hide file tree
Showing 37 changed files with 1,054 additions and 619 deletions.
6 changes: 3 additions & 3 deletions README.md
Expand Up @@ -157,12 +157,12 @@ Kube has basic support ([with caveats](https://github.com/clux/kube-rs/issues?q=

```toml
[dependencies]
kube = { version = "0.55.0", default-features = false, features = ["rustls-tls"] }
kube-runtime = { version = "0.55.0", default-features = false, features = ["rustls-tls"] }
kube = { version = "0.55.0", default-features = false, features = ["client", "rustls-tls"] }
kube-runtime = { version = "0.55.0" }
k8s-openapi = { version = "0.11.0", default-features = false, features = ["v1_20"] }
```

This will pull in `hyper-rustls` and `tokio-rustls`.
This will pull in `rustls` and `hyper-rustls`.

## musl-libc
Kube will work with [distroless](https://github.com/clux/controller-rs/blob/master/Dockerfile), [scratch](https://github.com/constellation-rs/constellation/blob/27dc89d0d0e34896fd37d638692e7dfe60a904fc/Dockerfile), and `alpine` (it's also possible to use alpine as a builder [with some caveats](https://github.com/clux/kube-rs/issues/331#issuecomment-715962188)).
Expand Down
26 changes: 21 additions & 5 deletions examples/Cargo.toml
Expand Up @@ -13,17 +13,20 @@ edition = "2018"
default = ["native-tls", "schema", "kubederive", "ws"]
kubederive = ["kube/derive"] # by default import kube-derive with its default features
schema = ["kube-derive/schema"] # crd_derive_no_schema shows how to opt out
native-tls = ["kube/native-tls", "kube-runtime/native-tls"]
rustls-tls = ["kube/rustls-tls", "kube-runtime/rustls-tls"]
native-tls = ["kube/client", "kube/native-tls"]
rustls-tls = ["kube/client", "kube/rustls-tls"]
ws = ["kube/ws"]

[dependencies]
tokio-util = "0.6.0"

[dev-dependencies]
anyhow = "1.0.37"
env_logger = "0.8.2"
futures = "0.3.8"
kube = { path = "../kube", version = "^0.55.0", default-features = false, features = ["admission"] }
kube-derive = { path = "../kube-derive", version = "^0.55.0", default-features = false } # only needed to opt out of schema
kube-runtime = { path = "../kube-runtime", version = "^0.55.0", default-features = false }
kube-runtime = { path = "../kube-runtime", version = "^0.55.0" }
kube-core = { path = "../kube-core", version = "^0.55.0", default-features = false }
k8s-openapi = { version = "0.11.0", features = ["v1_20"], default-features = false }
log = "0.4.11"
Expand All @@ -43,6 +46,9 @@ tracing-subscriber = "0.2"
warp = { version = "0.3", features = ["tls"] }
http = "0.2.3"
json-patch = "0.2.6"
tower = { version = "0.4.6" }
tower-http = { version = "0.1.0", features = ["trace", "decompression-gzip"] }
hyper = { version = "0.14.2", features = ["client", "http1", "stream", "tcp"] }

[[example]]
name = "configmapgen_controller"
Expand Down Expand Up @@ -160,5 +166,15 @@ path = "secret_reflector.rs"
name = "admission_controller"
path = "admission_controller.rs"

[dependencies]
tokio-util = "0.6.0"
[[example]]
name = "custom_client"
path = "custom_client.rs"

[[example]]
name = "custom_client_tls"
path = "custom_client_tls.rs"
required-features = ["native-tls", "rustls-tls"]

[[example]]
name = "custom_client_trace"
path = "custom_client_trace.rs"
31 changes: 31 additions & 0 deletions examples/custom_client.rs
@@ -0,0 +1,31 @@
// Minimal custom client example.
use k8s_openapi::api::core::v1::ConfigMap;
use tower::ServiceBuilder;

use kube::{
api::{Api, ListParams},
client::ConfigExt,
Client, Config,
};

#[tokio::main]
async fn main() -> anyhow::Result<()> {
std::env::set_var("RUST_LOG", "info,kube=debug");
tracing_subscriber::fmt::init();

let config = Config::infer().await?;
let https = config.native_tls_https_connector()?;
let client = Client::new(
ServiceBuilder::new()
.layer(config.base_uri_layer())
.option_layer(config.auth_layer()?)
.service(hyper::Client::builder().build(https)),
);

let cms: Api<ConfigMap> = Api::namespaced(client, "default");
for cm in cms.list(&ListParams::default()).await? {
println!("{:?}", cm);
}

Ok(())
}
44 changes: 44 additions & 0 deletions examples/custom_client_tls.rs
@@ -0,0 +1,44 @@
// Custom client supporting both native-tls and rustls-tls
// Must enable `rustls-tls` feature to run this.
// Run with `USE_RUSTLS=1` to pick rustls.
use k8s_openapi::api::core::v1::ConfigMap;
use tower::ServiceBuilder;

use kube::{
api::{Api, ListParams},
client::ConfigExt,
Client, Config,
};

#[tokio::main]
async fn main() -> anyhow::Result<()> {
std::env::set_var("RUST_LOG", "info,kube=debug");
tracing_subscriber::fmt::init();

let config = Config::infer().await?;

// Pick TLS at runtime
let use_rustls = std::env::var("USE_RUSTLS").map(|s| s == "1").unwrap_or(false);
let client = if use_rustls {
let https = config.rustls_https_connector()?;
Client::new(
ServiceBuilder::new()
.layer(config.base_uri_layer())
.service(hyper::Client::builder().build(https)),
)
} else {
let https = config.native_tls_https_connector()?;
Client::new(
ServiceBuilder::new()
.layer(config.base_uri_layer())
.service(hyper::Client::builder().build(https)),
)
};

let cms: Api<ConfigMap> = Api::namespaced(client, "default");
for cm in cms.list(&ListParams::default()).await? {
println!("{:?}", cm);
}

Ok(())
}
65 changes: 65 additions & 0 deletions examples/custom_client_trace.rs
@@ -0,0 +1,65 @@
// Custom client example with TraceLayer.
use std::time::Duration;

use http::{Request, Response};
use hyper::Body;
use k8s_openapi::api::core::v1::ConfigMap;
use tower::ServiceBuilder;
use tower_http::{decompression::DecompressionLayer, trace::TraceLayer};
use tracing::Span;

use kube::{
api::{Api, ListParams},
client::ConfigExt,
Client, Config,
};

#[tokio::main]
async fn main() -> anyhow::Result<()> {
std::env::set_var("RUST_LOG", "info,kube=debug,custom_client_trace=debug");
tracing_subscriber::fmt::init();

let config = Config::infer().await?;
let https = config.native_tls_https_connector()?;
let client = Client::new(
ServiceBuilder::new()
.layer(config.base_uri_layer())
// Add `DecompressionLayer` to make request headers interesting.
.layer(DecompressionLayer::new())
.layer(
// Attribute names follow [Semantic Conventions].
// [Semantic Conventions]: https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/semantic_conventions/http.md#http-client
TraceLayer::new_for_http()
.make_span_with(|request: &Request<Body>| {
tracing::debug_span!(
"HTTP",
http.method = %request.method(),
http.url = %request.uri(),
http.status_code = tracing::field::Empty,
otel.name = %format!("HTTP {}", request.method()),
otel.kind = "client",
otel.status_code = tracing::field::Empty,
)
})
.on_request(|request: &Request<Body>, _span: &Span| {
tracing::debug!("payload: {:?} headers: {:?}", request.body(), request.headers())
})
.on_response(|response: &Response<Body>, latency: Duration, span: &Span| {
let status = response.status();
span.record("http.status_code", &status.as_u16());
if status.is_client_error() || status.is_server_error() {
span.record("otel.status_code", &"ERROR");
}
tracing::debug!("finished in {}ms", latency.as_millis())
}),
)
.service(hyper::Client::builder().build(https)),
);

let cms: Api<ConfigMap> = Api::namespaced(client, "default");
for cm in cms.list(&ListParams::default()).await? {
println!("{:?}", cm);
}

Ok(())
}
25 changes: 12 additions & 13 deletions examples/pod_api.rs
@@ -1,4 +1,3 @@
#[macro_use] extern crate log;
use futures::{StreamExt, TryStreamExt};
use k8s_openapi::api::core::v1::Pod;
use serde_json::json;
Expand All @@ -11,15 +10,15 @@ use kube::{
#[tokio::main]
async fn main() -> anyhow::Result<()> {
std::env::set_var("RUST_LOG", "info,kube=debug");
env_logger::init();
tracing_subscriber::fmt::init();
let client = Client::try_default().await?;
let namespace = std::env::var("NAMESPACE").unwrap_or("default".into());

// Manage pods
let pods: Api<Pod> = Api::namespaced(client, &namespace);

// Create Pod blog
info!("Creating Pod instance blog");
tracing::info!("Creating Pod instance blog");
let p: Pod = serde_json::from_value(json!({
"apiVersion": "v1",
"kind": "Pod",
Expand All @@ -37,7 +36,7 @@ async fn main() -> anyhow::Result<()> {
Ok(o) => {
let name = o.name();
assert_eq!(p.name(), name);
info!("Created {}", name);
tracing::info!("Created {}", name);
// wait for it..
std::thread::sleep(std::time::Duration::from_millis(5_000));
}
Expand All @@ -52,28 +51,28 @@ async fn main() -> anyhow::Result<()> {
let mut stream = pods.watch(&lp, "0").await?.boxed();
while let Some(status) = stream.try_next().await? {
match status {
WatchEvent::Added(o) => info!("Added {}", o.name()),
WatchEvent::Added(o) => tracing::info!("Added {}", o.name()),
WatchEvent::Modified(o) => {
let s = o.status.as_ref().expect("status exists on pod");
let phase = s.phase.clone().unwrap_or_default();
info!("Modified: {} with phase: {}", o.name(), phase);
tracing::info!("Modified: {} with phase: {}", o.name(), phase);
}
WatchEvent::Deleted(o) => info!("Deleted {}", o.name()),
WatchEvent::Error(e) => error!("Error {}", e),
WatchEvent::Deleted(o) => tracing::info!("Deleted {}", o.name()),
WatchEvent::Error(e) => tracing::error!("Error {}", e),
_ => {}
}
}

// Verify we can get it
info!("Get Pod blog");
tracing::info!("Get Pod blog");
let p1cpy = pods.get("blog").await?;
if let Some(spec) = &p1cpy.spec {
info!("Got blog pod with containers: {:?}", spec.containers);
tracing::info!("Got blog pod with containers: {:?}", spec.containers);
assert_eq!(spec.containers[0].name, "blog");
}

// Replace its spec
info!("Patch Pod blog");
tracing::info!("Patch Pod blog");
let patch = json!({
"metadata": {
"resourceVersion": p1cpy.resource_version(),
Expand All @@ -88,14 +87,14 @@ async fn main() -> anyhow::Result<()> {

let lp = ListParams::default().fields(&format!("metadata.name={}", "blog")); // only want results for our pod
for p in pods.list(&lp).await? {
info!("Found Pod: {}", p.name());
tracing::info!("Found Pod: {}", p.name());
}

// Delete it
let dp = DeleteParams::default();
pods.delete("blog", &dp).await?.map_left(|pdel| {
assert_eq!(pdel.name(), "blog");
info!("Deleting blog pod started: {:?}", pdel);
tracing::info!("Deleting blog pod started: {:?}", pdel);
});

Ok(())
Expand Down
2 changes: 1 addition & 1 deletion kube-core/Cargo.toml
Expand Up @@ -21,7 +21,7 @@ serde = { version = "1.0.118", features = ["derive"] }
serde_json = "1.0.61"
thiserror = "1.0.23"
once_cell = "1.7.2"
url = "2.2.0"
form_urlencoded = "1.0.1"
http = "0.2.2"
json-patch = { version = "0.2.6", optional = true }

Expand Down
2 changes: 1 addition & 1 deletion kube-core/src/params.rs
Expand Up @@ -287,7 +287,7 @@ impl PatchParams {
Ok(())
}

pub(crate) fn populate_qp(&self, qp: &mut url::form_urlencoded::Serializer<String>) {
pub(crate) fn populate_qp(&self, qp: &mut form_urlencoded::Serializer<String>) {
if self.dry_run {
qp.append_pair("dryRun", "All");
}
Expand Down

0 comments on commit afd032d

Please sign in to comment.