Skip to content

Commit

Permalink
Use atomic state to track position
Browse files Browse the repository at this point in the history
  • Loading branch information
djc committed Mar 13, 2022
1 parent cbafae1 commit ea1f733
Show file tree
Hide file tree
Showing 3 changed files with 113 additions and 32 deletions.
25 changes: 20 additions & 5 deletions src/progress_bar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use std::sync::{Arc, Mutex, Weak};
use std::time::{Duration, Instant};

use crate::draw_target::ProgressDrawTarget;
use crate::state::{BarState, ProgressFinish, Reset, Ticker};
use crate::state::{AtomicPosition, BarState, ProgressFinish, Reset, Ticker};
use crate::style::ProgressStyle;
use crate::ProgressState;
use crate::{ProgressBarIter, ProgressIterator};
Expand All @@ -18,6 +18,7 @@ use crate::{ProgressBarIter, ProgressIterator};
#[derive(Clone)]
pub struct ProgressBar {
state: Arc<Mutex<BarState>>,
pos: Arc<AtomicPosition>,
}

impl fmt::Debug for ProgressBar {
Expand Down Expand Up @@ -46,8 +47,10 @@ impl ProgressBar {

/// Creates a new progress bar with a given length and draw target
pub fn with_draw_target(len: u64, draw_target: ProgressDrawTarget) -> ProgressBar {
let pos = Arc::new(AtomicPosition::default());
ProgressBar {
state: Arc::new(Mutex::new(BarState::new(len, draw_target))),
state: Arc::new(Mutex::new(BarState::new(len, draw_target, pos.clone()))),
pos,
}
}

Expand Down Expand Up @@ -138,7 +141,11 @@ impl ProgressBar {

/// Advances the position of the progress bar by `delta`
pub fn inc(&self, delta: u64) {
self.state().inc(Instant::now(), delta)
self.pos.inc(delta);
let now = Instant::now();
if self.pos.allow(now) {
self.state().tick(now);
}
}

/// A quick convenience check if the progress bar is hidden
Expand Down Expand Up @@ -173,7 +180,11 @@ impl ProgressBar {

/// Sets the position of the progress bar
pub fn set_position(&self, pos: u64) {
self.state().set_position(Instant::now(), pos)
self.pos.set(pos);
let now = Instant::now();
if self.pos.allow(now) {
self.state().tick(now);
}
}

/// Sets the length of the progress bar
Expand Down Expand Up @@ -206,6 +217,7 @@ impl ProgressBar {
pub fn downgrade(&self) -> WeakProgressBar {
WeakProgressBar {
state: Arc::downgrade(&self.state),
pos: Arc::downgrade(&self.pos),
}
}

Expand Down Expand Up @@ -464,6 +476,7 @@ impl ProgressBar {
#[derive(Clone, Default)]
pub struct WeakProgressBar {
state: Weak<Mutex<BarState>>,
pos: Weak<AtomicPosition>,
}

impl WeakProgressBar {
Expand All @@ -479,7 +492,9 @@ impl WeakProgressBar {
///
/// [`ProgressBar`]: struct.ProgressBar.html
pub fn upgrade(&self) -> Option<ProgressBar> {
self.state.upgrade().map(|state| ProgressBar { state })
let state = self.state.upgrade()?;
let pos = self.pos.upgrade()?;
Some(ProgressBar { state, pos })
}
}

Expand Down
111 changes: 87 additions & 24 deletions src/state.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::borrow::Cow;
use std::fmt;
use std::io;
use std::sync::atomic::{AtomicU64, AtomicU8, Ordering};
use std::sync::{Arc, Mutex, Weak};
use std::thread;
use std::time::{Duration, Instant};
Expand All @@ -17,12 +18,12 @@ pub(crate) struct BarState {
}

impl BarState {
pub(crate) fn new(len: u64, draw_target: ProgressDrawTarget) -> Self {
pub(crate) fn new(len: u64, draw_target: ProgressDrawTarget, pos: Arc<AtomicPosition>) -> Self {
Self {
draw_target,
on_finish: ProgressFinish::default(),
style: ProgressStyle::default_bar(),
state: ProgressState::new(len),
state: ProgressState::new(len, pos),
ticker: None,
}
}
Expand All @@ -32,13 +33,13 @@ impl BarState {
pub(crate) fn finish_using_style(&mut self, now: Instant, finish: ProgressFinish) {
self.state.status = Status::DoneVisible;
match finish {
ProgressFinish::AndLeave => self.state.pos = self.state.len,
ProgressFinish::AndLeave => self.state.pos.set(self.state.len),
ProgressFinish::WithMessage(msg) => {
self.state.pos = self.state.len;
self.state.pos.set(self.state.len);
self.style.message = msg;
}
ProgressFinish::AndClear => {
self.state.pos = self.state.len;
self.state.pos.set(self.state.len);
self.state.status = Status::DoneHidden;
}
ProgressFinish::Abandon => {}
Expand All @@ -60,7 +61,7 @@ impl BarState {
}

if let Reset::All = mode {
self.state.pos = 0;
self.state.pos.reset(now);
self.state.status = Status::InProgress;
let _ = self.draw(false, now);
}
Expand All @@ -71,16 +72,6 @@ impl BarState {
self.tick(now);
}

pub(crate) fn set_position(&mut self, now: Instant, new: u64) {
self.state.pos = new;
self.tick(now);
}

pub(crate) fn inc(&mut self, now: Instant, delta: u64) {
self.state.pos = self.state.pos.saturating_add(delta);
self.tick(now);
}

pub(crate) fn set_length(&mut self, now: Instant, len: u64) {
self.state.len = len;
self.tick(now);
Expand All @@ -106,7 +97,8 @@ impl BarState {
self.state.tick = self.state.tick.saturating_add(1);
}

self.state.est.record(self.state.pos, now);
let pos = self.state.pos.pos.load(Ordering::Relaxed);
self.state.est.record(pos, now);
let _ = self.draw(false, now);
}

Expand Down Expand Up @@ -181,7 +173,7 @@ pub(crate) enum Reset {
/// The state of a progress bar at a moment in time.
#[non_exhaustive]
pub struct ProgressState {
pos: u64,
pos: Arc<AtomicPosition>,
len: u64,
pub(crate) tick: u64,
pub(crate) started: Instant,
Expand All @@ -190,9 +182,9 @@ pub struct ProgressState {
}

impl ProgressState {
pub(crate) fn new(len: u64) -> Self {
pub(crate) fn new(len: u64, pos: Arc<AtomicPosition>) -> Self {
Self {
pos: 0,
pos,
len,
tick: 0,
status: Status::InProgress,
Expand All @@ -212,7 +204,8 @@ impl ProgressState {

/// Returns the completion as a floating-point number between 0 and 1
pub fn fraction(&self) -> f32 {
let pct = match (self.pos, self.len) {
let pos = self.pos.pos.load(Ordering::Relaxed);
let pct = match (pos, self.len) {
(_, 0) => 1.0,
(0, _) => 0.0,
(pos, len) => pos as f32 / len as f32,
Expand All @@ -225,8 +218,10 @@ impl ProgressState {
if self.len == !0 || self.is_finished() {
return Duration::new(0, 0);
}

let pos = self.pos.pos.load(Ordering::Relaxed);
let t = self.est.seconds_per_step();
secs_to_duration(t * self.len.saturating_sub(self.pos) as f64)
secs_to_duration(t * self.len.saturating_sub(pos) as f64)
}

/// The expected total duration (that is, elapsed time + expected ETA)
Expand All @@ -253,11 +248,11 @@ impl ProgressState {
}

pub fn pos(&self) -> u64 {
self.pos
self.pos.pos.load(Ordering::Relaxed)
}

pub fn set_pos(&mut self, pos: u64) {
self.pos = pos;
self.pos.set(pos);
}

#[allow(clippy::len_without_is_empty)]
Expand Down Expand Up @@ -391,6 +386,74 @@ impl fmt::Debug for Estimator {
}
}

pub(crate) struct AtomicPosition {
pub(crate) pos: AtomicU64,
capacity: AtomicU8,
prev: AtomicU64,
start: Instant,
}

impl AtomicPosition {
pub(crate) fn allow(&self, now: Instant) -> bool {
let mut capacity = self.capacity.load(Ordering::Acquire);
// `prev` is the number of ms after `self.started` we last returned `true`, in ns
let prev = self.prev.load(Ordering::Acquire);
// `elapsed` is the number of ns since `self.started`
let elapsed = (now - self.start).as_nanos() as u64;
// `diff` is the number of ns since we last returned `true`
let diff = elapsed - prev;

// If `capacity` is 0 and not enough time (1ms) has passed since `prev`
// to add new capacity, return `false`. The goal of this method is to
// make this decision as efficient as possible.
if capacity == 0 && diff < INTERVAL {
return false;
}

// We now calculate `new`, the number of ms, in ns, since we last returned `true`,
// and `remainder`, which represents a number of ns less than 1ms which we cannot
// convert into capacity now, so we're saving it for later. We do this by
// substracting this from `elapsed` before storing it into `self.prev`.
let (new, remainder) = ((diff / INTERVAL), (diff % INTERVAL));
// We add `new` to `capacity`, subtract one for returning `true` from here,
// then make sure it does not exceed a maximum of `MAX_BURST`.
capacity = Ord::min(MAX_BURST, capacity + new as u8 - 1);

// Then, we just store `capacity` and `prev` atomically for the next iteration
self.capacity.store(capacity, Ordering::Release);
self.prev.store(elapsed - remainder, Ordering::Release);
true
}

fn reset(&self, now: Instant) {
self.set(0);
let elapsed = (now - self.start).as_millis() as u64;
self.prev.store(elapsed, Ordering::Release);
}

pub(crate) fn inc(&self, delta: u64) {
self.pos.fetch_add(delta, Ordering::SeqCst);
}

pub(crate) fn set(&self, pos: u64) {
self.pos.store(pos, Ordering::Release);
}
}

impl Default for AtomicPosition {
fn default() -> Self {
Self {
pos: AtomicU64::new(0),
capacity: AtomicU8::new(MAX_BURST),
prev: AtomicU64::new(0),
start: Instant::now(),
}
}
}

const INTERVAL: u64 = 1_000_000;
const MAX_BURST: u8 = 10;

/// Behavior of a progress bar when it is finished
///
/// This is invoked when a [`ProgressBar`] or [`ProgressBarIter`] completes and
Expand Down
9 changes: 6 additions & 3 deletions src/style.rs
Original file line number Diff line number Diff line change
Expand Up @@ -660,12 +660,14 @@ enum Alignment {
#[cfg(test)]
mod tests {
use super::*;
use crate::state::ProgressState;
use crate::state::{AtomicPosition, ProgressState};
use std::sync::Arc;

#[test]
fn test_expand_template() {
const WIDTH: u16 = 80;
let state = ProgressState::new(10);
let pos = Arc::new(AtomicPosition::default());
let state = ProgressState::new(10, pos);
let mut buf = Vec::new();

let mut style = ProgressStyle::default_bar();
Expand All @@ -688,7 +690,8 @@ mod tests {
set_colors_enabled(true);

const WIDTH: u16 = 80;
let state = ProgressState::new(10);
let pos = Arc::new(AtomicPosition::default());
let state = ProgressState::new(10, pos);
let mut buf = Vec::new();

let mut style = ProgressStyle::default_bar();
Expand Down

0 comments on commit ea1f733

Please sign in to comment.