Skip to content

Commit

Permalink
Switch Estimator to use an double exponential time-based weighting
Browse files Browse the repository at this point in the history
This is an implementation of an exponentially weighted running average
with a tuning parameter (currently a constant in this implementation):

    * the `ews` parameter is a number of seconds. Progress the Estimator
    has observed that is older than this value will receive a total
    weight of 0.1 in the rate estimation, and newer values will receive a
    total of 0.9. The default is 15 seconds.

This implementation does double smoothing by applying the running
average to itself. The result avoids undesirable instantaneous movements
in the estimate when large updates occur.

The exponential estimator works by keeping a running tally, where an
existing tally that has aged `t` seconds is reweighted such that

    weight ^ (ewa / age) = 0.1

For instance, data aged 5 seconds with a 15 second weight parameter
would receive `weight = 0.1 ^ (5/15) = 0.464`. If it then ages another
10 seconds, it would receive `weight = 0.1 ^ (10/15) = 0.215`. After
being multiplied by both weights, it would have a weight of
`0.464 * 0.215 = 0.1`, as expected.

A couple of basic features are also implemented for higher quality
estimates:

    * We divide out any weight given to data before the estimator was
    initialized, since these values are meaningless. This is called
    "debiasing" since it removes the estimator's bias toward the initial
    value.

    * When returning an estimate, we include the time since the last
    updated was received as non-progress, which means that the estimator
    does not freeze when progress stalls.
  • Loading branch information
afontenot committed May 28, 2023
1 parent 4f49789 commit e31cbf3
Showing 1 changed file with 187 additions and 69 deletions.
256 changes: 187 additions & 69 deletions src/state.rs
@@ -1,9 +1,9 @@
use std::borrow::Cow;
use std::io;
use std::sync::Arc;
use std::time::Duration;
#[cfg(not(target_arch = "wasm32"))]
use std::time::Instant;
use std::{fmt, io};

