Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add MaximumOverIntervalGauge #469

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ procfs = { version = "^0.14", optional = true, default-features = false }
criterion = "0.4"
getopts = "^0.2"
hyper = { version = "^0.14", features = ["server", "http1", "tcp"] }
mock_instant = { version = "0.2", features = ["sync"] }
tokio = { version = "^1.0", features = ["macros", "rt-multi-thread"] }

[build-dependencies]
Expand Down
4 changes: 4 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,9 @@ mod encoder;
mod errors;
mod gauge;
mod histogram;
mod maximum_over_interval_gauge;
mod metrics;
mod pulling_gauge;
#[cfg(feature = "push")]
mod push;
mod registry;
Expand Down Expand Up @@ -218,7 +220,9 @@ pub use self::gauge::{Gauge, GaugeVec, IntGauge, IntGaugeVec};
pub use self::histogram::DEFAULT_BUCKETS;
pub use self::histogram::{exponential_buckets, linear_buckets};
pub use self::histogram::{Histogram, HistogramOpts, HistogramTimer, HistogramVec};
pub use self::maximum_over_interval_gauge::MaximumOverIntervalGauge;
pub use self::metrics::Opts;
pub use self::pulling_gauge::PullingGauge;
#[cfg(feature = "push")]
pub use self::push::{
hostname_grouping_key, push_add_collector, push_add_metrics, push_collector, push_metrics,
Expand Down
212 changes: 212 additions & 0 deletions src/maximum_over_interval_gauge.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,212 @@
use parking_lot::{RwLock, RwLockUpgradableReadGuard};
use std::{
sync::{
atomic::{AtomicI64, Ordering},
Arc,
},
time::Duration,
};

#[cfg(test)]
use mock_instant::Instant;

#[cfg(not(test))]
use std::time::Instant;

use crate::{core::Collector, Error, PullingGauge};

/// A prometheus gauge that exposes the maximum value of a gauge over an interval.
///
/// Used to expose instantaneous values that tend to move a lot within a small interval.
///
/// # Examples
/// ```
/// # use std::time::Duration;
/// # use prometheus::{Registry, MaximumOverIntervalGauge};
///
/// let registry = Registry::new();
/// let gauge = MaximumOverIntervalGauge::new(
/// "maximum_queue_size_30s",
/// "The high watermark queue size in the last 30 seconds.",
/// Duration::from_secs(30)
/// ).unwrap();
/// registry.register(Box::new(gauge.clone()));
///
/// gauge.add(30);
/// gauge.sub(10);
///
/// // For the next 30 seconds, the metric will be 30 as that was the maximum value.
/// // Afterwards, it will drop to 10.
/// ```
#[derive(Clone, Debug)]
pub struct MaximumOverIntervalGauge {
// The current real-time value.
value: Arc<AtomicI64>,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here and everywhere else, gauges typically operate on f64 values.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was purely an i64 because we weren't clear on how to implement an atomic f64 (nor did we care enough). I just discovered your awesome https://github.com/tikv/rust-prometheus/blob/master/src/atomic64.rs#L89 setup -- will switch this over!

// The maximum value in the current interval.
maximum_value: Arc<AtomicI64>,

// The length of a given interval.
interval_duration: Duration,
// The time at which the current interval will expose.
interval_expiry: Arc<RwLock<Instant>>,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We actually use an ArcSwap for this in our internal codebase. I've switched this over to an RwLock here to avoid taking on the ArcSwap crate dependency.


gauge: PullingGauge,
}

impl MaximumOverIntervalGauge {
/// Create a new [`MaximumOverIntervalGauge`].
pub fn new<S1: Into<String>, S2: Into<String>>(
name: S1,
help: S2,
interval: Duration,
) -> Result<Self, Error> {
let maximum_value = Arc::new(AtomicI64::new(0));

Ok(Self {
value: Arc::new(AtomicI64::new(0)),
maximum_value: maximum_value.clone(),

interval_expiry: Arc::new(RwLock::new(Instant::now() + interval)),
interval_duration: interval,
gauge: PullingGauge::new(
name,
help,
Box::new(move || maximum_value.load(Ordering::Relaxed) as f64),
)?,
})
}

/// Increments the gauge by 1.
pub fn inc(&self) {
self.apply_delta(1);
}

/// Decrements the gauge by 1.
pub fn dec(&self) {
self.apply_delta(-1);
}

/// Add the given value to the gauge.
///
/// (The value can be negative, resulting in a decrement of the gauge.)
pub fn add(&self, v: i64) {
self.apply_delta(v);
}

/// Subtract the given value from the gauge.
///
/// (The value can be negative, resulting in an increment of the gauge.)
pub fn sub(&self, v: i64) {
self.apply_delta(-v);
}

fn apply_delta(&self, delta: i64) {
let previous_value = self.value.fetch_add(delta, Ordering::Relaxed);
let new_value = previous_value + delta;

let now = Instant::now();
let interval_expiry = self.interval_expiry.upgradable_read();
let loaded_interval_expiry = *interval_expiry;

// Check whether we've crossed into the new interval.
if loaded_interval_expiry < now {
// There's a possible optimization here of using try_upgrade in a loop. Need to write
// benchmarks to verify.
let mut interval_expiry = RwLockUpgradableReadGuard::upgrade(interval_expiry);

// Did we get to be the thread that actually started the new interval? Other threads
// could have updated the value before we got the exclusive lock.
if *interval_expiry == loaded_interval_expiry {
*interval_expiry = now + self.interval_duration;
self.maximum_value.store(new_value, Ordering::Relaxed);

return;
}
}

// Set the maximum_value to the max of the current value & previous max.
self.maximum_value.fetch_max(new_value, Ordering::Relaxed);
}
}

impl Collector for MaximumOverIntervalGauge {
fn desc(&self) -> Vec<&crate::core::Desc> {
self.gauge.desc()
}

fn collect(&self) -> Vec<crate::proto::MetricFamily> {
// Apply a delta of '0' to ensure that the reset-value-if-interval-expired-logic kicks in.
self.apply_delta(0);

self.gauge.collect()
}
}

#[cfg(test)]
mod test {
use mock_instant::MockClock;

use super::*;

static INTERVAL: Duration = Duration::from_secs(30);

#[test]
fn test_correct_behaviour() {
let gauge = MaximumOverIntervalGauge::new(
"test_counter".to_string(),
"This won't help you".to_string(),
INTERVAL,
)
.unwrap();

assert_metric_value(&gauge, 0.0);

gauge.add(5);

assert_metric_value(&gauge, 5.0);

gauge.dec();

// The value should still be five after we decreased it as the max within the interval was 5.
assert_metric_value(&gauge, 5.0);

MockClock::advance(INTERVAL + Duration::from_secs(1));

// The value should be 4 now as the next interval has started.
assert_metric_value(&gauge, 4.0);
}

#[test]
fn test_cloning() {
let gauge = MaximumOverIntervalGauge::new(
"test_counter".to_string(),
"This won't help you".to_string(),
INTERVAL,
)
.unwrap();

let same_gauge = gauge.clone();

assert_metric_value(&gauge, 0.0);

gauge.add(5);

// Read from the cloned gauge to veriy that they share data.
assert_metric_value(&same_gauge, 5.0);
}

fn assert_metric_value(gauge: &MaximumOverIntervalGauge, val: f64) {
let result = gauge.collect();

let metric_family = result
.first()
.expect("expected one MetricFamily to be returned");

let metric = metric_family
.get_metric()
.first()
.expect("expected one Metric to be returned");

assert_eq!(val, metric.get_gauge().get_value());
}
}
101 changes: 101 additions & 0 deletions src/pulling_gauge.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
use std::{collections::HashMap, fmt, sync::Arc};

use crate::{
core::Collector,
proto::{Gauge, Metric, MetricFamily, MetricType},
};
use protobuf::RepeatedField;

/// A [Gauge] that returns the value from a provided function on every collect run.
///
/// This metric is the equivalant of Go's
/// <https://pkg.go.dev/github.com/prometheus/client_golang@v1.11.0/prometheus#GaugeFunc>
///
/// # Examples
/// ```
/// # use prometheus::{Registry, PullingGauge};
/// # // We are stubbing out std::thread::available_parallelism since it's not available in the
/// # // oldest Rust version that we support.
/// # fn available_parallelism() -> f64 { 0.0 }
///
/// let registry = Registry::new();
/// let gauge = PullingGauge::new(
/// "available_parallelism",
/// "The available parallelism, usually the numbers of logical cores.",
/// Box::new(|| available_parallelism())
/// ).unwrap();
/// registry.register(Box::new(gauge));
/// ```
#[derive(Clone)]
pub struct PullingGauge {
desc: crate::core::Desc,
value: Arc<Box<dyn Fn() -> f64 + Send + Sync>>,
}

impl fmt::Debug for PullingGauge {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("PullingGauge")
.field("desc", &self.desc)
.field("value", &"<opaque>")
.finish()
}
}

