From 08dbab9f449a7a442c561e88660b9174381c760f Mon Sep 17 00:00:00 2001 From: Marien Zwart Date: Sun, 3 Jan 2021 16:14:12 +1100 Subject: [PATCH 1/3] Remove MultiProgress::join() Instead of drawing from a thread blocked on MultiProgress::join(), draw directly from the threads updating the ProgressBars in the MultiProgress, using write access to MultiProgress's state to synchronize terminal updates. --- examples/finebars.rs | 43 ++++--- examples/morebars.rs | 5 +- examples/multi-tree.rs | 5 +- examples/multi.rs | 8 +- examples/yarnish.rs | 39 ++++--- src/progress.rs | 252 +++++++++++++++------------------------- tests/multi-autodrop.rs | 33 +----- 7 files changed, 152 insertions(+), 233 deletions(-) diff --git a/examples/finebars.rs b/examples/finebars.rs index 7baa131d..51e4d7cb 100644 --- a/examples/finebars.rs +++ b/examples/finebars.rs @@ -15,24 +15,29 @@ fn main() { let m = MultiProgress::new(); - for s in styles.iter() { - let pb = m.add(ProgressBar::new(512)); - pb.set_style( - ProgressStyle::default_bar() - .template(&format!("{{prefix:.bold}}▕{{bar:.{}}}▏{{msg}}", s.2)) - .progress_chars(s.1), - ); - pb.set_prefix(s.0); - let wait = Duration::from_millis(thread_rng().gen_range(10, 30)); - thread::spawn(move || { - for i in 0..512 { - pb.inc(1); - pb.set_message(&format!("{:3}%", 100 * i / 512)); - thread::sleep(wait); - } - pb.finish_with_message("100%"); - }); - } + let handles: Vec<_> = styles + .iter() + .map(|s| { + let pb = m.add(ProgressBar::new(512)); + pb.set_style( + ProgressStyle::default_bar() + .template(&format!("{{prefix:.bold}}▕{{bar:.{}}}▏{{msg}}", s.2)) + .progress_chars(s.1), + ); + pb.set_prefix(s.0); + let wait = Duration::from_millis(thread_rng().gen_range(10, 30)); + thread::spawn(move || { + for i in 0..512 { + pb.inc(1); + pb.set_message(&format!("{:3}%", 100 * i / 512)); + thread::sleep(wait); + } + pb.finish_with_message("100%"); + }) + }) + .collect(); - m.join().unwrap(); + for h in handles { + let _ = h.join(); + } } diff --git a/examples/morebars.rs b/examples/morebars.rs index cbbe64de..4455d5f4 100644 --- a/examples/morebars.rs +++ b/examples/morebars.rs @@ -27,7 +27,6 @@ fn main() { pb.inc(1); } pb.finish_with_message("done"); - }); - - m.join().unwrap(); + }) + .join(); } diff --git a/examples/multi-tree.rs b/examples/multi-tree.rs index fdc4c586..fee5b63b 100644 --- a/examples/multi-tree.rs +++ b/examples/multi-tree.rs @@ -108,9 +108,8 @@ fn main() { } thread::sleep(Duration::from_millis(15)); } - }); - - mp.join().unwrap(); + }) + .join(); println!("==============================="); println!("the tree should be the same as:"); diff --git a/examples/multi.rs b/examples/multi.rs index d61d2b9a..ca4bbed1 100644 --- a/examples/multi.rs +++ b/examples/multi.rs @@ -11,7 +11,7 @@ fn main() { let pb = m.add(ProgressBar::new(128)); pb.set_style(sty.clone()); - let _ = thread::spawn(move || { + let h1 = thread::spawn(move || { for i in 0..128 { pb.set_message(&format!("item #{}", i + 1)); pb.inc(1); @@ -22,7 +22,7 @@ fn main() { let pb = m.add(ProgressBar::new(128)); pb.set_style(sty.clone()); - let _ = thread::spawn(move || { + let h2 = thread::spawn(move || { for _ in 0..3 { pb.set_position(0); for i in 0..128 { @@ -45,5 +45,7 @@ fn main() { pb.finish_with_message("done"); }); - m.join_and_clear().unwrap(); + let _ = h1.join(); + let _ = h2.join(); + m.clear().unwrap(); } diff --git a/examples/yarnish.rs b/examples/yarnish.rs index b81d344f..1acf26f0 100644 --- a/examples/yarnish.rs +++ b/examples/yarnish.rs @@ -71,24 +71,29 @@ pub fn main() { PAPER ); let m = MultiProgress::new(); - for i in 0..4 { - let count = rng.gen_range(30, 80); - let pb = m.add(ProgressBar::new(count)); - pb.set_style(spinner_style.clone()); - pb.set_prefix(&format!("[{}/?]", i + 1)); - let _ = thread::spawn(move || { - let mut rng = rand::thread_rng(); - let pkg = PACKAGES.choose(&mut rng).unwrap(); - for _ in 0..count { - let cmd = COMMANDS.choose(&mut rng).unwrap(); - pb.set_message(&format!("{}: {}", pkg, cmd)); - pb.inc(1); - thread::sleep(Duration::from_millis(rng.gen_range(25, 200))); - } - pb.finish_with_message("waiting..."); - }); + let handles: Vec<_> = (0..4u32) + .map(|i| { + let count = rng.gen_range(30, 80); + let pb = m.add(ProgressBar::new(count)); + pb.set_style(spinner_style.clone()); + pb.set_prefix(&format!("[{}/?]", i + 1)); + thread::spawn(move || { + let mut rng = rand::thread_rng(); + let pkg = PACKAGES.choose(&mut rng).unwrap(); + for _ in 0..count { + let cmd = COMMANDS.choose(&mut rng).unwrap(); + pb.set_message(&format!("{}: {}", pkg, cmd)); + pb.inc(1); + thread::sleep(Duration::from_millis(rng.gen_range(25, 200))); + } + pb.finish_with_message("waiting..."); + }) + }) + .collect(); + for h in handles { + let _ = h.join(); } - m.join_and_clear().unwrap(); + m.clear().unwrap(); println!("{} Done in {}", SPARKLE, HumanDuration(started.elapsed())); } diff --git a/src/progress.rs b/src/progress.rs index ab3920a6..4b079e99 100644 --- a/src/progress.rs +++ b/src/progress.rs @@ -1,9 +1,7 @@ use std::fmt; use std::io; -use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::mpsc::{channel, Receiver, Sender}; +use std::sync::RwLock; use std::sync::{Arc, Weak}; -use std::sync::{Mutex, RwLock}; use std::thread; use std::time::{Duration, Instant}; @@ -35,9 +33,10 @@ enum Status { DoneHidden, } +#[derive(Debug)] enum ProgressDrawTargetKind { Term(Term, Option, Option), - Remote(usize, Mutex>), + Remote(usize, Arc>), Hidden, } @@ -47,6 +46,7 @@ enum ProgressDrawTargetKind { /// The draw target is a stateful wrapper over a drawing destination and /// internally optimizes how often the state is painted to the output /// device. +#[derive(Debug)] pub struct ProgressDrawTarget { kind: ProgressDrawTargetKind, } @@ -164,11 +164,11 @@ impl ProgressDrawTarget { *last_state = Some(draw_state); } } - ProgressDrawTargetKind::Remote(idx, ref chan) => { - return chan - .lock() + ProgressDrawTargetKind::Remote(idx, ref state) => { + return state + .write() .unwrap() - .send((idx, draw_state)) + .draw(idx, draw_state) .map_err(|e| io::Error::new(io::ErrorKind::Other, e)); } ProgressDrawTargetKind::Hidden => {} @@ -180,10 +180,11 @@ impl ProgressDrawTarget { fn disconnect(&self) { match self.kind { ProgressDrawTargetKind::Term(_, _, _) => {} - ProgressDrawTargetKind::Remote(idx, ref chan) => { - chan.lock() + ProgressDrawTargetKind::Remote(idx, ref state) => { + state + .write() .unwrap() - .send(( + .draw( idx, ProgressDrawState { lines: vec![], @@ -193,7 +194,7 @@ impl ProgressDrawTarget { move_cursor: false, ts: Instant::now(), }, - )) + ) .ok(); } ProgressDrawTargetKind::Hidden => {} @@ -883,11 +884,13 @@ fn test_weak_pb() { assert!(weak.upgrade().is_none()); } +#[derive(Debug)] struct MultiObject { done: bool, draw_state: Option, } +#[derive(Debug)] struct MultiProgressState { objects: Vec, ordering: Vec, @@ -895,21 +898,71 @@ struct MultiProgressState { move_cursor: bool, } -/// Manages multiple progress bars from different threads. -pub struct MultiProgress { - state: RwLock, - joining: AtomicBool, - tx: Sender<(usize, ProgressDrawState)>, - rx: Receiver<(usize, ProgressDrawState)>, -} +impl MultiProgressState { + fn draw(&mut self, idx: usize, draw_state: ProgressDrawState) -> io::Result<()> { + let ts = draw_state.ts; + let force_draw = draw_state.finished || draw_state.force_draw; -impl fmt::Debug for MultiProgress { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("MultiProgress").finish() + if draw_state.finished { + self.objects[idx].done = true; + } + + let mut orphan_lines = vec![]; + + // Split orphan lines out of the draw state, if any + let lines = if draw_state.orphan_lines > 0 { + let split = draw_state.lines.split_at(draw_state.orphan_lines); + orphan_lines.extend_from_slice(split.0); + split.1.to_vec() + } else { + draw_state.lines + }; + + let draw_state = ProgressDrawState { + lines, + orphan_lines: 0, + ..draw_state + }; + + self.objects[idx].draw_state = Some(draw_state); + + // the rest from here is only drawing, we can skip it. + if self.draw_target.is_hidden() { + return Ok(()); + } + + let mut lines = vec![]; + + // Make orphaned lines appear at the top, so they can be properly + // forgotten. + let orphan_lines_count = orphan_lines.len(); + lines.append(&mut orphan_lines); + + for index in self.ordering.iter() { + let obj = &self.objects[*index]; + if let Some(ref draw_state) = obj.draw_state { + lines.extend_from_slice(&draw_state.lines[..]); + } + } + + // !any(!done) is also true when iter() is empty, contrary to all(done) + let finished = !self.objects.iter().any(|ref x| !x.done); + self.draw_target.apply_draw_state(ProgressDrawState { + lines, + orphan_lines: orphan_lines_count, + force_draw: force_draw || orphan_lines_count > 0, + move_cursor: self.move_cursor, + finished, + ts, + }) } } -unsafe impl Sync for MultiProgress {} +/// Manages multiple progress bars from different threads. +#[derive(Debug)] +pub struct MultiProgress { + state: Arc>, +} impl Default for MultiProgress { fn default() -> MultiProgress { @@ -929,17 +982,13 @@ impl MultiProgress { /// Creates a new multi progress object with the given draw target. pub fn with_draw_target(draw_target: ProgressDrawTarget) -> MultiProgress { - let (tx, rx) = channel(); MultiProgress { - state: RwLock::new(MultiProgressState { + state: Arc::new(RwLock::new(MultiProgressState { objects: vec![], ordering: vec![], draw_target, move_cursor: false, - }), - joining: AtomicBool::new(false), - tx, - rx, + })), } } @@ -972,7 +1021,7 @@ impl MultiProgress { }); state.ordering.push(object_idx); pb.set_draw_target(ProgressDrawTarget { - kind: ProgressDrawTargetKind::Remote(object_idx, Mutex::new(self.tx.clone())), + kind: ProgressDrawTargetKind::Remote(object_idx, self.state.clone()), }); pb } @@ -998,141 +1047,22 @@ impl MultiProgress { state.ordering.insert(index, object_idx); } pb.set_draw_target(ProgressDrawTarget { - kind: ProgressDrawTargetKind::Remote(object_idx, Mutex::new(self.tx.clone())), + kind: ProgressDrawTargetKind::Remote(object_idx, self.state.clone()), }); pb } - /// Waits for all progress bars to report that they are finished. - /// - /// You need to call this as this will request the draw instructions - /// from the remote progress bars. Not calling this will deadlock - /// your program. - pub fn join(&self) -> io::Result<()> { - self.join_impl(false) - } - - /// Works like `join` but clears the progress bar in the end. - pub fn join_and_clear(&self) -> io::Result<()> { - self.join_impl(true) - } - - fn is_done(&self) -> bool { - let state = self.state.read().unwrap(); - if state.objects.is_empty() { - return true; - } - for obj in &state.objects { - if !obj.done { - return false; - } - } - true - } - - fn join_impl(&self, clear: bool) -> io::Result<()> { - if self.joining.load(Ordering::Acquire) { - panic!("Already joining!"); - } - self.joining.store(true, Ordering::Release); - - let move_cursor = self.state.read().unwrap().move_cursor; - // Max amount of grouped together updates at once. This is meant - // to ensure there isn't a situation where continuous updates prevent - // any actual draws happening. - const MAX_GROUP_SIZE: usize = 32; - let mut recv_peek = None; - let mut grouped = 0usize; - let mut orphan_lines: Vec = Vec::new(); - while !self.is_done() { - let (idx, draw_state) = if let Some(peeked) = recv_peek.take() { - peeked - } else { - self.rx.recv().unwrap() - }; - let ts = draw_state.ts; - let force_draw = draw_state.finished || draw_state.force_draw; - - let mut state = self.state.write().unwrap(); - if draw_state.finished { - state.objects[idx].done = true; - } - - // Split orphan lines out of the draw state, if any - let lines = if draw_state.orphan_lines > 0 { - let split = draw_state.lines.split_at(draw_state.orphan_lines); - orphan_lines.extend_from_slice(split.0); - split.1.to_vec() - } else { - draw_state.lines - }; - - let draw_state = ProgressDrawState { - lines, - orphan_lines: 0, - ..draw_state - }; - - state.objects[idx].draw_state = Some(draw_state); - - // the rest from here is only drawing, we can skip it. - if state.draw_target.is_hidden() { - continue; - } - - debug_assert!(recv_peek.is_none()); - if grouped >= MAX_GROUP_SIZE { - // Can't group any more draw calls, proceed to just draw - grouped = 0; - } else if let Ok(state) = self.rx.try_recv() { - // Only group draw calls if there is another draw already queued - recv_peek = Some(state); - grouped += 1; - continue; - } else { - // No more draws queued, proceed to just draw - grouped = 0; - } - - let mut lines = vec![]; - - // Make orphaned lines appear at the top, so they can be properly - // forgotten. - let orphan_lines_count = orphan_lines.len(); - lines.append(&mut orphan_lines); - - for index in state.ordering.iter() { - let obj = &state.objects[*index]; - if let Some(ref draw_state) = obj.draw_state { - lines.extend_from_slice(&draw_state.lines[..]); - } - } - - // !any(!done) is also true when iter() is empty, contrary to all(done) - let finished = !state.objects.iter().any(|ref x| !x.done); - state.draw_target.apply_draw_state(ProgressDrawState { - lines, - orphan_lines: orphan_lines_count, - force_draw: force_draw || orphan_lines_count > 0, - move_cursor, - finished, - ts, - })?; - } - - if clear { - let mut state = self.state.write().unwrap(); - state.draw_target.apply_draw_state(ProgressDrawState { - lines: vec![], - orphan_lines: 0, - finished: true, - force_draw: true, - move_cursor, - ts: Instant::now(), - })?; - } - - self.joining.store(false, Ordering::Release); + pub fn clear(&self) -> io::Result<()> { + let mut state = self.state.write().unwrap(); + let move_cursor = state.move_cursor; + state.draw_target.apply_draw_state(ProgressDrawState { + lines: vec![], + orphan_lines: 0, + finished: true, + force_draw: true, + move_cursor, + ts: Instant::now(), + })?; Ok(()) } diff --git a/tests/multi-autodrop.rs b/tests/multi-autodrop.rs index 481ef6fc..6e9a7c85 100644 --- a/tests/multi-autodrop.rs +++ b/tests/multi-autodrop.rs @@ -1,20 +1,14 @@ use indicatif::{MultiProgress, ProgressBar}; -use std::sync::mpsc; use std::thread; use std::time::Duration; #[test] fn main() { - let m = MultiProgress::new(); - let pb = m.add(ProgressBar::new(10)); - let (tx, rx) = mpsc::channel(); - - // start a thread to drive MultiProgress - let h = thread::spawn(move || { - m.join().unwrap(); - tx.send(()).unwrap(); - println!("Done in thread, droping MultiProgress"); - }); + let pb = { + let m = MultiProgress::new(); + m.add(ProgressBar::new(10)) + // The MultiProgress is dropped here. + }; { let pb2 = pb.clone(); @@ -24,23 +18,8 @@ fn main() { } } - // make sure anything is done in driver thread - thread::sleep(Duration::from_millis(50)); - - // the driver thread shouldn't finish - rx.try_recv() - .expect_err("The driver thread shouldn't finish"); - pb.set_message("Done"); pb.finish(); - // make sure anything is done in driver thread - thread::sleep(Duration::from_millis(50)); - - // the driver thread should finish here - rx.try_recv().expect("The driver thread should finish"); - - h.join().unwrap(); - - println!("Done in main"); + println!("Done with MultiProgress"); } From 38115c0461ab3c182748d208d80e53a24cd47fce Mon Sep 17 00:00:00 2001 From: Marien Zwart Date: Sun, 10 Jan 2021 00:29:56 +1100 Subject: [PATCH 2/3] Remove MultiObject's done field We set this from the "finished" field of a ProgressDrawState, which we also store. It does not look like the separate field is necessary. --- src/progress.rs | 20 ++++++-------------- 1 file changed, 6 insertions(+), 14 deletions(-) diff --git a/src/progress.rs b/src/progress.rs index 4b079e99..d9a70895 100644 --- a/src/progress.rs +++ b/src/progress.rs @@ -886,7 +886,6 @@ fn test_weak_pb() { #[derive(Debug)] struct MultiObject { - done: bool, draw_state: Option, } @@ -903,10 +902,6 @@ impl MultiProgressState { let ts = draw_state.ts; let force_draw = draw_state.finished || draw_state.force_draw; - if draw_state.finished { - self.objects[idx].done = true; - } - let mut orphan_lines = vec![]; // Split orphan lines out of the draw state, if any @@ -946,7 +941,10 @@ impl MultiProgressState { } // !any(!done) is also true when iter() is empty, contrary to all(done) - let finished = !self.objects.iter().any(|ref x| !x.done); + let finished = !self + .objects + .iter() + .any(|ref x| !x.draw_state.as_ref().map(|s| s.finished).unwrap_or(false)); self.draw_target.apply_draw_state(ProgressDrawState { lines, orphan_lines: orphan_lines_count, @@ -1015,10 +1013,7 @@ impl MultiProgress { pub fn add(&self, pb: ProgressBar) -> ProgressBar { let mut state = self.state.write().unwrap(); let object_idx = state.objects.len(); - state.objects.push(MultiObject { - done: false, - draw_state: None, - }); + state.objects.push(MultiObject { draw_state: None }); state.ordering.push(object_idx); pb.set_draw_target(ProgressDrawTarget { kind: ProgressDrawTargetKind::Remote(object_idx, self.state.clone()), @@ -1037,10 +1032,7 @@ impl MultiProgress { pub fn insert(&self, index: usize, pb: ProgressBar) -> ProgressBar { let mut state = self.state.write().unwrap(); let object_idx = state.objects.len(); - state.objects.push(MultiObject { - done: false, - draw_state: None, - }); + state.objects.push(MultiObject { draw_state: None }); if index > state.ordering.len() { state.ordering.push(object_idx); } else { From 6c4285a9359bffe3a228b223e00e26d74a5824e6 Mon Sep 17 00:00:00 2001 From: Marien Zwart Date: Sun, 10 Jan 2021 00:37:09 +1100 Subject: [PATCH 3/3] Remove MultiObject --- src/progress.rs | 29 ++++++++++++----------------- 1 file changed, 12 insertions(+), 17 deletions(-) diff --git a/src/progress.rs b/src/progress.rs index d9a70895..b9a5aa8d 100644 --- a/src/progress.rs +++ b/src/progress.rs @@ -884,14 +884,9 @@ fn test_weak_pb() { assert!(weak.upgrade().is_none()); } -#[derive(Debug)] -struct MultiObject { - draw_state: Option, -} - #[derive(Debug)] struct MultiProgressState { - objects: Vec, + draw_states: Vec>, ordering: Vec, draw_target: ProgressDrawTarget, move_cursor: bool, @@ -919,7 +914,7 @@ impl MultiProgressState { ..draw_state }; - self.objects[idx].draw_state = Some(draw_state); + self.draw_states[idx] = Some(draw_state); // the rest from here is only drawing, we can skip it. if self.draw_target.is_hidden() { @@ -934,17 +929,17 @@ impl MultiProgressState { lines.append(&mut orphan_lines); for index in self.ordering.iter() { - let obj = &self.objects[*index]; - if let Some(ref draw_state) = obj.draw_state { + let draw_state = &self.draw_states[*index]; + if let Some(ref draw_state) = draw_state { lines.extend_from_slice(&draw_state.lines[..]); } } // !any(!done) is also true when iter() is empty, contrary to all(done) let finished = !self - .objects + .draw_states .iter() - .any(|ref x| !x.draw_state.as_ref().map(|s| s.finished).unwrap_or(false)); + .any(|ref x| !x.as_ref().map(|s| s.finished).unwrap_or(false)); self.draw_target.apply_draw_state(ProgressDrawState { lines, orphan_lines: orphan_lines_count, @@ -982,7 +977,7 @@ impl MultiProgress { pub fn with_draw_target(draw_target: ProgressDrawTarget) -> MultiProgress { MultiProgress { state: Arc::new(RwLock::new(MultiProgressState { - objects: vec![], + draw_states: vec![], ordering: vec![], draw_target, move_cursor: false, @@ -1012,8 +1007,8 @@ impl MultiProgress { /// object overriding custom `ProgressDrawTarget` settings. pub fn add(&self, pb: ProgressBar) -> ProgressBar { let mut state = self.state.write().unwrap(); - let object_idx = state.objects.len(); - state.objects.push(MultiObject { draw_state: None }); + let object_idx = state.draw_states.len(); + state.draw_states.push(None); state.ordering.push(object_idx); pb.set_draw_target(ProgressDrawTarget { kind: ProgressDrawTargetKind::Remote(object_idx, self.state.clone()), @@ -1027,12 +1022,12 @@ impl MultiProgress { /// target changed to a remote draw target that is intercepted by the /// multi progress object overriding custom `ProgressDrawTarget` settings. /// - /// If `index >= MultiProgressState::objects.len()`, the progress bar + /// If `index >= MultiProgressState::draw_states.len()`, the progress bar /// is added to the end of the list. pub fn insert(&self, index: usize, pb: ProgressBar) -> ProgressBar { let mut state = self.state.write().unwrap(); - let object_idx = state.objects.len(); - state.objects.push(MultiObject { draw_state: None }); + let object_idx = state.draw_states.len(); + state.draw_states.push(None); if index > state.ordering.len() { state.ordering.push(object_idx); } else {