#[cfg(target_arch = "wasm32")]
use instant::Instant;
Expand Down Expand Up @@ -224,13 +224,14 @@ pub struct ProgressState {

impl ProgressState {
pub(crate) fn new(len: Option<u64>, pos: Arc<AtomicPosition>) -> Self {
let now = Instant::now();
Self {
pos,
len,
tick: 0,
status: Status::InProgress,
started: Instant::now(),
est: Estimator::new(Instant::now()),
started: now,
est: Estimator::new(now),
message: TabExpandedString::NoTabs("".into()),
prefix: TabExpandedString::NoTabs("".into()),
}
Expand Down Expand Up @@ -270,7 +271,7 @@ impl ProgressState {

let pos = self.pos.pos.load(Ordering::Relaxed);

let sps = self.est.steps_per_second();
let sps = self.est.steps_per_second(Instant::now());

// Infinite duration should only ever happen at the beginning, so in this case it's okay to
// just show an ETA of 0 until progress starts to occur.
Expand All @@ -292,7 +293,7 @@ impl ProgressState {
/// The number of steps per second
pub fn per_sec(&self) -> f64 {
if let Status::InProgress = self.status {
self.est.steps_per_second()
self.est.steps_per_second(Instant::now())
} else {
let len = self.len.unwrap_or_else(|| self.pos());
len as f64 / self.started.elapsed().as_secs_f64()
Expand Down Expand Up @@ -370,85 +371,125 @@ impl TabExpandedString {
}
}

/// Estimate the number of seconds per step
/// Double-smoothed exponentially weighted estimator
///
/// This uses an exponentially weighted *time-based* estimator, meaning that it exponentially
/// downweights old data based on its age. The rate at which this occurs is currently a constant
/// value of 15 seconds for 90% weighting. This means that all data older than 15 seconds has a
/// collective weight of 0.1 in the estimate, and all data older than 30 seconds has a collective
/// weight of 0.01, and so on.
///
/// Ring buffer with constant capacity. Used by `ProgressBar`s to display `{eta}`,
/// `{eta_precise}`, and `{*_per_sec}`.
/// The primary value exposed by `Estimator` is `steps_per_second`. This value is doubly-smoothed,
/// meaning that is the result of using an exponentially weighted estimator (as described above) to
/// estimate the value of another exponentially weighted estimator, which estimates the value of
/// the raw data.
///
/// The purpose of this extra smoothing step is to reduce instantaneous fluctations in the estimate
/// when large updates are received. Without this, estimates might have a large spike followed by a
/// slow asymptotic approach to zero (until the next spike).
#[derive(Debug)]
pub(crate) struct Estimator {
steps: [f64; 16],
pos: u8,
full: bool,
smoothed_steps_per_sec: f64,
double_smoothed_steps_per_sec: f64,
prev_steps: u64,
prev_time: Instant,
start_time: Instant,
}

impl Estimator {
fn new(now: Instant) -> Self {
Self {
steps: [0.0; 16],
pos: 0,
full: false,
smoothed_steps_per_sec: 0.0,
double_smoothed_steps_per_sec: 0.0,
prev_steps: 0,
prev_time: now,
start_time: now,
}
}

fn record(&mut self, new_steps: u64, now: Instant) {
let delta = new_steps.saturating_sub(self.prev_steps);
if delta == 0 || now < self.prev_time {
// sanity check: don't record data if time or steps have not advanced
if new_steps <= self.prev_steps || now <= self.prev_time {
// Reset on backwards seek to prevent breakage from seeking to the end for length determination
// See https://github.com/console-rs/indicatif/issues/480
if new_steps < self.prev_steps {
self.prev_steps = new_steps;
self.reset(now);
}
return;
}

let elapsed = now - self.prev_time;
let divisor = delta as f64;
let mut batch = 0.0;
if divisor != 0.0 {
batch = duration_to_secs(elapsed) / divisor;
};
let delta_steps = new_steps - self.prev_steps;
let delta_t = duration_to_secs(now - self.prev_time);

self.steps[self.pos as usize] = batch;
self.pos = (self.pos + 1) % 16;
if !self.full && self.pos == 0 {
self.full = true;
}
// the rate of steps we saw in this update
let new_steps_per_second = delta_steps as f64 / delta_t;

// update the estimate: a weighted average of the old estimate and new data
let weight = estimator_weight(delta_t);
self.smoothed_steps_per_sec =
self.smoothed_steps_per_sec * weight + new_steps_per_second * (1.0 - weight);

// Get an unbiased estimate of `smoothed_steps_per_sec` to serve as the data source for the
// double smoothed estimate. See comment on debiasing in `steps_per_second` for details.
let delta_t_start = duration_to_secs(now - self.start_time);
let debias = 1.0 - estimator_weight(delta_t_start);
let debiased_smoothed_steps_per_sec = self.smoothed_steps_per_sec / debias;

// determine the double smoothed value (EWA smoothing of the single EWA)
self.double_smoothed_steps_per_sec = self.double_smoothed_steps_per_sec * weight
+ debiased_smoothed_steps_per_sec * (1.0 - weight);

self.prev_steps = new_steps;
self.prev_time = now;
}

/// Reset the state of the estimator. Once reset, estimates will not depend on any data prior
/// to `now`. This does not reset the stored position of the progress bar.
pub(crate) fn reset(&mut self, now: Instant) {
self.pos = 0;
self.full = false;
self.prev_steps = 0;
self.prev_time = now;
}
self.smoothed_steps_per_sec = 0.0;
self.double_smoothed_steps_per_sec = 0.0;

/// Average time per step in seconds, using rolling buffer of last 15 steps
fn steps_per_second(&self) -> f64 {
let len = self.len();
len as f64 / self.steps[0..len].iter().sum::<f64>()
}

fn len(&self) -> usize {
match self.full {
true => 16,
false => self.pos as usize,
}
}
}

impl fmt::Debug for Estimator {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Estimate")
.field("steps", &&self.steps[..self.len()])
.field("prev_steps", &self.prev_steps)
.field("prev_time", &self.prev_time)
.finish()
// only reset prev_time, not prev_steps
self.prev_time = now;
self.start_time = now;
}

/// Average time per step in seconds, using double exponential smoothing
fn steps_per_second(&self, now: Instant) -> f64 {
// Because the value stored in the Estimator is only updated when the Estimator receives an
// update, this value will become stuck if progress stalls. To return an accurate estimate,
// we determine how much time has passed since the last update, and treat this as a
// pseudo-update with 0 steps.
let delta_t = duration_to_secs(now - self.prev_time);
let reweight = estimator_weight(delta_t);

// Debiasing:
//
// Our exponentially weighted estimate is a single value (smoothed_steps_per_second) that
// is iteratively updated. At each update, the previous value of the estimate is
// re-weighted according its age. At any point in time, the raw value of this estimate
// reflects the assumption that it contains properly weighted sample values going back
// indefinitely in time. But this assumption is false.
//
// The value is initialized with some value when the estimator starts. The raw value of the
// estimator treats this as an appropriately weighted sample average across all times
// before t=0. Of course, the value is actually arbitrary. In other words, because the raw
// estimate gives a positive weight to this initial value, the resulting estimate will be
// *biased* towards the initial value.
//
// A debiased estimate is the result of correcting the raw estimate by assigning 0 weight
// to the initial value. We can do this with a simple trick: set the initial value to 0,
// and then divide the raw estimate by the estimator weight for all time *since* t=0.
let delta_t_start = duration_to_secs(now - self.start_time);
let debias = 1.0 - estimator_weight(delta_t_start);

// Generate updated values for `smoothed_steps_per_sec` and `double_smoothed_steps_per_sec`
// (sps and dsps) without storing them. Note that we debias sps when using it as a source
// to update dsps, and then debias dsps itself before returning it.
let sps = self.smoothed_steps_per_sec * reweight / debias;
let dsps = self.double_smoothed_steps_per_sec * reweight + sps * (1.0 - reweight);
dsps / debias
}
}

Expand Down Expand Up @@ -560,6 +601,35 @@ impl Default for ProgressFinish {
}
}

/// Get the appropriate dilution weight for Estimator data given the data's age (in seconds)
///
/// Whenever an update occurs, we will create a new estimate using a weight `w_i` like so:
///
/// ```math
/// <new estimate> = <previous estimate> * w_i + <new data> * (1 - w_i)
/// ```
///
/// In other words, the new estimate is a weighted average of the previous estimate and the new
/// data. We want to choose weights such that for any set of samples where `t_0, t_1, ...` are
/// the durations of the samples:
///
/// ```math
/// Sum(t_i) = ews ==> Prod(w_i) = 0.1
/// ```
///
/// With this constraint it is easy to show that
///
/// ```math
/// w_i = 0.1 ^ (t_i / ews)
/// ```
///
/// Notice that the constraint implies that estimates are independent of the durations of the
/// samples, a very useful feature.
fn estimator_weight(age: f64) -> f64 {
const EXPONENTIAL_WEIGHTING_SECONDS: f64 = 15.0;
0.1_f64.powf(age / EXPONENTIAL_WEIGHTING_SECONDS)
}

fn duration_to_secs(d: Duration) -> f64 {
d.as_secs() as f64 + f64::from(d.subsec_nanos()) / 1_000_000_000f64
}
Expand Down Expand Up @@ -593,23 +663,22 @@ mod tests {
let mut est = Estimator::new(now);
let mut pos = 0;

for _ in 0..est.steps.len() {
for _ in 0..20 {
pos += items_per_second;
now += Duration::from_secs(1);
est.record(pos, now);
}
let avg_steps_per_second = est.steps_per_second();
let avg_steps_per_second = est.steps_per_second(now);

assert!(avg_steps_per_second > 0.0);
assert!(avg_steps_per_second.is_finite());

let expected_rate = items_per_second as f64;
let absolute_error = (avg_steps_per_second - expected_rate).abs();
let relative_error = absolute_error / expected_rate;
let absolute_error = (avg_steps_per_second - items_per_second as f64).abs();
let relative_error = absolute_error / items_per_second as f64;
assert!(
relative_error < 1.0 / 1e9,
"Expected rate: {}, actual: {}, relative error: {}",
expected_rate,
items_per_second,
avg_steps_per_second,
relative_error
);
Expand All @@ -627,24 +696,50 @@ mod tests {
}

#[test]
fn test_duration_stuff() {
let duration = Duration::new(42, 100_000_000);
let secs = duration_to_secs(duration);
assert_eq!(secs_to_duration(secs), duration);
fn test_double_exponential_ave() {
let mut now = Instant::now();
let mut est = Estimator::new(now);
let mut pos = 0;

// note: this is the default weight set in the Estimator
let weight = 15;

for _ in 0..weight {
pos += 1;
now += Duration::from_secs(1);
est.record(pos, now);
}
now += Duration::from_secs(weight);

// The first level EWA:
// -> 90% weight @ 0 eps, 9% weight @ 1 eps, 1% weight @ 0 eps
// -> then debiased by deweighting the 1% weight (before -30 seconds)
let single_target = 0.09 / 0.99;

// The second level EWA:
// -> same logic as above, but using the first level EWA as the source
let double_target = (0.9 * single_target + 0.09) / 0.99;
assert_eq!(est.steps_per_second(now), double_target);
}

#[test]
fn test_estimator_rewind_position() {
let now = Instant::now();
let mut now = Instant::now();
let mut est = Estimator::new(now);
est.record(0, now);

now += Duration::from_secs(1);
est.record(1, now);
assert_eq!(est.len(), 1);
// Should not panic.

// should not panic
now += Duration::from_secs(1);
est.record(0, now);
// Assert that the state of the estimator reset on rewind
assert_eq!(est.len(), 0);

// check that reset occurred (estimator at 1 event per sec)
now += Duration::from_secs(1);
est.record(1, now);
assert_eq!(est.steps_per_second(now), 1.0);

// check that progress bar handles manual seeking
let pb = ProgressBar::hidden();
pb.set_length(10);
pb.set_position(1);
Expand All @@ -653,6 +748,29 @@ mod tests {
pb.set_position(0);
}

#[test]
fn test_reset_eta() {
let mut now = Instant::now();
let mut est = Estimator::new(now);

// two per second, then reset
now += Duration::from_secs(1);
est.record(2, now);
est.reset(now);

// now one per second, and verify
now += Duration::from_secs(1);
est.record(3, now);
assert_eq!(est.steps_per_second(now), 1.0);
}

#[test]
fn test_duration_stuff() {
let duration = Duration::new(42, 100_000_000);
let secs = duration_to_secs(duration);
assert_eq!(secs_to_duration(secs), duration);
}

#[test]
fn test_atomic_position_large_time_difference() {
let atomic_position = AtomicPosition::new();
Expand Down

0 comments on commit e31cbf3

Please sign in to comment.