From 01a696c3d0085df9753f2e8948b87e2911a60a22 Mon Sep 17 00:00:00 2001 From: Adam Fontenot Date: Wed, 10 May 2023 02:16:48 -0400 Subject: [PATCH 1/4] refactor estimator to use steps/sec instead of secs/step --- src/state.rs | 42 ++++++++++++++++++++++++------------------ 1 file changed, 24 insertions(+), 18 deletions(-) diff --git a/src/state.rs b/src/state.rs index 0452f58f..4387ba16 100644 --- a/src/state.rs +++ b/src/state.rs @@ -275,8 +275,16 @@ impl ProgressState { }; let pos = self.pos.pos.load(Ordering::Relaxed); - let t = self.est.seconds_per_step(); - secs_to_duration(t * len.saturating_sub(pos) as f64) + + let sps = self.est.steps_per_second(); + + // Infinite duration should only ever happen at the beginning, so in this case it's okay to + // just show an ETA of 0 until progress starts to occur. + if sps == 0.0 { + return Duration::new(0, 0); + } + + secs_to_duration(len.saturating_sub(pos) as f64 / sps) } /// The expected total duration (that is, elapsed time + expected ETA) @@ -290,10 +298,7 @@ impl ProgressState { /// The number of steps per second pub fn per_sec(&self) -> f64 { if let Status::InProgress = self.status { - match 1.0 / self.est.seconds_per_step() { - per_sec if per_sec.is_nan() => 0.0, - per_sec => per_sec, - } + self.est.steps_per_second() } else { let len = self.len.unwrap_or_else(|| self.pos()); len as f64 / self.started.elapsed().as_secs_f64() @@ -426,9 +431,9 @@ impl Estimator { } /// Average time per step in seconds, using rolling buffer of last 15 steps - fn seconds_per_step(&self) -> f64 { + fn steps_per_second(&self) -> f64 { let len = self.len(); - self.steps[0..len].iter().sum::() / len as f64 + len as f64 / self.steps[0..len].iter().sum::() } fn len(&self) -> usize { @@ -583,7 +588,7 @@ mod tests { // https://github.com/rust-lang/rust-clippy/issues/10281 #[allow(clippy::uninlined_format_args)] #[test] - fn test_time_per_step() { + fn test_steps_per_second() { let test_rate = |items_per_second| { let mut now = Instant::now(); 
let mut est = Estimator::new(now); @@ -594,19 +599,20 @@ mod tests { now += Duration::from_secs(1); est.record(pos, now); } - let avg_seconds_per_step = est.seconds_per_step(); + let avg_steps_per_second = est.steps_per_second(); - assert!(avg_seconds_per_step > 0.0); - assert!(avg_seconds_per_step.is_finite()); + assert!(avg_steps_per_second > 0.0); + assert!(avg_steps_per_second.is_finite()); - let expected_rate = 1.0 / items_per_second as f64; - let absolute_error = (avg_seconds_per_step - expected_rate).abs(); + let expected_rate = items_per_second as f64; + let absolute_error = (avg_steps_per_second - expected_rate).abs(); + let relative_error = absolute_error / expected_rate; assert!( - absolute_error < f64::EPSILON, - "Expected rate: {}, actual: {}, absolute error: {}", + relative_error < 1.0 / 1e9, + "Expected rate: {}, actual: {}, relative error: {}", expected_rate, - avg_seconds_per_step, - absolute_error + avg_steps_per_second, + relative_error ); }; From 7fc86206ac8e82d9252fe39feedbc1fcd6a6dad0 Mon Sep 17 00:00:00 2001 From: Adam Fontenot Date: Sun, 28 May 2023 11:28:23 -0400 Subject: [PATCH 2/4] Refactor estimator's prev tuple into separate elements It's easier to read separate state tracking fields for steps and time than to store them together in a single tuple. No functional change intended. 
--- src/state.rs | 25 +++++++++++++++---------- 1 file changed, 15 insertions(+), 10 deletions(-) diff --git a/src/state.rs b/src/state.rs index 4387ba16..9e36aeec 100644 --- a/src/state.rs +++ b/src/state.rs @@ -384,7 +384,8 @@ pub(crate) struct Estimator { steps: [f64; 16], pos: u8, full: bool, - prev: (u64, Instant), + prev_steps: u64, + prev_time: Instant, } impl Estimator { @@ -393,22 +394,23 @@ impl Estimator { steps: [0.0; 16], pos: 0, full: false, - prev: (0, now), + prev_steps: 0, + prev_time: now, } } - fn record(&mut self, new: u64, now: Instant) { - let delta = new.saturating_sub(self.prev.0); - if delta == 0 || now < self.prev.1 { + fn record(&mut self, new_steps: u64, now: Instant) { + let delta = new_steps.saturating_sub(self.prev_steps); + if delta == 0 || now < self.prev_time { // Reset on backwards seek to prevent breakage from seeking to the end for length determination // See https://github.com/console-rs/indicatif/issues/480 - if new < self.prev.0 { + if new_steps < self.prev_steps { self.reset(now); } return; } - let elapsed = now - self.prev.1; + let elapsed = now - self.prev_time; let divisor = delta as f64; let mut batch = 0.0; if divisor != 0.0 { @@ -421,13 +423,15 @@ impl Estimator { self.full = true; } - self.prev = (new, now); + self.prev_steps = new_steps; + self.prev_time = now; } pub(crate) fn reset(&mut self, now: Instant) { self.pos = 0; self.full = false; - self.prev = (0, now); + self.prev_steps = 0; + self.prev_time = now; } /// Average time per step in seconds, using rolling buffer of last 15 steps @@ -448,7 +452,8 @@ impl fmt::Debug for Estimator { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("Estimate") .field("steps", &&self.steps[..self.len()]) - .field("prev", &self.prev) + .field("prev_steps", &self.prev_steps) + .field("prev_time", &self.prev_time) .finish() } } From dc3aa4090ca3e8d278d6b21272e3dbe4bb87ea25 Mon Sep 17 00:00:00 2001 From: Adam Fontenot Date: Sat, 3 Jun 2023 17:25:32 -0400 
Subject: [PATCH 3/4] Switch Estimator to use a double exponential time-based weighting This is an implementation of an exponentially weighted running average with a tuning parameter (currently a constant in this implementation): * the `ews` parameter is a number of seconds. Progress the Estimator has observed that is older than this value will receive a total weight of 0.1 in the rate estimation, and newer values will receive a total of 0.9. The default is 15 seconds. This implementation does double smoothing by applying the running average to itself. The result avoids undesirable instantaneous movements in the estimate when large updates occur. The exponential estimator works by keeping a running tally, where an existing tally that has aged `t` seconds is reweighted such that weight ^ (ews / t) = 0.1 For instance, data aged 5 seconds with a 15 second weight parameter would receive `weight = 0.1 ^ (5/15) = 0.464`. If it then ages another 10 seconds, it would receive `weight = 0.1 ^ (10/15) = 0.215`. After being multiplied by both weights, it would have a weight of `0.464 * 0.215 = 0.1`, as expected. A couple of basic features are also implemented for higher quality estimates: * We divide out any weight given to data before the estimator was initialized. This is called "normalization" in the code, because it renormalizes the weights in the weighted average to sum to 1. * When returning an estimate, we include the time since the last update was received as non-progress, which means that the estimator does not freeze when progress stalls. 
--- src/state.rs | 262 +++++++++++++++++++++++++++++++++++++-------------- 1 file changed, 193 insertions(+), 69 deletions(-) diff --git a/src/state.rs b/src/state.rs index 9e36aeec..43a5095c 100644 --- a/src/state.rs +++ b/src/state.rs @@ -1,9 +1,9 @@ use std::borrow::Cow; +use std::io; use std::sync::Arc; use std::time::Duration; #[cfg(not(target_arch = "wasm32"))] use std::time::Instant; -use std::{fmt, io}; #[cfg(target_arch = "wasm32")] use instant::Instant; @@ -230,13 +230,14 @@ pub struct ProgressState { impl ProgressState { pub(crate) fn new(len: Option, pos: Arc) -> Self { + let now = Instant::now(); Self { pos, len, tick: 0, status: Status::InProgress, - started: Instant::now(), - est: Estimator::new(Instant::now()), + started: now, + est: Estimator::new(now), message: TabExpandedString::NoTabs("".into()), prefix: TabExpandedString::NoTabs("".into()), } @@ -276,7 +277,7 @@ impl ProgressState { let pos = self.pos.pos.load(Ordering::Relaxed); - let sps = self.est.steps_per_second(); + let sps = self.est.steps_per_second(Instant::now()); // Infinite duration should only ever happen at the beginning, so in this case it's okay to // just show an ETA of 0 until progress starts to occur. @@ -298,7 +299,7 @@ impl ProgressState { /// The number of steps per second pub fn per_sec(&self) -> f64 { if let Status::InProgress = self.status { - self.est.steps_per_second() + self.est.steps_per_second(Instant::now()) } else { let len = self.len.unwrap_or_else(|| self.pos()); len as f64 / self.started.elapsed().as_secs_f64() @@ -376,85 +377,131 @@ impl TabExpandedString { } } -/// Estimate the number of seconds per step +/// Double-smoothed exponentially weighted estimator +/// +/// This uses an exponentially weighted *time-based* estimator, meaning that it exponentially +/// downweights old data based on its age. The rate at which this occurs is currently a constant +/// value of 15 seconds for 90% weighting. 
This means that all data older than 15 seconds has a +/// collective weight of 0.1 in the estimate, and all data older than 30 seconds has a collective +/// weight of 0.01, and so on. /// -/// Ring buffer with constant capacity. Used by `ProgressBar`s to display `{eta}`, -/// `{eta_precise}`, and `{*_per_sec}`. +/// The primary value exposed by `Estimator` is `steps_per_second`. This value is doubly-smoothed, +/// meaning that it is the result of using an exponentially weighted estimator (as described above) to +/// estimate the value of another exponentially weighted estimator, which estimates the value of +/// the raw data. +/// +/// The purpose of this extra smoothing step is to reduce instantaneous fluctuations in the estimate +/// when large updates are received. Without this, estimates might have a large spike followed by a +/// slow asymptotic approach to zero (until the next spike). +#[derive(Debug)] pub(crate) struct Estimator { - steps: [f64; 16], - pos: u8, - full: bool, + smoothed_steps_per_sec: f64, + double_smoothed_steps_per_sec: f64, prev_steps: u64, prev_time: Instant, + start_time: Instant, } impl Estimator { fn new(now: Instant) -> Self { Self { - steps: [0.0; 16], - pos: 0, - full: false, + smoothed_steps_per_sec: 0.0, + double_smoothed_steps_per_sec: 0.0, prev_steps: 0, prev_time: now, + start_time: now, } } fn record(&mut self, new_steps: u64, now: Instant) { - let delta = new_steps.saturating_sub(self.prev_steps); - if delta == 0 || now < self.prev_time { + // sanity check: don't record data if time or steps have not advanced + if new_steps <= self.prev_steps || now <= self.prev_time { // Reset on backwards seek to prevent breakage from seeking to the end for length determination // See https://github.com/console-rs/indicatif/issues/480 if new_steps < self.prev_steps { + self.prev_steps = new_steps; self.reset(now); } return; } - let elapsed = now - self.prev_time; - let divisor = delta as f64; - let mut batch = 0.0; - if divisor != 0.0 { - batch 
= duration_to_secs(elapsed) / divisor; - }; + let delta_steps = new_steps - self.prev_steps; + let delta_t = duration_to_secs(now - self.prev_time); - self.steps[self.pos as usize] = batch; - self.pos = (self.pos + 1) % 16; - if !self.full && self.pos == 0 { - self.full = true; - } + // the rate of steps we saw in this update + let new_steps_per_second = delta_steps as f64 / delta_t; + + // update the estimate: a weighted average of the old estimate and new data + let weight = estimator_weight(delta_t); + self.smoothed_steps_per_sec = + self.smoothed_steps_per_sec * weight + new_steps_per_second * (1.0 - weight); + + // An iterative estimate like `smoothed_steps_per_sec` is supposed to be an exponentially + // weighted average from t=0 back to t=-inf; Since we initialize it to 0, we neglect the + // (non-existent) samples in the weighted average prior to the first one, so the resulting + // average must be normalized. We normalize the single estimate here in order to use it as + // a source for the double smoothed estimate. See comment on normalization in + // `steps_per_second` for details. + let delta_t_start = duration_to_secs(now - self.start_time); + let total_weight = 1.0 - estimator_weight(delta_t_start); + let normalized_smoothed_steps_per_sec = self.smoothed_steps_per_sec / total_weight; + + // determine the double smoothed value (EWA smoothing of the single EWA) + self.double_smoothed_steps_per_sec = self.double_smoothed_steps_per_sec * weight + + normalized_smoothed_steps_per_sec * (1.0 - weight); self.prev_steps = new_steps; self.prev_time = now; } + /// Reset the state of the estimator. Once reset, estimates will not depend on any data prior + /// to `now`. This does not reset the stored position of the progress bar. 
pub(crate) fn reset(&mut self, now: Instant) { - self.pos = 0; - self.full = false; - self.prev_steps = 0; - self.prev_time = now; - } + self.smoothed_steps_per_sec = 0.0; + self.double_smoothed_steps_per_sec = 0.0; - /// Average time per step in seconds, using rolling buffer of last 15 steps - fn steps_per_second(&self) -> f64 { - let len = self.len(); - len as f64 / self.steps[0..len].iter().sum::() - } - - fn len(&self) -> usize { - match self.full { - true => 16, - false => self.pos as usize, - } - } -} - -impl fmt::Debug for Estimator { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("Estimate") - .field("steps", &&self.steps[..self.len()]) - .field("prev_steps", &self.prev_steps) - .field("prev_time", &self.prev_time) - .finish() + // only reset prev_time, not prev_steps + self.prev_time = now; + self.start_time = now; + } + + /// Average number of steps per second, using double exponential smoothing + fn steps_per_second(&self, now: Instant) -> f64 { + // Because the value stored in the Estimator is only updated when the Estimator receives an + // update, this value will become stuck if progress stalls. To return an accurate estimate, + // we determine how much time has passed since the last update, and treat this as a + // pseudo-update with 0 steps. + let delta_t = duration_to_secs(now - self.prev_time); + let reweight = estimator_weight(delta_t); + + // Normalization of estimates: + // + // The raw estimate is a single value (`smoothed_steps_per_sec`) that is iteratively + // updated. At each update, the previous value of the estimate is downweighted according to + // its age, receiving the iterative weight W(t) = 0.1 ^ (t/15). + // + // Since W(Sum(t_n)) = Prod(W(t_n)), the total weight of a sample after a series of + // iterative steps is simply W(t_e) - W(t_b), where t_e is the time since the end of the + // sample, and t_b is the time since the beginning. 
The resulting estimate is therefore a + // weighted average with sample weights W(t_e) - W(t_b). + // + // Notice that the weighting function generates sample weights that sum to 1 only when the + // sample times span from t=0 to t=inf; but this is not the case. We have a first sample + // with finite, positive t_b = t_f. In the raw estimate, we handle times prior to t_f by + // setting an initial value of 0, meaning that these (non-existent) samples have no weight. + // + // Therefore, the raw estimate must be normalized by dividing it by the sum of the weights + // in the weighted average. This sum is just W(0) - W(t_f), where t_f is the time since the + // first sample, and W(0) = 1. + let delta_t_start = duration_to_secs(now - self.start_time); + let total_weight = 1.0 - estimator_weight(delta_t_start); + + // Generate updated values for `smoothed_steps_per_sec` and `double_smoothed_steps_per_sec` + // (sps and dsps) without storing them. Note that we normalize sps when using it as a + // source to update dsps, and then normalize dsps itself before returning it. + let sps = self.smoothed_steps_per_sec * reweight / total_weight; + let dsps = self.double_smoothed_steps_per_sec * reweight + sps * (1.0 - reweight); + dsps / total_weight } } @@ -566,6 +613,35 @@ impl Default for ProgressFinish { } } +/// Get the appropriate dilution weight for Estimator data given the data's age (in seconds) +/// +/// Whenever an update occurs, we will create a new estimate using a weight `w_i` like so: +/// +/// ```math +/// <new estimate> = <previous estimate> * w_i + <new data> * (1 - w_i) +/// ``` +/// +/// In other words, the new estimate is a weighted average of the previous estimate and the new +/// data. 
We want to choose weights such that for any set of samples where `t_0, t_1, ...` are +/// the durations of the samples: +/// +/// ```math +/// Sum(t_i) = ews ==> Prod(w_i) = 0.1 +/// ``` +/// +/// With this constraint it is easy to show that +/// +/// ```math +/// w_i = 0.1 ^ (t_i / ews) +/// ``` +/// +/// Notice that the constraint implies that estimates are independent of the durations of the +/// samples, a very useful feature. +fn estimator_weight(age: f64) -> f64 { + const EXPONENTIAL_WEIGHTING_SECONDS: f64 = 15.0; + 0.1_f64.powf(age / EXPONENTIAL_WEIGHTING_SECONDS) +} + fn duration_to_secs(d: Duration) -> f64 { d.as_secs() as f64 + f64::from(d.subsec_nanos()) / 1_000_000_000f64 } @@ -599,23 +675,22 @@ mod tests { let mut est = Estimator::new(now); let mut pos = 0; - for _ in 0..est.steps.len() { + for _ in 0..20 { pos += items_per_second; now += Duration::from_secs(1); est.record(pos, now); } - let avg_steps_per_second = est.steps_per_second(); + let avg_steps_per_second = est.steps_per_second(now); assert!(avg_steps_per_second > 0.0); assert!(avg_steps_per_second.is_finite()); - let expected_rate = items_per_second as f64; - let absolute_error = (avg_steps_per_second - expected_rate).abs(); - let relative_error = absolute_error / expected_rate; + let absolute_error = (avg_steps_per_second - items_per_second as f64).abs(); + let relative_error = absolute_error / items_per_second as f64; assert!( relative_error < 1.0 / 1e9, "Expected rate: {}, actual: {}, relative error: {}", - expected_rate, + items_per_second, avg_steps_per_second, relative_error ); @@ -633,24 +708,50 @@ mod tests { } #[test] - fn test_duration_stuff() { - let duration = Duration::new(42, 100_000_000); - let secs = duration_to_secs(duration); - assert_eq!(secs_to_duration(secs), duration); + fn test_double_exponential_ave() { + let mut now = Instant::now(); + let mut est = Estimator::new(now); + let mut pos = 0; + + // note: this is the default weight set in the Estimator + let weight = 15; 
+ + for _ in 0..weight { + pos += 1; + now += Duration::from_secs(1); + est.record(pos, now); + } + now += Duration::from_secs(weight); + + // The first level EWA: + // -> 90% weight @ 0 eps, 9% weight @ 1 eps, 1% weight @ 0 eps + // -> then normalized by deweighting the 1% weight (before -30 seconds) + let single_target = 0.09 / 0.99; + + // The second level EWA: + // -> same logic as above, but using the first level EWA as the source + let double_target = (0.9 * single_target + 0.09) / 0.99; + assert_eq!(est.steps_per_second(now), double_target); } #[test] fn test_estimator_rewind_position() { - let now = Instant::now(); + let mut now = Instant::now(); let mut est = Estimator::new(now); - est.record(0, now); + + now += Duration::from_secs(1); est.record(1, now); - assert_eq!(est.len(), 1); - // Should not panic. + + // should not panic + now += Duration::from_secs(1); est.record(0, now); - // Assert that the state of the estimator reset on rewind - assert_eq!(est.len(), 0); + // check that reset occurred (estimator at 1 event per sec) + now += Duration::from_secs(1); + est.record(1, now); + assert_eq!(est.steps_per_second(now), 1.0); + + // check that progress bar handles manual seeking let pb = ProgressBar::hidden(); pb.set_length(10); pb.set_position(1); @@ -659,6 +760,29 @@ mod tests { pb.set_position(0); } + #[test] + fn test_reset_eta() { + let mut now = Instant::now(); + let mut est = Estimator::new(now); + + // two per second, then reset + now += Duration::from_secs(1); + est.record(2, now); + est.reset(now); + + // now one per second, and verify + now += Duration::from_secs(1); + est.record(3, now); + assert_eq!(est.steps_per_second(now), 1.0); + } + + #[test] + fn test_duration_stuff() { + let duration = Duration::new(42, 100_000_000); + let secs = duration_to_secs(duration); + assert_eq!(secs_to_duration(secs), duration); + } + #[test] fn test_atomic_position_large_time_difference() { let atomic_position = AtomicPosition::new(); From 
1d0668d1f8017b61396c7a6479101ea16969bb39 Mon Sep 17 00:00:00 2001 From: Adam Fontenot Date: Sat, 3 Jun 2023 17:39:19 -0400 Subject: [PATCH 4/4] bump version to 0.17.5 --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index 412a8195..8c841d22 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "indicatif" description = "A progress bar and cli reporting library for Rust" -version = "0.17.4" +version = "0.17.5" keywords = ["cli", "progress", "pb", "colors", "progressbar"] categories = ["command-line-interface"] license = "MIT"