Skip to content

Commit

Permalink
Add Summary metric
Browse files Browse the repository at this point in the history
Signed-off-by: Palash Nigam <npalash25@gmail.com>
  • Loading branch information
palash25 committed Nov 21, 2022
1 parent 6cd0dba commit 29661c0
Show file tree
Hide file tree
Showing 4 changed files with 234 additions and 0 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Expand Up @@ -24,6 +24,7 @@ prometheus-client-derive-encode = { version = "0.3.0", path = "derive-encode" }
prost = { version = "0.11.0", optional = true }
prost-types = { version = "0.11.0", optional = true }
void = { version = "1.0", optional = true }
quantiles = "0.7.1"

[dev-dependencies]
async-std = { version = "1", features = ["attributes"] }
Expand Down
80 changes: 80 additions & 0 deletions src/encoding/text.rs
Expand Up @@ -29,6 +29,7 @@ use crate::metrics::exemplar::{CounterWithExemplar, Exemplar, HistogramWithExemp
use crate::metrics::family::{Family, MetricConstructor};
use crate::metrics::gauge::{self, Gauge};
use crate::metrics::histogram::Histogram;
use crate::metrics::summary::Summary;
use crate::metrics::info::Info;
use crate::metrics::{MetricType, TypedMetric};
use crate::registry::{Registry, Unit};
Expand Down Expand Up @@ -186,6 +187,7 @@ impl Encode for MetricType {
MetricType::Histogram => "histogram",
MetricType::Info => "info",
MetricType::Unknown => "unknown",
MetricType::Summary => "summary",
};

writer.write_all(t.as_bytes())?;
Expand Down Expand Up @@ -323,6 +325,23 @@ impl<'a> BucketEncoder<'a> {
})
}

/// Encode a quantile. Used for the [`Summary`] metric type.
pub fn encode_quantile(&mut self, quantile: f64) -> Result<ValueEncoder, std::io::Error> {
if self.opened_curly_brackets {
self.writer.write_all(b",")?;
} else {
self.writer.write_all(b"{")?;
}

self.writer.write_all(b"quantile=\"")?;
quantile.encode(self.writer)?;
self.writer.write_all(b"\"}")?;

Ok(ValueEncoder {
writer: self.writer,
})
}

