Skip to content

Commit

Permalink
Add Summary metric type
Browse files Browse the repository at this point in the history
Signed-off-by: Palash Nigam <npalash25@gmail.com>
  • Loading branch information
palash25 committed Oct 14, 2022
1 parent 69e6674 commit 1455fcc
Show file tree
Hide file tree
Showing 5 changed files with 199 additions and 2 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Expand Up @@ -18,6 +18,7 @@ dtoa = "1.0"
itoa = "1.0"
owning_ref = "0.4"
prometheus-client-derive-text-encode = { version = "0.3.0", path = "derive-text-encode" }
quantiles = "0.7.1"

[dev-dependencies]
async-std = { version = "1", features = ["attributes"] }
Expand Down
78 changes: 78 additions & 0 deletions src/encoding/text.rs
Expand Up @@ -29,6 +29,7 @@ use crate::metrics::exemplar::{CounterWithExemplar, Exemplar, HistogramWithExemp
use crate::metrics::family::{Family, MetricConstructor};
use crate::metrics::gauge::{self, Gauge};
use crate::metrics::histogram::Histogram;
use crate::metrics::summary::Summary;
use crate::metrics::info::Info;
use crate::metrics::{MetricType, TypedMetric};
use crate::registry::{Registry, Unit};
Expand Down Expand Up @@ -184,6 +185,7 @@ impl Encode for MetricType {
MetricType::Histogram => "histogram",
MetricType::Info => "info",
MetricType::Unknown => "unknown",
MetricType::Summary => "summary",
};

writer.write_all(t.as_bytes())?;
Expand Down Expand Up @@ -331,6 +333,23 @@ impl<'a> BucketEncoder<'a> {
})
}

/// Encode a quantile. Used for the [`Summary`] metric type.
pub fn encode_quantile(&mut self, quantile: f64) -> Result<ValueEncoder, std::io::Error> {
if self.opened_curly_brackets {
self.writer.write_all(b",")?;
} else {
self.writer.write_all(b"{")?;
}

self.writer.write_all(b"quantile=\"")?;
quantile.encode(self.writer)?;
self.writer.write_all(b"\"}")?;

Ok(ValueEncoder {
writer: self.writer,
})
}

/// Signal that the metric type has no bucket.
pub fn no_bucket(&mut self) -> Result<ValueEncoder, std::io::Error> {
if self.opened_curly_brackets {
Expand Down Expand Up @@ -580,6 +599,40 @@ fn encode_histogram_with_maybe_exemplars<S: Encode>(
Ok(())
}

/////////////////////////////////////////////////////////////////////////////////
// Summary

impl EncodeMetric for Summary {
fn encode(&self, mut encoder: Encoder) -> Result<(), std::io::Error> {
let (sum, count, quantiles) = self.get();

encoder
.encode_suffix("sum")?
.no_bucket()?
.encode_value(sum)?
.no_exemplar()?;
encoder
.encode_suffix("count")?
.no_bucket()?
.encode_value(count)?
.no_exemplar()?;

for (_, (quantile, result)) in quantiles.iter().enumerate() {
let mut bucket_encoder = encoder.no_suffix()?;
let mut value_encoder = bucket_encoder.encode_quantile(*quantile)?;
let mut exemplar_encoder = value_encoder.encode_value(*result)?;
exemplar_encoder.no_exemplar()?
}

Result::Ok(())
}

fn metric_type(&self) -> MetricType {
Self::TYPE
}
}


/////////////////////////////////////////////////////////////////////////////////
// Info

Expand All @@ -603,6 +656,7 @@ where
}
}


