Skip to content

Commit

Permalink
Switch to custom tracing with callbacks
Browse files Browse the repository at this point in the history
`kube=debug` can be used as before.
  • Loading branch information
kazk committed Jun 2, 2021
1 parent c361079 commit 5d52d12
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 24 deletions.
27 changes: 13 additions & 14 deletions examples/pod_api.rs
@@ -1,4 +1,3 @@
#[macro_use] extern crate log;
use futures::{StreamExt, TryStreamExt};
use k8s_openapi::api::core::v1::Pod;
use serde_json::json;
Expand All @@ -10,16 +9,16 @@ use kube::{

#[tokio::main]
async fn main() -> anyhow::Result<()> {
std::env::set_var("RUST_LOG", "info,kube=debug,tower_http=debug");
env_logger::init();
std::env::set_var("RUST_LOG", "info,kube=debug");
tracing_subscriber::fmt::init();
let client = Client::try_default().await?;
let namespace = std::env::var("NAMESPACE").unwrap_or("default".into());

// Manage pods
let pods: Api<Pod> = Api::namespaced(client, &namespace);

// Create Pod blog
info!("Creating Pod instance blog");
tracing::info!("Creating Pod instance blog");
let p: Pod = serde_json::from_value(json!({
"apiVersion": "v1",
"kind": "Pod",
Expand All @@ -37,7 +36,7 @@ async fn main() -> anyhow::Result<()> {
Ok(o) => {
let name = o.name();
assert_eq!(p.name(), name);
info!("Created {}", name);
tracing::info!("Created {}", name);
// wait for it..
std::thread::sleep(std::time::Duration::from_millis(5_000));
}
Expand All @@ -52,28 +51,28 @@ async fn main() -> anyhow::Result<()> {
let mut stream = pods.watch(&lp, "0").await?.boxed();
while let Some(status) = stream.try_next().await? {
match status {
WatchEvent::Added(o) => info!("Added {}", o.name()),
WatchEvent::Added(o) => tracing::info!("Added {}", o.name()),
WatchEvent::Modified(o) => {
let s = o.status.as_ref().expect("status exists on pod");
let phase = s.phase.clone().unwrap_or_default();
info!("Modified: {} with phase: {}", o.name(), phase);
tracing::info!("Modified: {} with phase: {}", o.name(), phase);
}
WatchEvent::Deleted(o) => info!("Deleted {}", o.name()),
WatchEvent::Error(e) => error!("Error {}", e),
WatchEvent::Deleted(o) => tracing::info!("Deleted {}", o.name()),
WatchEvent::Error(e) => tracing::error!("Error {}", e),
_ => {}
}
}

// Verify we can get it
info!("Get Pod blog");
tracing::info!("Get Pod blog");
let p1cpy = pods.get("blog").await?;
if let Some(spec) = &p1cpy.spec {
info!("Got blog pod with containers: {:?}", spec.containers);
tracing::info!("Got blog pod with containers: {:?}", spec.containers);
assert_eq!(spec.containers[0].name, "blog");
}

// Replace its spec
info!("Patch Pod blog");
tracing::info!("Patch Pod blog");
let patch = json!({
"metadata": {
"resourceVersion": p1cpy.resource_version(),
Expand All @@ -88,14 +87,14 @@ async fn main() -> anyhow::Result<()> {

let lp = ListParams::default().fields(&format!("metadata.name={}", "blog")); // only want results for our pod
for p in pods.list(&lp).await? {
info!("Found Pod: {}", p.name());
tracing::info!("Found Pod: {}", p.name());
}

// Delete it
let dp = DeleteParams::default();
pods.delete("blog", &dp).await?.map_left(|pdel| {
assert_eq!(pdel.name(), "blog");
info!("Deleting blog pod started: {:?}", pdel);
tracing::info!("Deleting blog pod started: {:?}", pdel);
});

Ok(())
Expand Down
54 changes: 44 additions & 10 deletions kube/src/client/mod.rs
Expand Up @@ -28,11 +28,8 @@ use tokio_util::{
};
use tower::{buffer::Buffer, util::BoxService, BoxError, Layer, Service, ServiceBuilder, ServiceExt};
use tower_http::{
map_response_body::MapResponseBodyLayer,
trace::{DefaultOnRequest, DefaultOnResponse, TraceLayer},
LatencyUnit,
classify::ServerErrorsFailureClass, map_response_body::MapResponseBodyLayer, trace::TraceLayer,
};
use tracing::Level;

use crate::{
api::WatchEvent,
Expand Down Expand Up @@ -401,6 +398,11 @@ impl TryFrom<Config> for Client {

/// Convert [`Config`] into a [`Client`]
fn try_from(config: Config) -> Result<Self> {
use std::time::Duration;

use http::header::HeaderMap;
use tracing::Span;

let cluster_url = config.cluster_url.clone();
let mut default_headers = config.headers.clone();
let timeout = config.timeout;
Expand Down Expand Up @@ -470,13 +472,45 @@ impl TryFrom<Config> for Client {
.layer(common)
.option_layer(maybe_auth)
.layer(
// TODO Add OTEL attributes? https://github.com/clux/kube-rs/issues/457
// - https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/semantic_conventions/http.md
// - https://docs.rs/tracing-opentelemetry/0.13.0/tracing_opentelemetry/#semantic-conventions
// - https://docs.rs/tracing-opentelemetry/0.13.0/tracing_opentelemetry/#special-fields
TraceLayer::new_for_http()
.on_request(DefaultOnRequest::new().level(Level::DEBUG))
.on_response(
DefaultOnResponse::new()
.level(Level::DEBUG)
.latency_unit(LatencyUnit::Millis),
),
.make_span_with(|req: &Request<hyper::Body>| {
tracing::debug_span!(
"HTTP",
http.method = %req.method(),
http.uri = %req.uri(),
http.status_code = tracing::field::Empty,
)
})
.on_request(|_req: &Request<hyper::Body>, _span: &Span| tracing::debug!("requesting"))
.on_response(|res: &Response<hyper::Body>, latency: Duration, span: &Span| {
span.record("http.status_code", &res.status().as_u16());
tracing::debug!("finished in {}ms", latency.as_millis())
})
// Explicitly disable `on_body_chunk`. The default does nothing.
.on_body_chunk(())
.on_eos(|_: Option<&HeaderMap>, stream_duration: Duration, _span: &Span| {
tracing::debug!("stream ended after {}ms", stream_duration.as_millis())
})
.on_failure(|ec: ServerErrorsFailureClass, latency: Duration, span: &Span| {
// Called when
// - Calling the inner service errored
// - Polling `Body` errored
// - the response was classified as failure (5xx)
// - End of stream was classified as failure
match ec {
ServerErrorsFailureClass::StatusCode(status) => {
span.record("http.status_code", &status.as_u16());
tracing::error!("failed in {}ms {}", latency.as_millis(), status)
}
ServerErrorsFailureClass::Error(err) => {
tracing::error!("failed in {}ms {}", latency.as_millis(), err)
}
}
}),
)
.service(client);
Ok(Self::new_with_default_ns(inner, default_ns))
Expand Down

0 comments on commit 5d52d12

Please sign in to comment.