From 5f1e523a244e9f1dcc0f4036eefbbe633d091c0c Mon Sep 17 00:00:00 2001 From: AJ Bagwell Date: Mon, 17 May 2021 11:46:13 +0100 Subject: [PATCH 1/3] Extract MultiProgress::clear to it's own method --- src/progress_bar.rs | 23 +++++++++++++++-------- 1 file changed, 15 insertions(+), 8 deletions(-) diff --git a/src/progress_bar.rs b/src/progress_bar.rs index 121054b4..d010314a 100644 --- a/src/progress_bar.rs +++ b/src/progress_bar.rs @@ -757,20 +757,27 @@ impl MultiProgress { } if clear { - let mut state = self.state.write().unwrap(); - state.draw_target.apply_draw_state(ProgressDrawState { - lines: vec![], - orphan_lines: 0, - finished: true, - force_draw: true, - move_cursor, - })?; + self.clear()?; } self.joining.store(false, Ordering::Release); Ok(()) } + + pub fn clear(&self) -> io::Result<()> { + let mut state = self.state.write().unwrap(); + let move_cursor = state.move_cursor; + state.draw_target.apply_draw_state(ProgressDrawState { + lines: vec![], + orphan_lines: 0, + finished: true, + force_draw: true, + move_cursor, + })?; + + Ok(()) + } } /// A weak reference to a `ProgressBar`. From 969336c87a2382803344517c07c6a9127379489b Mon Sep 17 00:00:00 2001 From: AJ Bagwell Date: Mon, 17 May 2021 11:55:25 +0100 Subject: [PATCH 2/3] Remove MultiProgress::join() Instead of drawing from a thread blocked on MultiProgress::join(), draw directly from the threads updating the ProgressBars in the MultiProgress, using write access to MultiProgress's state to synchronize terminal updates. --- examples/finebars.rs | 43 +++++---- examples/morebars.rs | 31 +++---- examples/multi-tree-ext.rs | 78 ++++++++-------- examples/multi-tree.rs | 5 +- examples/multi.rs | 8 +- examples/yarnish.rs | 39 ++++---- src/progress_bar.rs | 179 +++++-------------------------------- src/state.rs | 103 ++++++++++++++++----- tests/multi-autodrop.rs | 33 ++----- 9 files changed, 209 insertions(+), 310 deletions(-) diff --git a/examples/finebars.rs b/examples/finebars.rs index e92d9086..d8f05bbf 100644 --- a/examples/finebars.rs +++ b/examples/finebars.rs @@ -15,24 +15,29 @@ fn main() { let m = MultiProgress::new(); - for s in styles.iter() { - let pb = m.add(ProgressBar::new(512)); - pb.set_style( - ProgressStyle::default_bar() - .template(&format!("{{prefix:.bold}}▕{{bar:.{}}}▏{{msg}}", s.2)) - .progress_chars(s.1), - ); - pb.set_prefix(s.0); - let wait = Duration::from_millis(thread_rng().gen_range(10..30)); - thread::spawn(move || { - for i in 0..512 { - pb.inc(1); - pb.set_message(format!("{:3}%", 100 * i / 512)); - thread::sleep(wait); - } - pb.finish_with_message("100%"); - }); - } + let handles: Vec<_> = styles + .iter() + .map(|s| { + let pb = m.add(ProgressBar::new(512)); + pb.set_style( + ProgressStyle::default_bar() + .template(&format!("{{prefix:.bold}}▕{{bar:.{}}}▏{{msg}}", s.2)) + .progress_chars(s.1), + ); + pb.set_prefix(s.0); + let wait = Duration::from_millis(thread_rng().gen_range(10..30)); + thread::spawn(move || { + for i in 0..512 { + pb.inc(1); + pb.set_message(format!("{:3}%", 100 * i / 512)); + thread::sleep(wait); + } + pb.finish_with_message("100%"); + }) + }) + .collect(); - m.join().unwrap(); + for h in handles { + let _ = h.join(); + } } diff --git a/examples/morebars.rs b/examples/morebars.rs index cbbe64de..06d19b89 100644 --- a/examples/morebars.rs +++ b/examples/morebars.rs @@ -11,23 +11,18 @@ fn main() { let pb = m.add(ProgressBar::new(5)); pb.set_style(sty.clone()); - let m2 = m.clone(); - let _ = thread::spawn(move || { - // make sure we show up at all. otherwise no rendering - // event. - pb.tick(); - for _ in 0..5 { - let pb2 = m2.add(ProgressBar::new(128)); - pb2.set_style(sty.clone()); - for _ in 0..128 { - pb2.inc(1); - thread::sleep(Duration::from_millis(5)); - } - pb2.finish(); - pb.inc(1); + // make sure we show up at all. otherwise no rendering + // event. + pb.tick(); + for _ in 0..5 { + let pb2 = m.add(ProgressBar::new(128)); + pb2.set_style(sty.clone()); + for _ in 0..128 { + pb2.inc(1); + thread::sleep(Duration::from_millis(5)); } - pb.finish_with_message("done"); - }); - - m.join().unwrap(); + pb2.finish(); + pb.inc(1); + } + pb.finish_with_message("done"); } diff --git a/examples/multi-tree-ext.rs b/examples/multi-tree-ext.rs index 8fb0c7f0..5ffd069e 100644 --- a/examples/multi-tree-ext.rs +++ b/examples/multi-tree-ext.rs @@ -189,51 +189,47 @@ pub fn main() { let mut items: Vec<&Item> = Vec::with_capacity(ELEMENTS.len()); let mp2 = Arc::clone(&mp); - let _ = thread::spawn(move || { - let mut rng = ThreadRng::default(); - pb_main.tick(); - loop { - match get_action(&mut rng, &items) { - Action::Stop => { - // all elements were exhausted - pb_main.finish(); - return; + let mut rng = ThreadRng::default(); + pb_main.tick(); + loop { + match get_action(&mut rng, &items) { + Action::Stop => { + // all elements were exhausted + pb_main.finish(); + return; + } + Action::ModifyTree(elem_idx) => match &ELEMENTS[elem_idx] { + Elem::AddItem(item) => { + let pb = mp2.insert(item.index + 1, item.progress_bar.clone()); + pb.set_prefix(" ".repeat(item.indent)); + pb.set_message(&item.key); + items.insert(item.index, &item); + } + Elem::RemoveItem(Index(index)) => { + let item = items.remove(*index); + let pb = &item.progress_bar; + mp2.remove(pb); + pb_main.inc(pb.length() - pb.position()); } - Action::ModifyTree(elem_idx) => match &ELEMENTS[elem_idx] { - Elem::AddItem(item) => { - let pb = mp2.insert(item.index + 1, item.progress_bar.clone()); - pb.set_prefix(" ".repeat(item.indent)); - pb.set_message(&item.key); - items.insert(item.index, &item); - } - Elem::RemoveItem(Index(index)) => { - let item = items.remove(*index); - let pb = &item.progress_bar; - mp2.remove(pb); - pb_main.inc(pb.length() - pb.position()); - } - }, - Action::IncProgressBar(item_idx) => { - let item = &items[item_idx]; - item.progress_bar.inc(1); - let pos = item.progress_bar.position(); - let len = item.progress_bar.length(); - if pos >= len { - item.progress_bar.set_style(sty_fin.clone()); - item.progress_bar.finish_with_message(format!( - "{} {}", - style("✔").green(), - item.key - )); - } - pb_main.inc(1); + }, + Action::IncProgressBar(item_idx) => { + let item = &items[item_idx]; + item.progress_bar.inc(1); + let pos = item.progress_bar.position(); + let len = item.progress_bar.length(); + if pos >= len { + item.progress_bar.set_style(sty_fin.clone()); + item.progress_bar.finish_with_message(format!( + "{} {}", + style("✔").green(), + item.key + )); } + pb_main.inc(1); } - thread::sleep(Duration::from_millis(20)); } - }); - - mp.join().unwrap(); + thread::sleep(Duration::from_millis(20)); + } } /// The function guarantees to return the action, that is valid for the current tree. diff --git a/examples/multi-tree.rs b/examples/multi-tree.rs index a738c668..459e6b09 100644 --- a/examples/multi-tree.rs +++ b/examples/multi-tree.rs @@ -135,9 +135,8 @@ fn main() { } thread::sleep(Duration::from_millis(15)); } - }); - - mp.join().unwrap(); + }) + .join(); println!("==============================="); println!("the tree should be the same as:"); diff --git a/examples/multi.rs b/examples/multi.rs index 42ae6625..55a4c950 100644 --- a/examples/multi.rs +++ b/examples/multi.rs @@ -11,7 +11,7 @@ fn main() { let pb = m.add(ProgressBar::new(128)); pb.set_style(sty.clone()); - let _ = thread::spawn(move || { + let h1 = thread::spawn(move || { for i in 0..128 { pb.set_message(format!("item #{}", i + 1)); pb.inc(1); @@ -22,7 +22,7 @@ fn main() { let pb = m.add(ProgressBar::new(128)); pb.set_style(sty.clone()); - let _ = thread::spawn(move || { + let h2 = thread::spawn(move || { for _ in 0..3 { pb.set_position(0); for i in 0..128 { @@ -45,5 +45,7 @@ fn main() { pb.finish_with_message("done"); }); - m.join_and_clear().unwrap(); + let _ = h1.join(); + let _ = h2.join(); + m.clear().unwrap(); } diff --git a/examples/yarnish.rs b/examples/yarnish.rs index 19b56f25..4abb161a 100644 --- a/examples/yarnish.rs +++ b/examples/yarnish.rs @@ -69,24 +69,29 @@ pub fn main() { PAPER ); let m = MultiProgress::new(); - for i in 0..4 { - let count = rng.gen_range(30..80); - let pb = m.add(ProgressBar::new(count)); - pb.set_style(spinner_style.clone()); - pb.set_prefix(format!("[{}/?]", i + 1)); - let _ = thread::spawn(move || { - let mut rng = rand::thread_rng(); - let pkg = PACKAGES.choose(&mut rng).unwrap(); - for _ in 0..count { - let cmd = COMMANDS.choose(&mut rng).unwrap(); - pb.set_message(format!("{}: {}", pkg, cmd)); - pb.inc(1); - thread::sleep(Duration::from_millis(rng.gen_range(25..200))); - } - pb.finish_with_message("waiting..."); - }); + let handles: Vec<_> = (0..4u32) + .map(|i| { + let count = rng.gen_range(30..80); + let pb = m.add(ProgressBar::new(count)); + pb.set_style(spinner_style.clone()); + pb.set_prefix(format!("[{}/?]", i + 1)); + thread::spawn(move || { + let mut rng = rand::thread_rng(); + let pkg = PACKAGES.choose(&mut rng).unwrap(); + for _ in 0..count { + let cmd = COMMANDS.choose(&mut rng).unwrap(); + pb.set_message(format!("{}: {}", pkg, cmd)); + pb.inc(1); + thread::sleep(Duration::from_millis(rng.gen_range(25..200))); + } + pb.finish_with_message("waiting..."); + }) + }) + .collect(); + for h in handles { + let _ = h.join(); } - m.join_and_clear().unwrap(); + m.clear().unwrap(); println!("{} Done in {}", SPARKLE, HumanDuration(started.elapsed())); } diff --git a/src/progress_bar.rs b/src/progress_bar.rs index d010314a..36977f93 100644 --- a/src/progress_bar.rs +++ b/src/progress_bar.rs @@ -1,14 +1,13 @@ use std::borrow::Cow; use std::fmt; use std::io; -use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::mpsc::{channel, Receiver, Sender}; -use std::sync::{Arc, Mutex, RwLock, Weak}; +use std::sync::RwLock; +use std::sync::{Arc, Mutex, Weak}; use std::thread; use std::time::{Duration, Instant}; use crate::state::{ - MultiObject, MultiProgressState, ProgressDrawState, ProgressDrawTarget, ProgressDrawTargetKind, + MultiProgressState, ProgressDrawState, ProgressDrawTarget, ProgressDrawTargetKind, ProgressState, Status, }; use crate::style::ProgressStyle; @@ -507,21 +506,11 @@ impl ProgressBar { } /// Manages multiple progress bars from different threads. +#[derive(Debug)] pub struct MultiProgress { state: Arc>, - joining: AtomicBool, - tx: Sender<(usize, ProgressDrawState)>, - rx: Receiver<(usize, ProgressDrawState)>, } -impl fmt::Debug for MultiProgress { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("MultiProgress").finish() - } -} - -unsafe impl Sync for MultiProgress {} - impl Default for MultiProgress { fn default() -> MultiProgress { MultiProgress::with_draw_target(ProgressDrawTarget::stderr()) @@ -540,18 +529,14 @@ impl MultiProgress { /// Creates a new multi progress object with the given draw target. pub fn with_draw_target(draw_target: ProgressDrawTarget) -> MultiProgress { - let (tx, rx) = channel(); MultiProgress { state: Arc::new(RwLock::new(MultiProgressState { - objects: Vec::new(), + draw_states: Vec::new(), free_set: Vec::new(), ordering: vec![], draw_target, move_cursor: false, })), - joining: AtomicBool::new(false), - tx, - rx, } } @@ -592,20 +577,15 @@ impl MultiProgress { } fn push(&self, pos: Option, pb: ProgressBar) -> ProgressBar { - let new = MultiObject { - done: false, - draw_state: None, - }; - let mut state = self.state.write().unwrap(); let idx = match state.free_set.pop() { Some(idx) => { - state.objects[idx] = Some(new); + state.draw_states[idx] = None; idx } None => { - state.objects.push(Some(new)); - state.objects.len() - 1 + state.draw_states.push(None); + state.draw_states.len() - 1 } }; @@ -614,11 +594,15 @@ impl MultiProgress { _ => state.ordering.push(idx), } + assert!( + state.len() == state.ordering.len(), + "Draw state is inconsistent" + ); + pb.set_draw_target(ProgressDrawTarget { kind: ProgressDrawTargetKind::Remote { state: self.state.clone(), idx, - chan: Mutex::new(self.tx.clone()), }, }); pb @@ -643,128 +627,6 @@ impl MultiProgress { self.state.write().unwrap().remove_idx(idx); } - /// Waits for all progress bars to report that they are finished. - /// - /// You need to call this as this will request the draw instructions - /// from the remote progress bars. Not calling this will deadlock - /// your program. - pub fn join(&self) -> io::Result<()> { - self.join_impl(false) - } - - /// Works like `join` but clears the progress bar in the end. - pub fn join_and_clear(&self) -> io::Result<()> { - self.join_impl(true) - } - - fn join_impl(&self, clear: bool) -> io::Result<()> { - if self.joining.load(Ordering::Acquire) { - panic!("Already joining!"); - } - self.joining.store(true, Ordering::Release); - - let move_cursor = self.state.read().unwrap().move_cursor; - // Max amount of grouped together updates at once. This is meant - // to ensure there isn't a situation where continuous updates prevent - // any actual draws happening. - const MAX_GROUP_SIZE: usize = 32; - let mut recv_peek = None; - let mut grouped = 0usize; - let mut orphan_lines: Vec = Vec::new(); - let mut force_draw = false; - while !self.state.read().unwrap().is_done() { - let (idx, draw_state) = if let Some(peeked) = recv_peek.take() { - peeked - } else { - self.rx.recv().unwrap() - }; - force_draw |= draw_state.finished || draw_state.force_draw; - - let mut state = self.state.write().unwrap(); - if draw_state.finished { - if let Some(ref mut obj) = &mut state.objects[idx] { - obj.done = true; - } - if draw_state.lines.is_empty() { - // `finish_and_clear` was called - state.remove_idx(idx); - } - } - - // Split orphan lines out of the draw state, if any - let lines = if draw_state.orphan_lines > 0 { - let split = draw_state.lines.split_at(draw_state.orphan_lines); - orphan_lines.extend_from_slice(split.0); - split.1.to_vec() - } else { - draw_state.lines - }; - - let draw_state = ProgressDrawState { - lines, - orphan_lines: 0, - ..draw_state - }; - - if let Some(ref mut obj) = &mut state.objects[idx] { - obj.draw_state = Some(draw_state); - } - - // the rest from here is only drawing, we can skip it. - if state.draw_target.is_hidden() { - continue; - } - - debug_assert!(recv_peek.is_none()); - if grouped >= MAX_GROUP_SIZE { - // Can't group any more draw calls, proceed to just draw - grouped = 0; - } else if let Ok(state) = self.rx.try_recv() { - // Only group draw calls if there is another draw already queued - recv_peek = Some(state); - grouped += 1; - continue; - } else { - // No more draws queued, proceed to just draw - grouped = 0; - } - - let mut lines = vec![]; - - // Make orphaned lines appear at the top, so they can be properly - // forgotten. - let orphan_lines_count = orphan_lines.len(); - lines.append(&mut orphan_lines); - - for index in state.ordering.iter() { - if let Some(obj) = &state.objects[*index] { - if let Some(ref draw_state) = obj.draw_state { - lines.extend_from_slice(&draw_state.lines[..]); - } - } - } - - let finished = state.is_done(); - state.draw_target.apply_draw_state(ProgressDrawState { - lines, - orphan_lines: orphan_lines_count, - force_draw: force_draw || orphan_lines_count > 0, - move_cursor, - finished, - })?; - - force_draw = false; - } - - if clear { - self.clear()?; - } - - self.joining.store(false, Ordering::Release); - - Ok(()) - } - pub fn clear(&self) -> io::Result<()> { let mut state = self.state.write().unwrap(); let move_cursor = state.move_cursor; @@ -849,7 +711,6 @@ mod tests { let pb = mpb.add(ProgressBar::new(1)); pb.set_draw_delta(2); drop(pb); - mpb.join().unwrap(); } #[test] @@ -859,7 +720,6 @@ mod tests { pb.set_draw_delta(2); pb.abandon(); drop(pb); - mpb.join().unwrap(); } #[test] @@ -914,17 +774,19 @@ mod tests { let state = mp.state.read().unwrap(); // the removed place for p1 is reused - assert_eq!(state.objects.len(), 4); - assert_eq!(state.objects.iter().filter(|o| o.is_some()).count(), 3); + assert_eq!(state.draw_states.len(), 4); + assert_eq!(state.len(), 3); // free_set may contain 1 or 2 match state.free_set.last() { Some(1) => { assert_eq!(state.ordering, vec![0, 2, 3]); + assert!(state.draw_states[1].is_none()); assert_eq!(extract_index(&p4), 2); } Some(2) => { assert_eq!(state.ordering, vec![0, 1, 3]); + assert!(state.draw_states[2].is_none()); assert_eq!(extract_index(&p4), 1); } _ => unreachable!(), @@ -948,8 +810,10 @@ mod tests { let state = mp.state.read().unwrap(); // the removed place for p1 is reused - assert_eq!(state.objects.len(), 2); - assert_eq!(state.objects.iter().filter(|obj| obj.is_some()).count(), 1); + assert_eq!(state.draw_states.len(), 2); + assert_eq!(state.free_set.len(), 1); + assert_eq!(state.len(), 1); + assert!(state.draw_states[0].is_none()); assert_eq!(state.free_set.last(), Some(&0)); assert_eq!(state.ordering, vec![1]); @@ -969,6 +833,5 @@ mod tests { let mpb = MultiProgress::with_draw_target(ProgressDrawTarget::hidden()); let pb = mpb.add(ProgressBar::new(123)); pb.finish(); - mpb.join().unwrap(); } } diff --git a/src/state.rs b/src/state.rs index 5900a893..b2acec00 100644 --- a/src/state.rs +++ b/src/state.rs @@ -1,7 +1,6 @@ use std::borrow::Cow; use std::io; -use std::sync::mpsc::Sender; -use std::sync::{Arc, Mutex, RwLock}; +use std::sync::{Arc, RwLock}; use std::thread; use std::time::{Duration, Instant}; @@ -257,12 +256,14 @@ impl Drop for ProgressState { } } +#[derive(Debug)] pub(crate) struct MultiProgressState { /// The collection of states corresponding to progress bars - pub(crate) objects: Vec>, - /// Set of `None` elements in the `objects` vector + /// the state is None for bars that have not yet been drawn or have been removed + pub(crate) draw_states: Vec>, + /// Set of removed bars, should have corresponding `None` elements in the `draw_states` vector pub(crate) free_set: Vec, - /// Indices to the `objects` to maintain correct visual order + /// Indices to the `draw_states` to maintain correct visual order pub(crate) ordering: Vec, /// Target for draw operation for MultiProgress pub(crate) draw_target: ProgressDrawTarget, @@ -275,26 +276,78 @@ impl MultiProgressState { self.draw_target.width() } - pub(crate) fn is_done(&self) -> bool { - self.objects.iter().all(|o| match o { - Some(obj) => obj.done, - None => true, + pub(crate) fn draw(&mut self, idx: usize, draw_state: ProgressDrawState) -> io::Result<()> { + let force_draw = draw_state.finished || draw_state.force_draw; + let mut orphan_lines = vec![]; + + // Split orphan lines out of the draw state, if any + let lines = if draw_state.orphan_lines > 0 { + let split = draw_state.lines.split_at(draw_state.orphan_lines); + orphan_lines.extend_from_slice(split.0); + split.1.to_vec() + } else { + draw_state.lines + }; + + let draw_state = ProgressDrawState { + lines, + orphan_lines: 0, + ..draw_state + }; + + self.draw_states[idx] = Some(draw_state); + + // the rest from here is only drawing, we can skip it. + if self.draw_target.is_hidden() { + return Ok(()); + } + + let mut lines = vec![]; + + // Make orphaned lines appear at the top, so they can be properly + // forgotten. + let orphan_lines_count = orphan_lines.len(); + lines.append(&mut orphan_lines); + + for index in self.ordering.iter() { + let draw_state = &self.draw_states[*index]; + if let Some(ref draw_state) = draw_state { + lines.extend_from_slice(&draw_state.lines[..]); + } + } + + // !any(!done) is also true when iter() is empty, contrary to all(done) + let finished = !self + .draw_states + .iter() + .any(|ref x| !x.as_ref().map(|s| s.finished).unwrap_or(false)); + self.draw_target.apply_draw_state(ProgressDrawState { + lines, + orphan_lines: orphan_lines_count, + force_draw: force_draw || orphan_lines_count > 0, + move_cursor: self.move_cursor, + finished, }) } + pub(crate) fn len(&self) -> usize { + self.draw_states.len() - self.free_set.len() + } + pub(crate) fn remove_idx(&mut self, idx: usize) { - if self.objects[idx].take().is_none() { + if self.free_set.contains(&idx) { return; } + self.draw_states[idx].take(); self.free_set.push(idx); self.ordering.retain(|&x| x != idx); - } -} -pub(crate) struct MultiObject { - pub(crate) done: bool, - pub(crate) draw_state: Option, + assert!( + self.len() == self.ordering.len(), + "Draw state is inconsistent" + ); + } } /// The drawn state of an element. @@ -334,6 +387,7 @@ pub(crate) enum Status { /// The draw target is a stateful wrapper over a drawing destination and /// internally optimizes how often the state is painted to the output /// device. +#[derive(Debug)] pub struct ProgressDrawTarget { pub(crate) kind: ProgressDrawTargetKind, } @@ -468,11 +522,11 @@ impl ProgressDrawTarget { } if draw_state.finished || draw_state.force_draw || last_draw.elapsed() > rate => { (term, last_line_count, last_draw) } - ProgressDrawTargetKind::Remote { idx, ref chan, .. } => { - return chan - .lock() + ProgressDrawTargetKind::Remote { idx, ref state, .. } => { + return state + .write() .unwrap() - .send((idx, draw_state)) + .draw(idx, draw_state) .map_err(|e| io::Error::new(io::ErrorKind::Other, e)); } // Hidden, finished, or no need to refresh yet @@ -496,10 +550,11 @@ impl ProgressDrawTarget { pub(crate) fn disconnect(&self) { match self.kind { ProgressDrawTargetKind::Term { .. } => {} - ProgressDrawTargetKind::Remote { idx, ref chan, .. } => { - chan.lock() + ProgressDrawTargetKind::Remote { idx, ref state, .. } => { + state + .write() .unwrap() - .send(( + .draw( idx, ProgressDrawState { lines: vec![], @@ -508,13 +563,14 @@ impl ProgressDrawTarget { force_draw: false, move_cursor: false, }, - )) + ) .ok(); } ProgressDrawTargetKind::Hidden => {} }; } } +#[derive(Debug)] pub(crate) enum ProgressDrawTargetKind { Term { term: Term, @@ -525,7 +581,6 @@ pub(crate) enum ProgressDrawTargetKind { Remote { state: Arc>, idx: usize, - chan: Mutex>, }, Hidden, } diff --git a/tests/multi-autodrop.rs b/tests/multi-autodrop.rs index 481ef6fc..6e9a7c85 100644 --- a/tests/multi-autodrop.rs +++ b/tests/multi-autodrop.rs @@ -1,20 +1,14 @@ use indicatif::{MultiProgress, ProgressBar}; -use std::sync::mpsc; use std::thread; use std::time::Duration; #[test] fn main() { - let m = MultiProgress::new(); - let pb = m.add(ProgressBar::new(10)); - let (tx, rx) = mpsc::channel(); - - // start a thread to drive MultiProgress - let h = thread::spawn(move || { - m.join().unwrap(); - tx.send(()).unwrap(); - println!("Done in thread, droping MultiProgress"); - }); + let pb = { + let m = MultiProgress::new(); + m.add(ProgressBar::new(10)) + // The MultiProgress is dropped here. + }; { let pb2 = pb.clone(); @@ -24,23 +18,8 @@ fn main() { } } - // make sure anything is done in driver thread - thread::sleep(Duration::from_millis(50)); - - // the driver thread shouldn't finish - rx.try_recv() - .expect_err("The driver thread shouldn't finish"); - pb.set_message("Done"); pb.finish(); - // make sure anything is done in driver thread - thread::sleep(Duration::from_millis(50)); - - // the driver thread should finish here - rx.try_recv().expect("The driver thread should finish"); - - h.join().unwrap(); - - println!("Done in main"); + println!("Done with MultiProgress"); } From fee4bdca2172beaeb7b906b9bdc205de8bdc0e89 Mon Sep 17 00:00:00 2001 From: AJ Bagwell Date: Mon, 17 May 2021 17:00:46 +0100 Subject: [PATCH 3/3] Add leaky bucket to rate limiting of progress renders. The leaky bucket makes sure that progress bars are drawn on average a the desired rate but allows it to burst faster if progress is not uniform. It works by having a "bucket" of redraws that are added to every time a draw is requested, when the bucket is full the redraws are skipped, for every tick a redraw is removed from the bucket. This is to fix the regression of #166 when join was removed. --- src/state.rs | 68 +++++++++++++++++++++++++++++++++++++++++----------- 1 file changed, 54 insertions(+), 14 deletions(-) diff --git a/src/state.rs b/src/state.rs index b2acec00..200d0d28 100644 --- a/src/state.rs +++ b/src/state.rs @@ -467,16 +467,15 @@ impl ProgressDrawTarget { /// /// Will panic if refresh_rate is `Some(0)`. To disable rate limiting use `None` instead. pub fn term(term: Term, refresh_rate: impl Into>) -> ProgressDrawTarget { - let rate = refresh_rate - .into() - .map(|x| Duration::from_millis(1000 / x)) - .unwrap_or_else(|| Duration::from_secs(0)); ProgressDrawTarget { kind: ProgressDrawTargetKind::Term { term, last_line_count: 0, - rate, - last_draw: Instant::now() - rate, + leaky_bucket: refresh_rate.into().map(|rate| LeakyBucket { + bucket: MAX_GROUP_SIZE, + leak_rate: rate as f64, + last_update: Instant::now(), + }), }, } } @@ -513,14 +512,23 @@ impl ProgressDrawTarget { /// Apply the given draw state (draws it). pub(crate) fn apply_draw_state(&mut self, draw_state: ProgressDrawState) -> io::Result<()> { - let (term, last_line_count, last_draw) = match self.kind { + let (term, last_line_count) = match self.kind { ProgressDrawTargetKind::Term { ref term, ref mut last_line_count, - rate, - ref mut last_draw, - } if draw_state.finished || draw_state.force_draw || last_draw.elapsed() > rate => { - (term, last_line_count, last_draw) + leaky_bucket: None, + } => (term, last_line_count), + ProgressDrawTargetKind::Term { + ref term, + ref mut last_line_count, + leaky_bucket: Some(ref mut leaky_bucket), + } => { + if draw_state.finished || draw_state.force_draw || leaky_bucket.try_add_work() { + (term, last_line_count) + } else { + // rate limited + return Ok(()); + } } ProgressDrawTargetKind::Remote { idx, ref state, .. } => { return state @@ -542,7 +550,6 @@ impl ProgressDrawTarget { draw_state.draw_to_term(term)?; term.flush()?; *last_line_count = draw_state.lines.len() - draw_state.orphan_lines; - *last_draw = Instant::now(); Ok(()) } @@ -570,13 +577,46 @@ impl ProgressDrawTarget { }; } } + +const MAX_GROUP_SIZE: f64 = 32.0; + +#[derive(Debug)] +pub(crate) struct LeakyBucket { + leak_rate: f64, + last_update: Instant, + bucket: f64, +} + +/// Rate limit but allow occasional bursts above desired rate +impl LeakyBucket { + /// try to add some work to the bucket + /// return false if the bucket is already full and the work should be skipped + fn try_add_work(&mut self) -> bool { + self.leak(); + if self.bucket < MAX_GROUP_SIZE { + self.bucket += 1.0; + true + } else { + false + } + } + + fn leak(&mut self) { + let ticks = self.last_update.elapsed().as_secs_f64() * self.leak_rate; + self.bucket -= ticks; + if self.bucket < 0.0 { + self.bucket = 0.0; + } + self.last_update = Instant::now(); + } +} + #[derive(Debug)] pub(crate) enum ProgressDrawTargetKind { Term { term: Term, last_line_count: usize, - rate: Duration, - last_draw: Instant, + leaky_bucket: Option, }, Remote { state: Arc>,