#[cfg(test)]
mod tests {
use super::*;
Expand Down Expand Up @@ -819,6 +873,30 @@ mod tests {
parse_with_python_client(String::from_utf8(encoded).unwrap());
}

#[test]
fn encode_summary() {
let mut registry = Registry::default();
let summary = Summary::new(3, 10, vec![0.5, 0.9, 0.99], 0.0);
registry.register("my_summary", "My summary", summary.clone());
summary.observe(0.10);
summary.observe(0.20);
summary.observe(0.30);

let mut encoded = Vec::new();

encode(&mut encoded, &registry).unwrap();

let expected = "# HELP my_summary My summary.\n".to_owned()
+ "# TYPE my_summary summary\n"
+ "my_summary_sum 0.6000000000000001\n"
+ "my_summary_count 3\n"
+ "my_summary{quantile=\"0.5\"} 0.2\n"
+ "my_summary{quantile=\"0.9\"} 0.3\n"
+ "my_summary{quantile=\"0.99\"} 0.3\n"
+ "# EOF\n";
assert_eq!(expected, String::from_utf8(encoded.clone()).unwrap());
}

fn parse_with_python_client(input: String) {
pyo3::prepare_freethreaded_python();

Expand Down
3 changes: 2 additions & 1 deletion src/metrics.rs
Expand Up @@ -5,6 +5,7 @@ pub mod exemplar;
pub mod family;
pub mod gauge;
pub mod histogram;
pub mod summary;
pub mod info;

/// A metric that is aware of its Open Metrics metric type.
Expand All @@ -19,9 +20,9 @@ pub enum MetricType {
Histogram,
Info,
Unknown,
Summary,
// Not (yet) supported metric types.
//
// GaugeHistogram,
// StateSet,
// Summary
}
2 changes: 1 addition & 1 deletion src/metrics/counter.rs
Expand Up @@ -185,7 +185,7 @@ mod tests {
// Map infinite, subnormal and NaN to 0.0.
.map(|f| if f.is_normal() { f } else { 0.0 })
.collect();
let sum = fs.iter().sum();
let sum: f64 = fs.iter().sum();
let counter = Counter::<f64, AtomicU64>::default();
for f in fs {
counter.inc_by(f);
Expand Down
117 changes: 117 additions & 0 deletions src/metrics/summary.rs
@@ -0,0 +1,117 @@
//! Module implementing an Open Metrics histogram.
//!
//! See [`Summary`] for details.

use super::{MetricType, TypedMetric};
//use owning_ref::OwningRef;
//use std::iter::{self, once};
use std::sync::{Arc, Mutex};

use quantiles::ckms::CKMS;

/// Open Metrics [`Summary`] to measure distributions of discrete events.
pub struct Summary {
target_quantile: Vec<f64>,
target_error: f64,
max_age_buckets: u64,
max_age_seconds: u64,
inner: Arc<Mutex<InnerSummary>>,
}

impl Clone for Summary {
fn clone(&self) -> Self {
Summary {
target_quantile: self.target_quantile.clone(),
target_error: self.target_error,
max_age_buckets: self.max_age_buckets,
max_age_seconds: self.max_age_seconds,
inner: self.inner.clone(),
}
}
}

pub(crate) struct InnerSummary {
sum: f64,
count: u64,
quantile_streams: Vec<CKMS<f64>>,
// head_stream is like a cursor which carries the index
// of the stream in the quantile_streams that we want to query
head_stream: u64,
}

impl Summary {
pub fn new(max_age_buckets: u64, max_age_seconds: u64, target_quantile: Vec<f64>, target_error: f64) -> Self {
let mut streams: Vec<CKMS<f64>> = Vec::new();
for _ in 0..max_age_buckets {
streams.push(CKMS::new(target_error));
}

Summary{
max_age_buckets,
max_age_seconds,
target_quantile,
target_error,
inner: Arc::new(Mutex::new(InnerSummary {
sum: Default::default(),
count: Default::default(),
quantile_streams: streams,
head_stream: 0,
}))
}
}

pub fn observe(&self, v: f64) {
let mut inner = self.inner.lock().unwrap();
inner.sum += v;
inner.count += 1;

// insert quantiles into all streams/buckets.
for stream in inner.quantile_streams.iter_mut() {
stream.insert(v);
}
}

pub fn get(&self) -> (f64, u64, Vec<(f64, f64)>) {
let inner = self.inner.lock().unwrap();
let sum = inner.sum;
let count = inner.count;
let head = inner.head_stream;
let mut quantile_values: Vec<(f64, f64)> = Vec::new();

// TODO: add stream rotation
for q in self.target_quantile.iter() {
match inner.quantile_streams[head as usize].query(*q) {
Some((_, v)) => quantile_values.push((*q, v)),
None => continue, // TODO fix this
};
}
(sum, count, quantile_values)
}
}

// TODO: should this type impl Default like Counter?

impl TypedMetric for Summary {
const TYPE: MetricType = MetricType::Summary;
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn basic() {
let summary = Summary::new(5, 10, vec![0.5, 0.9, 0.99], 0.01);
summary.observe(5.0);
summary.observe(15.0);
summary.observe(25.0);

let (s, c, q) = summary.get();
assert_eq!(45.0, s);
assert_eq!(3, c);

for elem in q.iter() {
println!("Vec<{}, {}>", elem.0, elem.1);
}
}
}

0 comments on commit 1455fcc

Please sign in to comment.