impl PullingGauge {
/// Create a new [`PullingGauge`].
pub fn new<S1: Into<String>, S2: Into<String>>(
name: S1,
help: S2,
value: Box<dyn Fn() -> f64 + Send + Sync>,
) -> crate::Result<Self> {
Ok(PullingGauge {
value: Arc::new(value),
desc: crate::core::Desc::new(name.into(), help.into(), Vec::new(), HashMap::new())?,
})
}

fn metric(&self) -> Metric {
let mut gauge = Gauge::default();
let getter = &self.value;
gauge.set_value(getter());

let mut metric = Metric::default();
metric.set_gauge(gauge);

metric
}
}

impl Collector for PullingGauge {
fn desc(&self) -> Vec<&crate::core::Desc> {
vec![&self.desc]
}

fn collect(&self) -> Vec<crate::proto::MetricFamily> {
let mut m = MetricFamily::default();
m.set_name(self.desc.fq_name.clone());
m.set_help(self.desc.help.clone());
m.set_field_type(MetricType::GAUGE);
m.set_metric(RepeatedField::from_vec(vec![self.metric()]));
vec![m]
}
}

#[cfg(test)]
mod tests {
use super::*;
use crate::metrics::Collector;

#[test]
fn test_pulling_gauge() {
const VALUE: f64 = 10.0;

let gauge =
PullingGauge::new("test_gauge", "Purely for testing", Box::new(|| VALUE)).unwrap();

let metrics = gauge.collect();
assert_eq!(metrics.len(), 1);

assert_eq!(VALUE, metrics[0].get_metric()[0].get_gauge().get_value());
}
}