Skip to content

Commit

Permalink
Added AsyncRead and AsyncWrite support for ProgressBar (#308)
Browse files Browse the repository at this point in the history
  • Loading branch information
x0f5c3 committed Sep 18, 2021
1 parent 82644f3 commit 33a411b
Show file tree
Hide file tree
Showing 3 changed files with 106 additions and 0 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Expand Up @@ -19,6 +19,7 @@ console = { version = ">=0.9.1, <1.0.0", default-features = false }
unicode-segmentation = { version = "1.6.0", optional = true }
unicode-width = { version = "0.1.7", optional = true }
rayon = { version = "1.0", optional = true }
tokio = { version = "1.0", optional = true, features = ["fs", "io-util"] }

[dev-dependencies]
rand = "0.8"
Expand Down
59 changes: 59 additions & 0 deletions src/iter.rs
Expand Up @@ -2,6 +2,13 @@ use crate::progress_bar::ProgressBar;
use std::convert::TryFrom;
use std::io::{self, IoSliceMut};
use std::iter::FusedIterator;
#[cfg(feature = "tokio")]
use std::{
pin::Pin,
task::{Context, Poll},
};
#[cfg(feature = "tokio")]
use tokio::io::{ReadBuf, SeekFrom};

/// Wraps an iterator to display its progress.
pub trait ProgressIterator
Expand Down Expand Up @@ -141,6 +148,58 @@ impl<S: io::Seek> io::Seek for ProgressBarIter<S> {
}
}

#[cfg(feature = "tokio")]
impl<W: tokio::io::AsyncWrite + Unpin> tokio::io::AsyncWrite for ProgressBarIter<W> {
fn poll_write(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<io::Result<usize>> {
Pin::new(&mut self.it).poll_write(cx, buf).map(|poll| {
poll.map(|inc| {
self.progress.inc(inc as u64);
inc
})
})
}

fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
Pin::new(&mut self.it).poll_flush(cx)
}

fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
Pin::new(&mut self.it).poll_shutdown(cx)
}
}

#[cfg(feature = "tokio")]
impl<W: tokio::io::AsyncRead + Unpin> tokio::io::AsyncRead for ProgressBarIter<W> {
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<io::Result<()>> {
let prev_len = buf.filled().len() as u64;
if let Poll::Ready(e) = Pin::new(&mut self.it).poll_read(cx, buf) {
self.progress.inc(buf.filled().len() as u64 - prev_len);
Poll::Ready(e)
} else {
Poll::Pending
}
}
}

#[cfg(feature = "tokio")]
impl<W: tokio::io::AsyncSeek + Unpin> tokio::io::AsyncSeek for ProgressBarIter<W> {
fn start_seek(mut self: Pin<&mut Self>, position: SeekFrom) -> io::Result<()> {
Pin::new(&mut self.it).start_seek(position)
}

fn poll_complete(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<u64>> {
Pin::new(&mut self.it).poll_complete(cx)
}
}

impl<W: io::Write> io::Write for ProgressBarIter<W> {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
self.it.write(buf).map(|inc| {
Expand Down
46 changes: 46 additions & 0 deletions src/progress_bar.rs
Expand Up @@ -457,6 +457,52 @@ impl ProgressBar {
}
}

#[cfg(feature = "tokio")]
/// Wraps an [`tokio::io::AsyncWrite`] with the progress bar
///
/// ```rust,no_run
/// # use tokio::fs::File;
/// # use tokio::io;
/// # use indicatif::ProgressBar;
/// # async fn test() -> io::Result<()> {
/// let mut source = File::open("work.txt").await?;
/// let mut target = File::open("done.txt").await?;
/// let pb = ProgressBar::new(source.metadata().await?.len());
/// io::copy(&mut source, &mut pb.wrap_async_write(target)).await?;
/// # Ok(())
/// # }
/// ```
pub fn wrap_async_write<W: tokio::io::AsyncWrite + Unpin>(
&self,
write: W,
) -> ProgressBarIter<W> {
ProgressBarIter {
progress: self.clone(),
it: write,
}
}
#[cfg(feature = "tokio")]
/// Wraps an [`tokio::io::AsyncRead`] with the progress bar
///
/// ```rust,no_run
/// # use tokio::fs::File;
/// # use tokio::io;
/// # use indicatif::ProgressBar;
/// # async fn test() -> io::Result<()> {
/// let mut source = File::open("work.txt").await?;
/// let mut target = File::open("done.txt").await?;
/// let pb = ProgressBar::new(source.metadata().await?.len());
/// io::copy(&mut pb.wrap_async_read(source), &mut target).await?;
/// # Ok(())
/// # }
/// ```
pub fn wrap_async_read<W: tokio::io::AsyncRead + Unpin>(&self, write: W) -> ProgressBarIter<W> {
ProgressBarIter {
progress: self.clone(),
it: write,
}
}

fn update_and_draw<F: FnOnce(&mut ProgressState)>(&self, f: F) {
// Delegate to the wrapped state.
let mut state = self.state.lock().unwrap();
Expand Down

0 comments on commit 33a411b

Please sign in to comment.