diff --git a/src/state.rs b/src/state.rs index db9d118b..37bf4a50 100644 --- a/src/state.rs +++ b/src/state.rs @@ -1,9 +1,9 @@ use std::borrow::Cow; +use std::io; use std::sync::Arc; use std::time::Duration; #[cfg(not(target_arch = "wasm32"))] use std::time::Instant; -use std::{fmt, io}; #[cfg(target_arch = "wasm32")] use instant::Instant; @@ -224,13 +224,14 @@ pub struct ProgressState { impl ProgressState { pub(crate) fn new(len: Option, pos: Arc) -> Self { + let now = Instant::now(); Self { pos, len, tick: 0, status: Status::InProgress, - started: Instant::now(), - est: Estimator::new(Instant::now()), + started: now, + est: Estimator::new(now), message: TabExpandedString::NoTabs("".into()), prefix: TabExpandedString::NoTabs("".into()), } @@ -270,7 +271,7 @@ impl ProgressState { let pos = self.pos.pos.load(Ordering::Relaxed); - let sps = self.est.steps_per_second(); + let sps = self.est.steps_per_second(Instant::now()); // Infinite duration should only ever happen at the beginning, so in this case it's okay to // just show an ETA of 0 until progress starts to occur. @@ -292,7 +293,7 @@ impl ProgressState { /// The number of steps per second pub fn per_sec(&self) -> f64 { if let Status::InProgress = self.status { - self.est.steps_per_second() + self.est.steps_per_second(Instant::now()) } else { let len = self.len.unwrap_or_else(|| self.pos()); len as f64 / self.started.elapsed().as_secs_f64() @@ -370,80 +371,103 @@ impl TabExpandedString { } } -/// Estimate the number of seconds per step +/// Double-smoothed exponentially weighted estimator +/// +/// This uses an exponentially weighted *time-based* estimator, meaning that it exponentially +/// downweights old data based on its age. The rate at which this occurs is currently a constant +/// value of 15 seconds for 90% weighting. This means that all data older than 15 seconds has a +/// collective weight of 0.1 in the estimate, and all data older than 30 seconds has a collective +/// weight of 0.01, and so on. 
/// -/// Ring buffer with constant capacity. Used by `ProgressBar`s to display `{eta}`, -/// `{eta_precise}`, and `{*_per_sec}`. +/// The primary value exposed by `Estimator` is `steps_per_second`. This value is doubly-smoothed, +/// meaning that it is the result of using an exponentially weighted estimator (as described above) to +/// estimate the value of another exponentially weighted estimator, which estimates the value of +/// the raw data. +/// +/// The purpose of this extra smoothing step is to reduce instantaneous fluctuations in the estimate +/// when large updates are received. Without this, estimates might have a large spike followed by a +/// slow asymptotic approach to zero (until the next spike). +#[derive(Debug)] pub(crate) struct Estimator { - steps: [f64; 16], - pos: u8, - full: bool, + smoothed_steps_per_sec: f64, + double_smoothed_steps_per_sec: f64, prev: (u64, Instant), + start: Instant, } impl Estimator { fn new(now: Instant) -> Self { Self { - steps: [0.0; 16], - pos: 0, - full: false, + smoothed_steps_per_sec: 0.0, + double_smoothed_steps_per_sec: 0.0, prev: (0, now), + start: now, } } fn record(&mut self, new: u64, now: Instant) { - let delta = new.saturating_sub(self.prev.0); - if delta == 0 || now < self.prev.1 { + // sanity check: don't record data if time or steps have not advanced + if new <= self.prev.0 || now < self.prev.1 { // Reset on backwards seek to prevent breakage from seeking to the end for length determination // See https://github.com/console-rs/indicatif/issues/480 if new < self.prev.0 { + self.prev.0 = new; self.reset(now); } return; } - let elapsed = now - self.prev.1; - let divisor = delta as f64; - let mut batch = 0.0; - if divisor != 0.0 { - batch = duration_to_secs(elapsed) / divisor; - }; + let delta_steps = new - self.prev.0; + let delta_t = duration_to_secs(now - self.prev.1); - self.steps[self.pos as usize] = batch; - self.pos = (self.pos + 1) % 16; - if !self.full && self.pos == 0 { - self.full = true; - } + // the 
rate of steps we saw in this update + let new_steps_per_second = delta_steps as f64 / delta_t; + + // update the estimate: a weighted average of the old estimate and new data + let weight = estimator_weight(delta_t); + self.smoothed_steps_per_sec = + self.smoothed_steps_per_sec * weight + new_steps_per_second * (1.0 - weight); + + // correct for bias in initial Estimator value (see steps_per_second comment) + let delta_t_start = duration_to_secs(now - self.start); + let debias = 1.0 - estimator_weight(delta_t_start); + + // determine the double smoothing value (same estimation on top of single value) + self.double_smoothed_steps_per_sec = self.double_smoothed_steps_per_sec * weight + + self.smoothed_steps_per_sec * (1.0 - weight) / debias; self.prev = (new, now); } + /// Reset the state of the estimator. Once reset, estimates will not depend on any data prior + /// to `now`. This does not reset the stored position of the progress bar. pub(crate) fn reset(&mut self, now: Instant) { - self.pos = 0; - self.full = false; - self.prev = (0, now); - } + self.smoothed_steps_per_sec = 0.0; + self.double_smoothed_steps_per_sec = 0.0; - /// Average time per step in seconds, using rolling buffer of last 15 steps - fn steps_per_second(&self) -> f64 { - let len = self.len(); - len as f64 / self.steps[0..len].iter().sum::() + // keep the previous position so we can determine the step delta; only reset the estimator + self.prev = (self.prev.0, now); + self.start = now; } - fn len(&self) -> usize { - match self.full { - true => 16, - false => self.pos as usize, - } - } -} + /// Average number of steps per second, using double exponential smoothing + fn steps_per_second(&self, now: Instant) -> f64 { + // Because the value stored in the Estimator is only updated when the Estimator receives an + // update, this value will become stuck if progress stalls. 
To return an accurate estimate, + // we determine how much time has passed since the last update, and treat this as a + // pseudo-update with 0 steps. + let delta_t = duration_to_secs(now - self.prev.1); + let reweight = estimator_weight(delta_t); -impl fmt::Debug for Estimator { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("Estimate") - .field("steps", &&self.steps[..self.len()]) - .field("prev", &self.prev) - .finish() + // The Estimator is initialized with zero values, which are included in the underlying + // estimate. We can determine the total weight of data with an age older than the start time + // of the Estimator, and divide this out to get an unbiased estimate. + let delta_t_start = duration_to_secs(now - self.start); + let debias = 1.0 - estimator_weight(delta_t_start); + + let sps = self.smoothed_steps_per_sec * reweight / debias; + let dsps = self.double_smoothed_steps_per_sec * reweight + sps * (1.0 - reweight); + dsps / debias } } @@ -555,6 +579,35 @@ impl Default for ProgressFinish { } } +/// Get the appropriate dilution weight for Estimator data given the data's age (in seconds) +/// +/// Whenever an update occurs, we will create a new estimate using a weight `w_i` like so: +/// +/// ```math +/// new_estimate = old_estimate * w_i + new_data * (1 - w_i) +/// ``` +/// +/// In other words, the new estimate is a weighted average of the previous estimate and the new +/// data. We want to choose weights such that for any set of samples where `t_0, t_1, ...` are +/// the durations of the samples: +/// +/// ```math +/// Sum(t_i) = ews ==> Prod(w_i) = 0.1 +/// ``` +/// +/// With this constraint it is easy to show that +/// +/// ```math +/// w_i = 0.1 ^ (t_i / ews) +/// ``` +/// +/// Notice that the constraint implies that estimates are independent of the durations of the +/// samples, a very useful feature. 
+fn estimator_weight(age: f64) -> f64 { + const EXPONENTIAL_WEIGHTING_SECONDS: f64 = 15.0; + 0.1_f64.powf(age / EXPONENTIAL_WEIGHTING_SECONDS) +} + fn duration_to_secs(d: Duration) -> f64 { d.as_secs() as f64 + f64::from(d.subsec_nanos()) / 1_000_000_000f64 } @@ -588,23 +641,22 @@ mod tests { let mut est = Estimator::new(now); let mut pos = 0; - for _ in 0..est.steps.len() { + for _ in 0..20 { pos += items_per_second; now += Duration::from_secs(1); est.record(pos, now); } - let avg_steps_per_second = est.steps_per_second(); + let avg_steps_per_second = est.steps_per_second(now); assert!(avg_steps_per_second > 0.0); assert!(avg_steps_per_second.is_finite()); - let expected_rate = items_per_second as f64; - let absolute_error = (avg_steps_per_second - expected_rate).abs(); - let relative_error = absolute_error / expected_rate; + let absolute_error = (avg_steps_per_second - items_per_second as f64).abs(); + let relative_error = absolute_error / items_per_second as f64; assert!( relative_error < 1.0 / 1e9, "Expected rate: {}, actual: {}, relative error: {}", - expected_rate, + items_per_second, avg_steps_per_second, relative_error ); @@ -622,24 +674,50 @@ mod tests { } #[test] - fn test_duration_stuff() { - let duration = Duration::new(42, 100_000_000); - let secs = duration_to_secs(duration); - assert_eq!(secs_to_duration(secs), duration); + fn test_double_exponential_ave() { + let mut now = Instant::now(); + let mut est = Estimator::new(now); + let mut pos = 0; + + // note: this is the default weight set in the Estimator + let weight = 15; + + for _ in 0..weight { + pos += 1; + now += Duration::from_secs(1); + est.record(pos, now); + } + now += Duration::from_secs(weight); + + // The first level EWA: + // -> 90% weight @ 0 eps, 9% weight @ 1 eps, 1% weight @ 0 eps + // -> then debiased by deweighting the 1% weight (before -30 seconds) + let single_target = 0.09 / 0.99; + + // The second level EWA: + // -> same logic as above, but using the first level EWA as the 
source + let double_target = (0.9 * single_target + 0.09) / 0.99; + assert_eq!(est.steps_per_second(now), double_target); } #[test] fn test_estimator_rewind_position() { - let now = Instant::now(); + let mut now = Instant::now(); let mut est = Estimator::new(now); - est.record(0, now); + + now += Duration::from_secs(1); est.record(1, now); - assert_eq!(est.len(), 1); - // Should not panic. + + // should not panic + now += Duration::from_secs(1); est.record(0, now); - // Assert that the state of the estimator reset on rewind - assert_eq!(est.len(), 0); + // check that reset occurred (estimator at 1 event per sec) + now += Duration::from_secs(1); + est.record(1, now); + assert_eq!(est.steps_per_second(now), 1.0); + + // check that progress bar handles manual seeking let pb = ProgressBar::hidden(); pb.set_length(10); pb.set_position(1); @@ -648,6 +726,29 @@ mod tests { pb.set_position(0); } + #[test] + fn test_reset_eta() { + let mut now = Instant::now(); + let mut est = Estimator::new(now); + + // two per second, then reset + now += Duration::from_secs(1); + est.record(2, now); + est.reset(now); + + // now one per second, and verify + now += Duration::from_secs(1); + est.record(3, now); + assert_eq!(est.steps_per_second(now), 1.0); + } + + #[test] + fn test_duration_stuff() { + let duration = Duration::new(42, 100_000_000); + let secs = duration_to_secs(duration); + assert_eq!(secs_to_duration(secs), duration); + } + #[test] fn test_atomic_position_large_time_difference() { let atomic_position = AtomicPosition::new(); @@ -656,3 +757,4 @@ mod tests { atomic_position.allow(later); } } +