Skip to content

Commit

Permalink
src/histogram: Make Histogram::observe atomic across collects (#314)
Browse files Browse the repository at this point in the history
* src/histogram: Add test ensuring Histogram::observe is atomic

If an observe and a collect operation interleave, the latter should not
expose a snapshot of the histogram that does not uphold all histogram
invariants. For example for the invariant that the overall observation
counter should equal the sum of all bucket counters: Say that an
`observe` increases the overall counter but before updating a specific
bucket counter a collect operation snapshots the histogram.

This commits adds a basic unit test to test that the above is not
happening.

Signed-off-by: Max Inden <mail@max-inden.de>

* src/{histogram,atomic64}: Make Histogram::observe atomic across collects

A histogram supports two main execution paths:

1. `observe` which increases the overall observation counter, updates
the observation sum and increases a single bucket counter.

2. `proto` (aka. collecting the metric, from now on referred to as the
collect operation) which snapshots the state of the histogram and
exposes it as a Protobuf struct.

If an observe and a collect operation interleave, the latter could be
exposing a snapshot of the histogram that does not uphold all histogram
invariants. For example for the invariant that the overall observation
counter should equal the sum of all bucket counters: Say that an
`observe` increases the overall counter but before updating a specific
bucket counter a collect operation snapshots the histogram.

This commits adjusts the `HistogramCore` implementation to make such
race conditions impossible. It introduces the notion of shards, one hot
shard for `observe` operations to record their observation and one cold
shard for collect operations to collect a consistent snapshot of the
histogram.

`observe` operations hit the hot shard and record their observation.
Collect operations switch hot and cold, wait for all `observe` calls to
finish on the previously hot now cold shard and then expose the
consistent snapshot.

Signed-off-by: Max Inden <mail@max-inden.de>

* benches/histogram: Add benchmark for concurrent observe and collect

Add a basic benchmark test which spawns 4 threads in the background
continuously calling `observe` 1_000 times and then `collect`. At the
same time call `observe` within the `Bencher::iter` closure to measure
impact of background threads on `observe` call.

Signed-off-by: Max Inden <mail@max-inden.de>

* src/histogram: Account for missing take without protobuf feature

Signed-off-by: Max Inden <mail@max-inden.de>

* src/histogram,benches/histogram: Run Rust fmt

Signed-off-by: Max Inden <mail@max-inden.de>

* src/histogram: Drop collect lock guard explicitly

Rusts drop semantics can be confusing sometimes. E.g. `let _ = l.lock()`
would drop the lock guard immediately whereas `let _guard = l.lock()`
would drop the guard in LIFO order at the end of the current scope.

Instead of relying on the above guarantee with `let _guard`, drop the
mutex guard explicitely hopefully making this less error prone in the
future.

Signed-off-by: Max Inden <mail@max-inden.de>

* src/histogram: Remove underscore prefix from used variable

Signed-off-by: Max Inden <mail@max-inden.de>

* src/{atomic,histogram}: Make swap take ordering and tighten usage

Signed-off-by: Max Inden <mail@max-inden.de>

* src/histogram: Test invariant that sum == count with observe(1.0)

Signed-off-by: Max Inden <mail@max-inden.de>

* src/atomic64: Adjust swap doc comment

Signed-off-by: Max Inden <mail@max-inden.de>

* src/histogram: Remove pub from shard related structs and fn

Signed-off-by: Max Inden <mail@max-inden.de>

Co-authored-by: Luca Bruno <luca.bruno@coreos.com>
  • Loading branch information
mxinden and lucab committed Jul 14, 2020
1 parent ab3d4aa commit 4fdff69
Show file tree
Hide file tree
Showing 3 changed files with 409 additions and 30 deletions.
34 changes: 33 additions & 1 deletion benches/histogram.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@

extern crate test;

use prometheus::{Histogram, HistogramOpts, HistogramVec};
use prometheus::{core::Collector, Histogram, HistogramOpts, HistogramVec};
use std::sync::{atomic, Arc};
use std::thread;
use test::Bencher;

#[bench]
Expand Down Expand Up @@ -99,3 +101,33 @@ fn bench_local_histogram_coarse_timer(b: &mut Bencher) {
b.iter(|| local.start_coarse_timer());
local.flush();
}

#[bench]
fn concurrent_observe_and_collect(b: &mut Bencher) {
let signal_exit = Arc::new(atomic::AtomicBool::new(false));
let opts = HistogramOpts::new("test_name", "test help").buckets(vec![1.0]);
let histogram = Histogram::with_opts(opts).unwrap();

let mut handlers = vec![];

for _ in 0..4 {
let histogram = histogram.clone();
let signal_exit = signal_exit.clone();
handlers.push(thread::spawn(move || {
while !signal_exit.load(atomic::Ordering::Relaxed) {
for _ in 0..1_000 {
histogram.observe(1.0);
}

histogram.collect();
}
}));
}

b.iter(|| histogram.observe(1.0));

signal_exit.store(true, atomic::Ordering::Relaxed);
for handler in handlers {
handler.join().unwrap();
}
}
26 changes: 25 additions & 1 deletion src/atomic64.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,13 @@ impl Atomic for AtomicF64 {
}
}

impl AtomicF64 {
/// Store the value, returning the previous value.
pub fn swap(&self, val: f64, ordering: Ordering) -> f64 {
u64_to_f64(self.inner.swap(f64_to_u64(val), ordering))
}
}

/// A atomic signed integer.
#[derive(Debug)]
pub struct AtomicI64 {
Expand Down Expand Up @@ -188,7 +195,7 @@ impl Atomic for AtomicU64 {

#[inline]
fn inc_by(&self, delta: Self::T) {
self.inner.fetch_add(delta, Ordering::Relaxed);
self.inc_by_with_ordering(delta, Ordering::Relaxed);
}

#[inline]
Expand All @@ -197,6 +204,23 @@ impl Atomic for AtomicU64 {
}
}

impl AtomicU64 {
/// Get the value with the provided memory ordering.
pub fn compare_and_swap(&self, current: u64, new: u64, ordering: Ordering) -> u64 {
self.inner.compare_and_swap(current, new, ordering)
}

/// Increment the value by a given amount with the provided memory ordering.
pub fn inc_by_with_ordering(&self, delta: u64, ordering: Ordering) {
self.inner.fetch_add(delta, ordering);
}

/// Stores a value into the atomic integer, returning the previous value.
pub fn swap(&self, val: u64, ordering: Ordering) -> u64 {
self.inner.swap(val, ordering)
}
}

#[cfg(test)]
mod test {
use std::f64::consts::PI;
Expand Down

0 comments on commit 4fdff69

Please sign in to comment.