Skip to content

Commit

Permalink
Split custom client examples
Browse files Browse the repository at this point in the history
  • Loading branch information
kazk committed Jun 2, 2021
1 parent 79f4844 commit 9505fcb
Show file tree
Hide file tree
Showing 4 changed files with 247 additions and 51 deletions.
8 changes: 8 additions & 0 deletions examples/Cargo.toml
Expand Up @@ -172,3 +172,11 @@ path = "admission_controller.rs"
[[example]]
name = "custom_client"
path = "custom_client.rs"

[[example]]
name = "custom_client_tls"
path = "custom_client_tls.rs"

[[example]]
name = "custom_client_trace"
path = "custom_client_trace.rs"
70 changes: 19 additions & 51 deletions examples/custom_client.rs
@@ -1,21 +1,11 @@
// Run with `cargo run --example custom_client --no-default-features --features native-tls,rustls-tls`
#[macro_use] extern crate log;
// Minimal custom client example.
use futures::{StreamExt, TryStreamExt};
use hyper::client::HttpConnector;
use hyper_rustls::HttpsConnector as RustlsHttpsConnector;
use hyper_tls::HttpsConnector;
use k8s_openapi::api::core::v1::Pod;
use serde_json::json;
use tokio_native_tls::TlsConnector;
use tower::ServiceBuilder;

use tower_http::{
decompression::DecompressionLayer,
trace::{DefaultMakeSpan, DefaultOnRequest, DefaultOnResponse, TraceLayer},
LatencyUnit,
};
use tracing::Level;