/// Signal that the metric type has no bucket.
pub fn no_bucket(&mut self) -> Result<ValueEncoder, std::io::Error> {
if self.opened_curly_brackets {
Expand Down Expand Up @@ -579,6 +598,41 @@ fn encode_histogram_with_maybe_exemplars<S: Encode>(
Ok(())
}


/////////////////////////////////////////////////////////////////////////////////
// Summary

impl EncodeMetric for Summary {
fn encode(&self, mut encoder: Encoder) -> Result<(), std::io::Error> {
let (sum, count, quantiles) = self.get();

encoder
.encode_suffix("sum")?
.no_bucket()?
.encode_value(sum)?
.no_exemplar()?;
encoder
.encode_suffix("count")?
.no_bucket()?
.encode_value(count)?
.no_exemplar()?;

for (_, (quantile, result)) in quantiles.iter().enumerate() {
let mut bucket_encoder = encoder.no_suffix()?;
let mut value_encoder = bucket_encoder.encode_quantile(*quantile)?;
let mut exemplar_encoder = value_encoder.encode_value(*result)?;
exemplar_encoder.no_exemplar()?
}

Result::Ok(())
}

fn metric_type(&self) -> MetricType {
Self::TYPE
}
}


/////////////////////////////////////////////////////////////////////////////////
// Info

Expand Down Expand Up @@ -818,6 +872,32 @@ mod tests {
parse_with_python_client(String::from_utf8(encoded).unwrap());
}

#[test]
fn encode_summary() {
let mut registry = Registry::default();
let summary = Summary::new(3, 10, vec![0.5, 0.9, 0.99], 0.0);
registry.register("my_summary", "My summary", summary.clone());
summary.observe(0.10);
summary.observe(0.20);
summary.observe(0.30);

let mut encoded = Vec::new();

encode(&mut encoded, &registry).unwrap();

let expected = "# HELP my_summary My summary.\n".to_owned()
+ "# TYPE my_summary summary\n"
+ "my_summary_sum 0.6000000000000001\n"
+ "my_summary_count 3\n"
+ "my_summary{quantile=\"0.5\"} 0.2\n"
+ "my_summary{quantile=\"0.9\"} 0.3\n"
+ "my_summary{quantile=\"0.99\"} 0.3\n"
+ "# EOF\n";
assert_eq!(expected, String::from_utf8(encoded.clone()).unwrap());

parse_with_python_client(String::from_utf8(encoded).unwrap());
}

fn parse_with_python_client(input: String) {
pyo3::prepare_freethreaded_python();

Expand Down
2 changes: 2 additions & 0 deletions src/metrics.rs
Expand Up @@ -5,6 +5,7 @@ pub mod exemplar;
pub mod family;
pub mod gauge;
pub mod histogram;
pub mod summary;
pub mod info;

/// A metric that is aware of its Open Metrics metric type.
Expand All @@ -22,6 +23,7 @@ pub enum MetricType {
Histogram,
Info,
Unknown,
Summary,
// Not (yet) supported metric types.
//
// GaugeHistogram,
Expand Down
151 changes: 151 additions & 0 deletions src/metrics/summary.rs
@@ -0,0 +1,151 @@
//! Module implementing an Open Metrics histogram.
//!
//! See [`Summary`] for details.

use super::{MetricType, TypedMetric};
//use owning_ref::OwningRef;
//use std::iter::{self, once};
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};

use quantiles::ckms::CKMS;

/// Open Metrics [`Summary`] to measure distributions of discrete events.
#[derive(Debug)]
pub struct Summary {
target_quantile: Vec<f64>,
target_error: f64,
max_age_buckets: u64,
max_age_seconds: u64,
stream_duration: Duration,
inner: Arc<Mutex<InnerSummary>>,
}

impl Clone for Summary {
fn clone(&self) -> Self {
Summary {
target_quantile: self.target_quantile.clone(),
target_error: self.target_error,
max_age_buckets: self.max_age_buckets,
max_age_seconds: self.max_age_seconds,
stream_duration: self.stream_duration,
inner: self.inner.clone(),
}
}
}

#[derive(Debug)]
pub(crate) struct InnerSummary {
sum: f64,
count: u64,
quantile_streams: Vec<CKMS<f64>>,
// head_stream is like a cursor which carries the index
// of the stream in the quantile_streams that we want to query.
head_stream_idx: u64,
// timestamp at which the head_stream_idx was last rotated.
last_rotated_timestamp: Instant,
}

impl Summary {
/// Create a new [`Summary`].
pub fn new(max_age_buckets: u64, max_age_seconds: u64, target_quantile: Vec<f64>, target_error: f64) -> Self {
let mut streams: Vec<CKMS<f64>> = Vec::new();
for _ in 0..max_age_buckets {
streams.push(CKMS::new(target_error));
}

let stream_duration = Duration::from_secs(max_age_seconds / max_age_buckets);
let last_rotated_timestamp = Instant::now();

if target_quantile.iter().any(|&x| x > 1.0 || x < 0.0) {
panic!("Quantile value out of range");
}

Summary{
max_age_buckets,
max_age_seconds,
stream_duration,
target_quantile,
target_error,
inner: Arc::new(Mutex::new(InnerSummary {
sum: Default::default(),
count: Default::default(),
quantile_streams: streams,
head_stream_idx: 0,
last_rotated_timestamp,
}))
}
}

/// Observe the given value.
pub fn observe(&self, v: f64) {
self.rotate_buckets();

let mut inner = self.inner.lock().unwrap();
inner.sum += v;
inner.count += 1;

// insert quantiles into all streams/buckets.
for stream in inner.quantile_streams.iter_mut() {
stream.insert(v);
}
}

/// Retrieve the values of the summary metric.
pub fn get(&self) -> (f64, u64, Vec<(f64, f64)>) {
self.rotate_buckets();

let inner = self.inner.lock().unwrap();
let sum = inner.sum;
let count = inner.count;
let mut quantile_values: Vec<(f64, f64)> = Vec::new();

for q in self.target_quantile.iter() {
match inner.quantile_streams[inner.head_stream_idx as usize].query(*q) {
Some((_, v)) => quantile_values.push((*q, v)),
None => continue,
};
}
(sum, count, quantile_values)
}

fn rotate_buckets(&self) {
let mut inner = self.inner.lock().unwrap();
if inner.last_rotated_timestamp.elapsed() >= self.stream_duration {
inner.last_rotated_timestamp = Instant::now();
if inner.head_stream_idx == self.max_age_buckets {
inner.head_stream_idx = 0;
} else {
inner.head_stream_idx += 1;
}
};
}
}

impl TypedMetric for Summary {
const TYPE: MetricType = MetricType::Summary;
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn summary() {
let summary = Summary::new(5, 10, vec![0.5, 0.9, 0.99], 0.01);
summary.observe(1.0);
summary.observe(5.0);
summary.observe(10.0);

let (s, c, q) = summary.get();
assert_eq!(16.0, s);
assert_eq!(3, c);
assert_eq!(vec![(0.5, 5.0), (0.9, 10.0), (0.99, 10.0)], q);
}

#[test]
#[should_panic(expected="Quantile value out of range")]
fn summary_panic() {
Summary::new(5, 10, vec![1.0, 5.0, 9.0], 0.01);
}
}

0 comments on commit 29661c0

Please sign in to comment.