From 437fe4837d17847872cd1029d9ddf4f0333e1876 Mon Sep 17 00:00:00 2001 From: x0f5c3 Date: Sat, 11 Sep 2021 13:54:50 +0200 Subject: [PATCH 1/3] Added tokio as optional dependency, wrap_async_read and wrap_async_write methods for ProgressBar and implemented AsyncRead, AsyncWrite and AsyncSeek for ProgressBarIter --- Cargo.toml | 1 + src/iter.rs | 52 ++++++++++++++++++++++++++++++++++++++++++++- src/progress_bar.rs | 43 +++++++++++++++++++++++++++++++++++++ 3 files changed, 95 insertions(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index 040ad970..93550b42 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,6 +19,7 @@ console = { version = ">=0.9.1, <1.0.0", default-features = false } unicode-segmentation = { version = "1.6.0", optional = true } unicode-width = { version = "0.1.7", optional = true } rayon = { version = "1.0", optional = true } +tokio = { version = "1.0", optional = true, features = ["fs", "io-util"] } [dev-dependencies] rand = "0.8" diff --git a/src/iter.rs b/src/iter.rs index cc038779..b178bd82 100644 --- a/src/iter.rs +++ b/src/iter.rs @@ -1,7 +1,11 @@ use crate::progress_bar::ProgressBar; use std::convert::TryFrom; -use std::io::{self, IoSliceMut}; +use std::io::{self, IoSliceMut, Error, SeekFrom}; use std::iter::FusedIterator; +use std::pin::Pin; +use std::task::{Context, Poll}; +#[cfg(feature = "tokio")] +use tokio::io::ReadBuf; /// Wraps an iterator to display its progress. pub trait ProgressIterator @@ -136,6 +140,52 @@ impl io::Seek for ProgressBarIter { } } +#[cfg(feature = "tokio")] +impl tokio::io::AsyncWrite for ProgressBarIter { + fn poll_write(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll> { + Pin::new(&mut self.it).poll_write(cx, buf).map(|poll| { + poll.map(|inc| { + self.progress.inc(inc as u64); + inc + }) + }) + } + + fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Pin::new(&mut self.it).poll_flush(cx) + } + + fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Pin::new(&mut self.it).poll_shutdown(cx) + } +} + + +#[cfg(feature = "tokio")] +impl tokio::io::AsyncRead for ProgressBarIter { + fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>) -> Poll> { + let prev_len = buf.filled().len() as u64; + if let Poll::Ready(e) = Pin::new(&mut self.it).poll_read(cx, buf) { + self.progress.inc(buf.filled().len() as u64 - prev_len); + Poll::Ready(e) + } + else { + Poll::Pending + } + } +} + +#[cfg(feature = "tokio")] +impl tokio::io::AsyncSeek for ProgressBarIter { + fn start_seek(mut self: Pin<&mut Self>, position: SeekFrom) -> io::Result<()> { + Pin::new(&mut self.it).start_seek(position) + } + + fn poll_complete(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Pin::new(&mut self.it).poll_complete(cx) + } +} + impl io::Write for ProgressBarIter { fn write(&mut self, buf: &[u8]) -> io::Result { self.it.write(buf).map(|inc| { diff --git a/src/progress_bar.rs b/src/progress_bar.rs index 34e82ff3..80f5b577 100644 --- a/src/progress_bar.rs +++ b/src/progress_bar.rs @@ -457,6 +457,49 @@ impl ProgressBar { } } + #[cfg(feature = "tokio")] + /// Wraps an [`tokio::io::AsyncWrite`] with the progress bar + /// + /// ```rust,no_run + /// # use tokio::fs::File; + /// # use tokio::io; + /// # use indicatif::ProgressBar; + /// # async fn test() -> io::Result<()> { + /// let mut source = File::open("work.txt").await?; + /// let mut target = File::open("done.txt").await?; + /// let pb = ProgressBar::new(source.metadata().await?.len()); + /// io::copy(&mut source, &mut pb.wrap_async_write(target)).await?; + /// # Ok(()) + /// # } + /// ``` + pub fn wrap_async_write(&self, write: W) -> ProgressBarIter { + ProgressBarIter { + progress: self.clone(), + it: write, + } + } + #[cfg(feature = "tokio")] + /// Wraps an [`tokio::io::AsyncRead`] with the progress bar + /// + /// ```rust,no_run + /// # use tokio::fs::File; + /// # use tokio::io; + /// # use indicatif::ProgressBar; + /// # async fn test() -> io::Result<()> { + /// let mut source = File::open("work.txt").await?; + /// let mut target = File::open("done.txt").await?; + /// let pb = ProgressBar::new(source.metadata().await?.len()); + /// io::copy(&mut pb.wrap_async_read(source), &mut target).await?; + /// # Ok(()) + /// # } + /// ``` + pub fn wrap_async_read(&self, write: W) -> ProgressBarIter { + ProgressBarIter { + progress: self.clone(), + it: write, + } + } + fn update_and_draw(&self, f: F) { // Delegate to the wrapped state. let mut state = self.state.lock().unwrap(); From aa48350603b902c5686b2c6a15aa6a232e40198a Mon Sep 17 00:00:00 2001 From: x0f5c3 Date: Sat, 18 Sep 2021 11:54:58 +0200 Subject: [PATCH 2/3] Fix clippy warnings by moving async related uses under a feature condition --- src/iter.rs | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/src/iter.rs b/src/iter.rs index b178bd82..9f2740f7 100644 --- a/src/iter.rs +++ b/src/iter.rs @@ -1,11 +1,11 @@ use crate::progress_bar::ProgressBar; use std::convert::TryFrom; -use std::io::{self, IoSliceMut, Error, SeekFrom}; +use std::io::{self, IoSliceMut}; use std::iter::FusedIterator; -use std::pin::Pin; -use std::task::{Context, Poll}; #[cfg(feature = "tokio")] -use tokio::io::ReadBuf; +use tokio::io::{ReadBuf, SeekFrom}; +#[cfg(feature = "tokio")] +use std::{pin::Pin, task::{Context, Poll}}; /// Wraps an iterator to display its progress. pub trait ProgressIterator @@ -142,7 +142,7 @@ impl io::Seek for ProgressBarIter { #[cfg(feature = "tokio")] impl tokio::io::AsyncWrite for ProgressBarIter { - fn poll_write(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll> { + fn poll_write(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll> { Pin::new(&mut self.it).poll_write(cx, buf).map(|poll| { poll.map(|inc| { self.progress.inc(inc as u64); @@ -151,11 +151,11 @@ impl tokio::io::AsyncWrite for ProgressBarIte }) } - fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { Pin::new(&mut self.it).poll_flush(cx) } - fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { Pin::new(&mut self.it).poll_shutdown(cx) } } From 85d4abda443e6cc813e5f5cd69e70166a4734d96 Mon Sep 17 00:00:00 2001 From: x0f5c3 Date: Sat, 18 Sep 2021 14:04:31 +0200 Subject: [PATCH 3/3] Fixed the formatting --- src/iter.rs | 33 +++++++++++++++++++++------------ src/progress_bar.rs | 5 ++++- 2 files changed, 25 insertions(+), 13 deletions(-) diff --git a/src/iter.rs b/src/iter.rs index 9f2740f7..a46cca30 100644 --- a/src/iter.rs +++ b/src/iter.rs @@ -3,9 +3,12 @@ use std::convert::TryFrom; use std::io::{self, IoSliceMut}; use std::iter::FusedIterator; #[cfg(feature = "tokio")] -use tokio::io::{ReadBuf, SeekFrom}; +use std::{ + pin::Pin, + task::{Context, Poll}, +}; #[cfg(feature = "tokio")] -use std::{pin::Pin, task::{Context, Poll}}; +use tokio::io::{ReadBuf, SeekFrom}; /// Wraps an iterator to display its progress. pub trait ProgressIterator @@ -141,8 +144,12 @@ impl io::Seek for ProgressBarIter { } #[cfg(feature = "tokio")] -impl tokio::io::AsyncWrite for ProgressBarIter { - fn poll_write(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll> { +impl tokio::io::AsyncWrite for ProgressBarIter { + fn poll_write( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { Pin::new(&mut self.it).poll_write(cx, buf).map(|poll| { poll.map(|inc| { self.progress.inc(inc as u64); @@ -160,23 +167,25 @@ impl tokio::io::AsyncWrite for ProgressBarIte } } - #[cfg(feature = "tokio")] -impl tokio::io::AsyncRead for ProgressBarIter { - fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>) -> Poll> { +impl tokio::io::AsyncRead for ProgressBarIter { + fn poll_read( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut ReadBuf<'_>, + ) -> Poll> { let prev_len = buf.filled().len() as u64; if let Poll::Ready(e) = Pin::new(&mut self.it).poll_read(cx, buf) { - self.progress.inc(buf.filled().len() as u64 - prev_len); - Poll::Ready(e) - } - else { + self.progress.inc(buf.filled().len() as u64 - prev_len); + Poll::Ready(e) + } else { Poll::Pending } } } #[cfg(feature = "tokio")] -impl tokio::io::AsyncSeek for ProgressBarIter { +impl tokio::io::AsyncSeek for ProgressBarIter { fn start_seek(mut self: Pin<&mut Self>, position: SeekFrom) -> io::Result<()> { Pin::new(&mut self.it).start_seek(position) } diff --git a/src/progress_bar.rs b/src/progress_bar.rs index 80f5b577..c8eca743 100644 --- a/src/progress_bar.rs +++ b/src/progress_bar.rs @@ -472,7 +472,10 @@ impl ProgressBar { /// # Ok(()) /// # } /// ``` - pub fn wrap_async_write(&self, write: W) -> ProgressBarIter { + pub fn wrap_async_write( + &self, + write: W, + ) -> ProgressBarIter { ProgressBarIter { progress: self.clone(), it: write,