use kube::{
api::{Api, DeleteParams, ListParams, PostParams, ResourceExt, WatchEvent},
service::SetBaseUriLayer,
Expand All @@ -24,44 +14,22 @@ use kube::{

#[tokio::main]
async fn main() -> anyhow::Result<()> {
std::env::set_var("RUST_LOG", "info,kube=debug,tower_http=trace");
env_logger::init();
std::env::set_var("RUST_LOG", "info,kube=debug");
tracing_subscriber::fmt::init();

let config = Config::infer().await?;
let cluster_url = config.cluster_url.clone();
let common = ServiceBuilder::new()
.layer(SetBaseUriLayer::new(cluster_url))
.layer(DecompressionLayer::new())
.layer(
TraceLayer::new_for_http()
.make_span_with(DefaultMakeSpan::new().include_headers(true))
.on_request(DefaultOnRequest::new().level(Level::INFO))
.on_response(
DefaultOnResponse::new()
.level(Level::INFO)
.latency_unit(LatencyUnit::Micros),
),
)
.into_inner();
let mut http = HttpConnector::new();
http.enforce_http(false);

// Pick TLS at runtime
let use_rustls = std::env::var("USE_RUSTLS").map(|s| s == "1").unwrap_or(false);
let client = if use_rustls {
let https =
RustlsHttpsConnector::from((http, std::sync::Arc::new(config.rustls_tls_client_config()?)));
let inner = ServiceBuilder::new()
.layer(common)
.service(hyper::Client::builder().build(https));
Client::new(inner)
} else {
let https = HttpsConnector::from((http, TlsConnector::from(config.native_tls_connector()?)));
let inner = ServiceBuilder::new()
.layer(common)
.service(hyper::Client::builder().build(https));
Client::new(inner)
// Create HttpsConnector using `native_tls::TlsConnector` based on `Config`.
let https = {
let tls = tokio_native_tls::TlsConnector::from(config.native_tls_connector()?);
let mut http = HttpConnector::new();
http.enforce_http(false);
HttpsConnector::from((http, tls))
};
let client = Client::new(
ServiceBuilder::new()
.layer(SetBaseUriLayer::new(config.cluster_url))
.service(hyper::Client::builder().build(https)),
);

// Manage pods
let pods: Api<Pod> = Api::namespaced(client, "default");
Expand All @@ -78,7 +46,7 @@ async fn main() -> anyhow::Result<()> {
Ok(o) => {
let name = o.name();
assert_eq!(p.name(), name);
info!("Created {}", name);
tracing::info!("Created {}", name);
std::thread::sleep(std::time::Duration::from_millis(5_000));
}
Err(kube::Error::Api(ae)) => assert_eq!(ae.code, 409), // if you skipped delete, for instance
Expand All @@ -92,14 +60,14 @@ async fn main() -> anyhow::Result<()> {
let mut stream = pods.watch(&lp, "0").await?.boxed();
while let Some(status) = stream.try_next().await? {
match status {
WatchEvent::Added(o) => info!("Added {}", o.name()),
WatchEvent::Added(o) => tracing::info!("Added {}", o.name()),
WatchEvent::Modified(o) => {
let s = o.status.as_ref().expect("status exists on pod");
let phase = s.phase.clone().unwrap_or_default();
info!("Modified: {} with phase: {}", o.name(), phase);
tracing::info!("Modified: {} with phase: {}", o.name(), phase);
}
WatchEvent::Deleted(o) => info!("Deleted {}", o.name()),
WatchEvent::Error(e) => error!("Error {}", e),
WatchEvent::Deleted(o) => tracing::info!("Deleted {}", o.name()),
WatchEvent::Error(e) => tracing::error!("Error {}", e),
_ => {}
}
}
Expand Down
104 changes: 104 additions & 0 deletions examples/custom_client_tls.rs
@@ -0,0 +1,104 @@
// Custom client supporting both native-tls and rustls-tls
// Run with `USE_RUSTLS=1` to pick rustls.
use std::sync::Arc;

use futures::{StreamExt, TryStreamExt};
use hyper::client::HttpConnector;
use k8s_openapi::api::core::v1::Pod;
use serde_json::json;
use tower::ServiceBuilder;

use kube::{
api::{Api, DeleteParams, ListParams, PostParams, ResourceExt, WatchEvent},
service::SetBaseUriLayer,
Client, Config,
};

#[tokio::main]
async fn main() -> anyhow::Result<()> {
std::env::set_var("RUST_LOG", "info,kube=debug");
tracing_subscriber::fmt::init();

let config = Config::infer().await?;

// Pick TLS at runtime
let use_rustls = std::env::var("USE_RUSTLS").map(|s| s == "1").unwrap_or(false);
let client = if use_rustls {
let https = {
let rustls_config = Arc::new(config.rustls_tls_client_config()?);
let mut http = HttpConnector::new();
http.enforce_http(false);
hyper_rustls::HttpsConnector::from((http, rustls_config))
};
Client::new(
ServiceBuilder::new()
.layer(SetBaseUriLayer::new(config.cluster_url))
.service(hyper::Client::builder().build(https)),
)
} else {
let https = {
let tls = tokio_native_tls::TlsConnector::from(config.native_tls_connector()?);
let mut http = HttpConnector::new();
http.enforce_http(false);
hyper_tls::HttpsConnector::from((http, tls))
};
Client::new(
ServiceBuilder::new()
.layer(SetBaseUriLayer::new(config.cluster_url))
.service(hyper::Client::builder().build(https)),
)
};

// Manage pods
let pods: Api<Pod> = Api::namespaced(client, "default");
// Create pod
let p: Pod = serde_json::from_value(json!({
"apiVersion": "v1",
"kind": "Pod",
"metadata": { "name": "example" },
"spec": { "containers": [{ "name": "example", "image": "alpine" }] }
}))?;

let pp = PostParams::default();
match pods.create(&pp, &p).await {
Ok(o) => {
let name = o.name();
assert_eq!(p.name(), name);
tracing::info!("Created {}", name);
std::thread::sleep(std::time::Duration::from_millis(5_000));
}
Err(kube::Error::Api(ae)) => assert_eq!(ae.code, 409), // if you skipped delete, for instance
Err(e) => return Err(e.into()),
}

// Watch it phase for a few seconds
let lp = ListParams::default()
.fields(&format!("metadata.name={}", "example"))
.timeout(10);
let mut stream = pods.watch(&lp, "0").await?.boxed();
while let Some(status) = stream.try_next().await? {
match status {
WatchEvent::Added(o) => tracing::info!("Added {}", o.name()),
WatchEvent::Modified(o) => {
let s = o.status.as_ref().expect("status exists on pod");
let phase = s.phase.clone().unwrap_or_default();
tracing::info!("Modified: {} with phase: {}", o.name(), phase);
}
WatchEvent::Deleted(o) => tracing::info!("Deleted {}", o.name()),
WatchEvent::Error(e) => tracing::error!("Error {}", e),
_ => {}
}
}

if let Some(spec) = &pods.get("example").await?.spec {
assert_eq!(spec.containers[0].name, "example");
}

pods.delete("example", &DeleteParams::default())
.await?
.map_left(|pdel| {
assert_eq!(pdel.name(), "example");
});

Ok(())
}
116 changes: 116 additions & 0 deletions examples/custom_client_trace.rs
@@ -0,0 +1,116 @@
// Custom client example with TraceLayer.
use std::time::Duration;

use futures::{StreamExt, TryStreamExt};
use http::{Request, Response};
use hyper::{client::HttpConnector, Body};
use hyper_tls::HttpsConnector;
use k8s_openapi::api::core::v1::Pod;
use serde_json::json;
use tower::ServiceBuilder;
use tower_http::{decompression::DecompressionLayer, trace::TraceLayer};
use tracing::Span;

use kube::{
api::{Api, DeleteParams, ListParams, PostParams, ResourceExt, WatchEvent},
service::SetBaseUriLayer,
Client, Config,
};

#[tokio::main]
async fn main() -> anyhow::Result<()> {
std::env::set_var("RUST_LOG", "info,kube=debug,custom_client_trace=debug");
tracing_subscriber::fmt::init();

let config = Config::infer().await?;
// Create HttpsConnector using `native_tls::TlsConnector` based on `Config`.
let https = {
let tls = tokio_native_tls::TlsConnector::from(config.native_tls_connector()?);
let mut http = HttpConnector::new();
http.enforce_http(false);
HttpsConnector::from((http, tls))
};
let client = Client::new(
ServiceBuilder::new()
.layer(SetBaseUriLayer::new(config.cluster_url))
// Add `DecompressionLayer` to make request headers interesting.
.layer(DecompressionLayer::new())
.layer(
TraceLayer::new_for_http()
.make_span_with(|_request: &Request<Body>| {
tracing::debug_span!(
"kube",
http.method = tracing::field::Empty,
http.url = tracing::field::Empty,
http.status_code = tracing::field::Empty,
)
})
.on_request(|request: &Request<Body>, span: &Span| {
span.record("http.method", &tracing::field::display(request.method()));
span.record("http.url", &tracing::field::display(request.uri()));
tracing::debug!("payload: {:?} headers: {:?}", request.body(), request.headers())
})
.on_response(|response: &Response<Body>, latency: Duration, span: &Span| {
span.record(
"http.status_code",
&tracing::field::display(response.status().as_u16()),
);
tracing::debug!("latency: {:?}", latency)
}),
)
.service(hyper::Client::builder().build(https)),
);

// Manage pods
let pods: Api<Pod> = Api::namespaced(client, "default");
// Create pod
let p: Pod = serde_json::from_value(json!({
"apiVersion": "v1",
"kind": "Pod",
"metadata": { "name": "example" },
"spec": { "containers": [{ "name": "example", "image": "alpine" }] }
}))?;

let pp = PostParams::default();
match pods.create(&pp, &p).await {
Ok(o) => {
let name = o.name();
assert_eq!(p.name(), name);
tracing::info!("Created {}", name);
std::thread::sleep(std::time::Duration::from_millis(5_000));
}
Err(kube::Error::Api(ae)) => assert_eq!(ae.code, 409), // if you skipped delete, for instance
Err(e) => return Err(e.into()),
}

// Watch it phase for a few seconds
let lp = ListParams::default()
.fields(&format!("metadata.name={}", "example"))
.timeout(10);
let mut stream = pods.watch(&lp, "0").await?.boxed();
while let Some(status) = stream.try_next().await? {
match status {
WatchEvent::Added(o) => tracing::info!("Added {}", o.name()),
WatchEvent::Modified(o) => {
let s = o.status.as_ref().expect("status exists on pod");
let phase = s.phase.clone().unwrap_or_default();
tracing::info!("Modified: {} with phase: {}", o.name(), phase);
}
WatchEvent::Deleted(o) => tracing::info!("Deleted {}", o.name()),
WatchEvent::Error(e) => tracing::error!("Error {}", e),
_ => {}
}
}

if let Some(spec) = &pods.get("example").await?.spec {
assert_eq!(spec.containers[0].name, "example");
}

pods.delete("example", &DeleteParams::default())
.await?
.map_left(|pdel| {
assert_eq!(pdel.name(), "example");
});

Ok(())
}

0 comments on commit 9505fcb

Please sign in to comment.