diff --git a/Cargo.lock b/Cargo.lock index 593b5dc091..a4dcfbbd90 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -54,6 +54,12 @@ version = "0.4.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4d25d88fd6b8041580a654f9d0c581a047baee2b3efee13275f2fc392fc75034" +[[package]] +name = "arrayvec" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "23b62fc65de8e4e7f52534fb52b0f3ed04746ae267519eef2a83941e8085068b" + [[package]] name = "async-stream" version = "0.2.1" @@ -240,6 +246,16 @@ dependencies = [ "build_const", ] +[[package]] +name = "crossbeam-channel" +version = "0.4.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b153fe7cbef478c567df0f972e02e6d736db11affe43dfc9c56a9374d1adfb87" +dependencies = [ + "crossbeam-utils", + "maybe-uninit", +] + [[package]] name = "crossbeam-utils" version = "0.7.2" @@ -308,6 +324,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9fac2277e84e5e858483756647a9d0aa8d9a2b7cba517fd84325a0aaa69a0909" dependencies = [ "libc", + "miniz-sys", "miniz_oxide_c_api", ] @@ -518,6 +535,20 @@ dependencies = [ "tracing", ] +[[package]] +name = "hdrhistogram" +version = "7.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b3c22708574c44e924720c5b3a116326c688e6d532f438c77c007ec8768644f9" +dependencies = [ + "base64 0.12.3", + "byteorder", + "crossbeam-channel", + "flate2", + "nom 5.1.2", + "num-traits 0.2.6", +] + [[package]] name = "heck" version = "0.3.0" @@ -736,6 +767,19 @@ version = "1.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646" +[[package]] +name = "lexical-core" +version = "0.7.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "db65c6da02e61f55dae90a0ae427b2a5f6b3e8db09f58d10efab23af92592616" +dependencies = [ + "arrayvec", + "bitflags", + 
"cfg-if 0.1.10", + "ryu", + "static_assertions", +] + [[package]] name = "libc" version = "0.2.76" @@ -1160,10 +1204,13 @@ version = "0.1.0" dependencies = [ "deflate", "futures 0.3.5", + "hdrhistogram", "http 0.2.1", "hyper", "indexmap", + "parking_lot", "quickcheck", + "tokio", "tracing", ] @@ -1609,6 +1656,12 @@ version = "0.1.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "100aabe6b8ff4e4a7e32c1c13523379802df0772b82466207ac25b013f193376" +[[package]] +name = "maybe-uninit" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "60302e4db3a61da70c0cb7991976248362f30319e88850c487b9b95bbf059e00" + [[package]] name = "memchr" version = "0.1.11" @@ -1633,6 +1686,16 @@ dependencies = [ "libmimalloc-sys", ] +[[package]] +name = "miniz-sys" +version = "0.1.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e9e3ae51cea1576ceba0dde3d484d30e6e5b86dee0b2d412fe3a16a15c98202" +dependencies = [ + "cc", + "libc", +] + [[package]] name = "miniz_oxide" version = "0.1.2" @@ -1742,6 +1805,17 @@ version = "2.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cf51a729ecf40266a2368ad335a5fdde43471f545a967109cd62146ecf8b66ff" +[[package]] +name = "nom" +version = "5.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ffb4262d26ed83a1c0a33a38fe2bb15797329c85770da05e6b828ddb782627af" +dependencies = [ + "lexical-core", + "memchr 2.3.3", + "version_check", +] + [[package]] name = "num-integer" version = "0.1.39" @@ -1908,7 +1982,7 @@ checksum = "6ab1427f3d2635891f842892dda177883dca0639e05fe66796a62c9d2f23b49c" dependencies = [ "byteorder", "libc", - "nom", + "nom 2.2.1", "rustc_version", ] @@ -2394,6 +2468,12 @@ version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d" +[[package]] +name = "static_assertions" +version = 
"1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f" + [[package]] name = "string" version = "0.2.0" @@ -2913,6 +2993,12 @@ version = "0.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6f2c54fe5e8d6907c60dc6fba532cc8529245d97ff4e26cb490cb462de114ba4" +[[package]] +name = "version_check" +version = "0.9.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b5a972e5669d67ba988ce3dc826706fb0a8b01471c088cb0b6110b805cc36aed" + [[package]] name = "want" version = "0.3.0" diff --git a/linkerd/metrics/Cargo.toml b/linkerd/metrics/Cargo.toml index ec69a9c767..e317c0d0df 100644 --- a/linkerd/metrics/Cargo.toml +++ b/linkerd/metrics/Cargo.toml @@ -7,15 +7,20 @@ publish = false [features] default = [] +summary = ["hdrhistogram", "parking_lot", "tokio"] test_util = [] [dependencies] deflate = { version = "0.7.18", features = ["gzip"] } futures = "0.3" +hdrhistogram = { version = "7.1", optional = true } http = "0.2" hyper = "0.13.7" indexmap = "1.0" +parking_lot = { version = "0.11", optional = true } +tokio = { version = "0.2", features = ["time"], optional = true } tracing = "0.1.2" [dev-dependencies] quickcheck = { version = "0.9", default-features = false } +tokio = { version = "0.2", features = ["macros", "test-util", "time"] } diff --git a/linkerd/metrics/src/histogram.rs b/linkerd/metrics/src/histogram.rs index 6f3dfdfedf..0fdbdf72e8 100644 --- a/linkerd/metrics/src/histogram.rs +++ b/linkerd/metrics/src/histogram.rs @@ -45,9 +45,6 @@ pub enum Bucket { #[derive(Debug)] pub struct Bounds(pub &'static [Bucket]); -/// Helper that lazily formats metric keys as {0}_{1}. -struct Key(A, B); - /// Helper that lazily formats an `{K}="{V}"`" label. 
struct Label(K, V); @@ -189,11 +186,10 @@ impl, F: Factor> FmtMetric for Histogram { let total = Counter::::new(); for (le, count) in self { total.add(count.into()); - total.fmt_metric_labeled(f, Key(&name, "bucket"), Label("le", le))?; + total.fmt_metric_labeled(f, format_args!("{}_bucket", &name), Label("le", le))?; } - total.fmt_metric(f, Key(&name, "count"))?; - self.sum.fmt_metric(f, Key(&name, "sum"))?; - + total.fmt_metric(f, format_args!("{}_count", &name))?; + self.sum.fmt_metric(f, format_args!("{}_sum", &name))?; Ok(()) } @@ -210,23 +206,19 @@ impl, F: Factor> FmtMetric for Histogram { let total = Counter::::new(); for (le, count) in self { total.add(count.into()); - total.fmt_metric_labeled(f, Key(&name, "bucket"), (&labels, Label("le", le)))?; + total.fmt_metric_labeled( + f, + format_args!("{}_bucket", &name), + (&labels, Label("le", le)), + )?; } - total.fmt_metric_labeled(f, Key(&name, "count"), &labels)?; - self.sum.fmt_metric_labeled(f, Key(&name, "sum"), &labels)?; - + total.fmt_metric_labeled(f, format_args!("{}_count", &name), &labels)?; + self.sum + .fmt_metric_labeled(f, format_args!("{}_sum", &name), &labels)?; Ok(()) } } -// ===== impl Key ===== - -impl fmt::Display for Key { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "{}_{}", self.0, self.1) - } -} - // ===== impl Label ===== impl FmtLabels for Label { diff --git a/linkerd/metrics/src/lib.rs b/linkerd/metrics/src/lib.rs index afbba44ad7..25b2ed9bc8 100644 --- a/linkerd/metrics/src/lib.rs +++ b/linkerd/metrics/src/lib.rs @@ -9,6 +9,8 @@ pub mod latency; mod prom; mod scopes; mod serve; +#[cfg(feature = "summary")] +mod summary; pub use self::counter::Counter; pub use self::gauge::Gauge; @@ -16,14 +18,16 @@ pub use self::histogram::Histogram; pub use self::prom::{FmtLabels, FmtMetric, FmtMetrics, Metric}; pub use self::scopes::Scopes; pub use self::serve::Serve; +#[cfg(feature = "summary")] +pub use self::summary::Summary; #[macro_export] macro_rules! 
metrics { { $( $name:ident : $kind:ty { $help:expr } ),+ } => { $( #[allow(non_upper_case_globals)] - const $name: ::linkerd2_metrics::Metric<'static, &str, $kind> = - ::linkerd2_metrics::Metric { + const $name: $crate::Metric<'static, &str, $kind> = + $crate::Metric { name: stringify!($name), help: $help, _p: ::std::marker::PhantomData, diff --git a/linkerd/metrics/src/prom.rs b/linkerd/metrics/src/prom.rs index 2c889cb6ec..cf15c42454 100644 --- a/linkerd/metrics/src/prom.rs +++ b/linkerd/metrics/src/prom.rs @@ -94,6 +94,16 @@ impl<'a, N: fmt::Display, M: FmtMetric> Metric<'a, N, M> { metric.fmt_metric(f, &self.name) } + /// Formats a single metric with labels. + pub fn fmt_metric_labeled( + &self, + f: &mut fmt::Formatter<'_>, + metric: &M, + labels: &L, + ) -> fmt::Result { + metric.fmt_metric_labeled(f, &self.name, labels) + } + /// Formats a single metric across labeled scopes. pub fn fmt_scopes<'s, L, S: 's, I, F>( &self, diff --git a/linkerd/metrics/src/summary.rs b/linkerd/metrics/src/summary.rs new file mode 100644 index 0000000000..e8c422ace9 --- /dev/null +++ b/linkerd/metrics/src/summary.rs @@ -0,0 +1,386 @@
+// This module is inspired by hdrhistogram-go, which is distributed under the
+// MIT license. Copyright (c) 2014 Coda Hale
+
+use crate::{Counter, Factor, FmtLabels, FmtMetric};
+pub use hdrhistogram::{AdditionError, CreationError, Histogram, RecordError};
+use parking_lot::{MappedMutexGuard, Mutex, MutexGuard};
+use std::fmt;
+use tokio::time;
+
+/// Summarizes a distribution of values at fixed quantiles over a sliding window.
+#[derive(Debug)]
+pub struct Summary {
+    /// How often the ring of windows advances (`lifetime / n_windows`).
+    rotate_interval: time::Duration,
+
+    /// The ring of histograms that new values are recorded into.
+    windows: Mutex,
+
+    /// Instead of allocating a new histogram each time a report is formatted, we
+    /// hold a single report and reset/repopulate it from the active windows.
+    report: Mutex>,
+    /// The quantiles emitted, in order, each time the summary is formatted.
+    quantiles: Box<[f64]>,
+
+    /// Count is tracked independently of the histograms so that rotated values
+    /// are included.
+    count: Counter,
+    /// Running total of every recorded value; like the count, it is kept
+    /// outside the histograms.
+    sum: Counter,
+}
+
+/// The rotating set of histograms that backs a `Summary`.
+#[derive(Debug)]
+struct Windows {
+    /// A ring buffer of active histogram windows. Every `rotate_interval` the
+    /// index is advanced and the oldest histogram is reset and reused for new
+    /// values. `idx` always refers to the index of the newest active
+    /// histogram and `(idx + 1) % active.len()` is the oldest active
+    /// histogram.
+    active: Vec>,
+    /// Index into `active` of the histogram that receives new values.
+    idx: usize,
+    /// The instant at or after which the next rotation is due.
+    next_rotate: time::Instant,
+}
+
+/// Helper that lazily formats `quantile` labels.
+struct FmtQuantile<'q>(&'q f64);
+
+// === impl Summary ===
+
+impl Summary {
+    /// The quantiles reported unless they are replaced via `with_quantiles`.
+    const DEFAULT_QUANTILES: [f64; 7] = [0.0, 0.50, 0.75, 0.90, 0.99, 0.999, 1.0];
+
+    /// Creates a new summary with the specified number of windows. Values are
+    /// included for at most `lifetime`.
+    ///
+    /// Histograms are automatically resized to accommodate values.
+    ///
+    /// # Errors
+    ///
+    /// Returns the error from `Histogram::new` if it rejects `sigfig`.
+    ///
+    /// # Panics
+    ///
+    /// Panics if `n_windows` is zero, because `lifetime` is divided by it.
+    pub fn new_resizable(
+        n_windows: u32,
+        lifetime: time::Duration,
+        sigfig: u8,
+    ) -> Result {
+        let h = Histogram::new(sigfig)?;
+        Ok(Self::new_inner(n_windows, lifetime, h))
+    }
+
+    /// Creates a new summary with the specified number of windows. Values are
+    /// included for at most `lifetime`.
+    ///
+    /// Histograms are **not** automatically resized to accommodate values;
+    /// `high` is handed to `Histogram::new_with_max` as the upper bound.
+    ///
+    /// # Errors
+    ///
+    /// Returns the error from `Histogram::new_with_max` if it rejects `high` or
+    /// `sigfig`.
+    ///
+    /// # Panics
+    ///
+    /// Panics if `n_windows` is zero, because `lifetime` is divided by it.
+    pub fn new_with_max(
+        n_windows: u32,
+        lifetime: time::Duration,
+        high: u64,
+        sigfig: u8,
+    ) -> Result {
+        let h = Histogram::new_with_max(high, sigfig)?;
+        Ok(Self::new_inner(n_windows, lifetime, h))
+    }
+
+    /// Builds a summary whose windows are clones of `histogram` and whose
+    /// report is `histogram` itself, so all of them share one configuration.
+    fn new_inner(n_windows: u32, lifetime: time::Duration, histogram: Histogram) -> Self {
+        debug_assert!(n_windows > 0);
+        // Dividing a `Duration` by zero panics, so a zero `n_windows` cannot get
+        // past this line in release builds either.
+        let rotate_interval = lifetime / n_windows;
+
+        let windows = Mutex::new(Windows {
+            active: vec![histogram.clone(); n_windows as usize],
+            idx: 0,
+            next_rotate: time::Instant::now() + rotate_interval,
+        });
+
+        Self {
+            windows,
+            rotate_interval,
+
+            report: Mutex::new(histogram),
+            // The array is `Copy`; it is boxed directly and coerced to a slice.
+            quantiles: Box::new(Self::DEFAULT_QUANTILES),
+
+            count: Counter::new(),
+            sum: Counter::new(),
+        }
+    }
+
+    /// Overrides the default quantiles to be reported.
+    ///
+    /// Quantiles are reported in the order `qs` yields them.
+    pub fn with_quantiles(mut self, qs: impl IntoIterator) -> Self {
+        self.quantiles = qs.into_iter().collect();
+        self
+    }
+
+    /// Record a value in the current histogram.
+    ///
+    /// Histograms are rotated as needed. Equivalent to `record_n(v, 1)`.
+    ///
+    /// # Errors
+    ///
+    /// Returns the histogram's `RecordError` if it does not accept `v`.
+    #[inline]
+    pub fn record(&self, v: u64) -> Result<(), RecordError> {
+        self.record_n(v, 1)
+    }
+
+    /// Record values in the current histogram.
+    ///
+    /// Histograms are rotated as needed. On success, `v * n` is added to the
+    /// running sum and `n` to the running count.
+    ///
+    /// # Errors
+    ///
+    /// Returns the histogram's `RecordError` if it does not accept `v`; the
+    /// running sum and count are left untouched in that case.
+    #[inline]
+    pub fn record_n(&self, v: u64, n: usize) -> Result<(), RecordError> {
+        self.rotated_window_mut().record_n(v, n as u64)?;
+        // `wrapping_mul` yields what `v * n` yields in release builds, without
+        // the overflow panic that debug builds raise for very large products.
+        self.sum.add(v.wrapping_mul(n as u64));
+        self.count.add(n as u64);
+        Ok(())
+    }
+
+    /// Record a value in the current histogram.
+    ///
+    /// Histograms are rotated as needed. If the value exceeds the maximum, it is
+    /// clamped to the upper bound. This should not be used with resizable
+    /// summaries.
+    #[inline]
+    pub fn saturating_record(&self, v: u64) {
+        self.saturating_record_n(v, 1);
+    }
+
+    /// Record values in the current histogram.
+    ///
+    /// Histograms are rotated as needed. Values that exceed the maximum are
+    /// clamped to the upper bound. This should not be used with resizable
+    /// summaries.
+    ///
+    /// The running sum still receives the unclamped `v * n`, and the running
+    /// count receives `n`.
+    #[inline]
+    pub fn saturating_record_n(&self, v: u64, n: usize) {
+        self.rotated_window_mut().saturating_record_n(v, n as u64);
+        // `wrapping_mul` yields what `v * n` yields in release builds, without
+        // the overflow panic that debug builds raise for very large products.
+        self.sum.add(v.wrapping_mul(n as u64));
+        self.count.add(n as u64);
+    }
+
+    /// Get a mutable reference to the current histogram, rotating windows as
+    /// necessary.
+    ///
+    /// The returned guard keeps the `windows` mutex locked until it is dropped.
+    #[inline]
+    fn rotated_window_mut(&self) -> MappedMutexGuard<'_, Histogram> {
+        let mut w = self.windows.lock();
+        let now = time::Instant::now();
+        if now >= w.next_rotate {
+            // Advance windows per elapsed time. If more than one interval has
+            // elapsed, clear out intermediate windows so reports only include
+            // values in the max lifetime.
+            //
+            // Nanosecond precision, floored at 1, keeps the division from
+            // panicking when the interval is shorter than a millisecond.
+            let interval = self.rotate_interval.as_nanos().max(1);
+            let overdue = (now - w.next_rotate).as_nanos() / interval;
+            // Once every window has been reset, further rotations change
+            // nothing, so cap the work done while the lock is held.
+            let rotations = (overdue + 1).min(w.active.len() as u128);
+            for _ in 0..rotations {
+                let i = (w.idx + 1) % w.active.len();
+                w.active[i].reset();
+                w.idx = i;
+            }
+            w.next_rotate = now + self.rotate_interval;
+        }
+
+        MutexGuard::map(w, |w| &mut w.active[w.idx])
+    }
+
+    /// Lock the inner report, clear it, and repopulate from the active windows.
+    ///
+    /// NOTE(review): windows are not rotated here, so a report built after a
+    /// period without records still merges whatever the windows last held;
+    /// confirm this is intended.
+    fn lock_report(&self) -> MutexGuard<'_, Histogram> {
+        let mut report = self.report.lock();
+        // Remove all values left over from the previous report.
+        report.reset();
+        let windows = self.windows.lock();
+        for w in windows.active.iter() {
+            // The report histogram must have been created with the same
+            // configuration as the other histograms, so they all either share a
+            // max value or are all resizable.
+            report.add(w).expect("Histograms must merge");
+        }
+        report
+    }
+}
+
+/// Formats a summary as one sample per configured quantile, followed by
+/// `{name}_count` and `{name}_sum` samples.
+impl FmtMetric for Summary {
+    const KIND: &'static str = "summary";
+
+    /// Writes the quantile samples, each labeled only with its `quantile`, then
+    /// the unlabeled count and sum.
+    fn fmt_metric(&self, f: &mut fmt::Formatter<'_>, name: N) -> fmt::Result {
+        // The inner scope drops the report guard before the count and sum are
+        // written.
+        {
+            let report = self.lock_report();
+            for q in self.quantiles.iter() {
+                let v = Counter::::from(report.value_at_quantile(*q));
+                v.fmt_metric_labeled(f, &name, FmtQuantile(q))?;
+            }
+        }
+        self.count.fmt_metric(f, format_args!("{}_count", name))?;
+        self.sum.fmt_metric(f, format_args!("{}_sum", name))?;
+        Ok(())
+    }
+
+    /// Writes the quantile samples with the `quantile` label placed ahead of
+    /// `labels`, then the count and sum carrying `labels` alone.
+    fn fmt_metric_labeled(
+        &self,
+        f: &mut fmt::Formatter<'_>,
+        name: N,
+        labels: L,
+    ) -> fmt::Result
+    where
+        N: fmt::Display,
+        L: FmtLabels,
+    {
+        // The inner scope drops the report guard before the count and sum are
+        // written.
+        {
+            let report = self.lock_report();
+            for q in self.quantiles.iter() {
+                let v = Counter::::from(report.value_at_quantile(*q));
+                v.fmt_metric_labeled(f, &name, (FmtQuantile(q), &labels))?;
+            }
+        }
+        self.count
+            .fmt_metric_labeled(f, format_args!("{}_count", name), &labels)?;
+        self.sum
+            .fmt_metric_labeled(f, format_args!("{}_sum", name), &labels)?;
+        Ok(())
+    }
+}
+
+// === impl FmtQuantile ===
+
+impl<'q> FmtLabels for FmtQuantile<'q> {
+    /// Writes `quantile="<q>"` using the `Display` form of the wrapped `f64`.
+    fn fmt_labels(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        write!(f, "quantile=\"{}\"", self.0)
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::{FmtMetrics, MillisAsSeconds};
+    use tokio::time;
+
+    // Declares the `basic` and `scaled` metric constants used by the tests.
+    crate::metrics!
{
+        basic: Summary { "A simple summary" },
+        scaled: Summary { "A summary of millis as seconds" }
+    }
+
+    /// Pairs one summary with each declared metric so that both the unlabeled
+    /// and the labeled formatting paths are exercised.
+    struct Fmt {
+        basic: Summary,
+        scaled: Summary,
+    }
+
+    impl FmtMetrics for Fmt {
+        fn fmt_metrics(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+            // A fixed `k="v"` label, applied to the `scaled` summary only.
+            struct Label;
+            impl FmtLabels for Label {
+                fn fmt_labels(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+                    write!(f, "k=\"v\"")
+                }
+            }
+
+            basic.fmt_help(f)?;
+            basic.fmt_metric(f, &self.basic)?;
+            scaled.fmt_help(f)?;
+            scaled.fmt_metric_labeled(f, &self.scaled, &Label)?;
+            Ok(())
+        }
+    }
+
+    /// Records the same 10,000 values into both summaries and compares the
+    /// rendered output, line by line, against `EXPECTED`.
+    #[test]
+    fn fmt() {
+        let output = {
+            let mut f = Fmt {
+                basic: Summary::new_resizable(2, time::Duration::from_secs(2), 5).unwrap(),
+                scaled: Summary::new_resizable(2, time::Duration::from_secs(2), 5).unwrap(),
+            };
+
+            record(&mut f.basic).unwrap();
+            record(&mut f.scaled).unwrap();
+            f.as_display()
+                .to_string()
+                .lines()
+                .map(|s| s.to_string())
+                .collect::>()
+        };
+
+        for (output, expected) in output.iter().zip(EXPECTED) {
+            assert_eq!(output, expected);
+        }
+        // `zip` stops at the shorter side, so the lengths are checked as well.
+        assert_eq!(output.len(), EXPECTED.len());
+
+        // 10,000 values in total, summing to 2,982,501.
+        fn record(s: &mut Summary) -> Result<(), RecordError> {
+            s.record_n(1, 2500)?;
+            s.record_n(2, 2500)?;
+            s.record_n(10, 2500)?;
+            s.record_n(100, 1500)?;
+            s.record_n(1000, 900)?;
+            s.record_n(10000, 90)?;
+            s.record_n(100000, 9)?;
+            s.record(100001)?;
+            Ok(())
+        }
+
+        const EXPECTED: &'static [&'static str] = &[
+            "# HELP basic A simple summary",
+            "# TYPE basic summary",
+            "basic{quantile=\"0\"} 1",
+            "basic{quantile=\"0.5\"} 2",
+            "basic{quantile=\"0.75\"} 10",
+            "basic{quantile=\"0.9\"} 100",
+            "basic{quantile=\"0.99\"} 1000",
+            "basic{quantile=\"0.999\"} 10000",
+            "basic{quantile=\"1\"} 100001",
+            "basic_count 10000",
+            "basic_sum 2982501",
+            "# HELP scaled A summary of millis as seconds",
+            "# TYPE scaled summary",
+            "scaled{quantile=\"0\",k=\"v\"} 0.001",
+            "scaled{quantile=\"0.5\",k=\"v\"} 0.002",
+            "scaled{quantile=\"0.75\",k=\"v\"} 0.01",
+            "scaled{quantile=\"0.9\",k=\"v\"} 0.1",
+            "scaled{quantile=\"0.99\",k=\"v\"} 1",
+            "scaled{quantile=\"0.999\",k=\"v\"} 10",
+            "scaled{quantile=\"1\",k=\"v\"} 100.001",
+            "scaled_count{k=\"v\"} 10000",
+            "scaled_sum{k=\"v\"} 2982.501",
+        ];
+    }
+
+    /// Checks that reports cover only the active windows while the running
+    /// count and sum keep growing across rotations.
+    #[tokio::test]
+    async fn windows() {
+        // Time is paused so it only moves through the `advance` calls below.
+        time::pause();
+
+        const WINDOWS: u32 = 2;
+        const ROTATE_INTERVAL: time::Duration = time::Duration::from_secs(10);
+        let s = Summary::<()>::new_resizable(WINDOWS, WINDOWS * ROTATE_INTERVAL, 5).unwrap();
+
+        // First interval: everything lands in a single window.
+        s.record_n(1, 5_000).unwrap();
+        s.record_n(2, 5_000).unwrap();
+        {
+            let h = s.lock_report();
+            assert_eq!(h.value_at_quantile(0.5), 1);
+            assert_eq!(h.len(), 10000);
+        }
+        assert_eq!(s.count.value(), 10000.0);
+        assert_eq!(s.sum.value(), 15_000.0);
+
+        time::advance(ROTATE_INTERVAL).await;
+
+        // Second interval: the report merges both windows.
+        s.record_n(1, 4999).unwrap();
+        s.record_n(3, 5001).unwrap();
+        {
+            let h = s.lock_report();
+            assert_eq!(h.value_at_quantile(0.5), 2);
+            assert_eq!(h.len(), 20000);
+        }
+        assert_eq!(s.count.value(), 20000.0);
+        assert_eq!(s.sum.value(), 35_002.0);
+
+        time::advance(ROTATE_INTERVAL).await;
+
+        // Third interval: the first batch's window is reset and reused, so the
+        // report holds the second and third batches only.
+        s.record_n(4, 10_000).unwrap();
+        {
+            let h = s.lock_report();
+            assert_eq!(h.value_at_quantile(0.5), 3);
+            assert_eq!(h.len(), 20000);
+        }
+        assert_eq!(s.count.value(), 30000.0);
+        assert_eq!(s.sum.value(), 75_002.0);
+
+        time::advance(2 * ROTATE_INTERVAL).await;
+
+        // Two intervals later both windows have been reset, so the report holds
+        // just this batch; the count and sum still include every record.
+        s.record_n(1, 10_000).unwrap();
+        {
+            let h = s.lock_report();
+            assert_eq!(h.value_at_quantile(0.5), 1);
+            assert_eq!(h.len(), 10000);
+        }
+        assert_eq!(s.count.value(), 40000.0);
+        assert_eq!(s.sum.value(), 85_002.0);
+    }
+}