Skip to content

Commit

Permalink
feat: add futures::Stream support (#551)
Browse files Browse the repository at this point in the history
  • Loading branch information
aatifsyed committed Jun 30, 2023
1 parent 1bf2cce commit 1cbfe20
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 0 deletions.
3 changes: 3 additions & 0 deletions Cargo.toml
Expand Up @@ -14,6 +14,7 @@ rust-version = "1.62.1"

[dependencies]
console = { version = "0.15", default-features = false, features = ["ansi-parsing"] }
futures-core = { version = "0.3", default-features = false, optional = true }
number_prefix = "0.4"
portable-atomic = "1.0.0"
rayon = { version = "1.1", optional = true }
Expand All @@ -27,6 +28,7 @@ clap = { version = "4", features = ["color", "derive"] }
once_cell = "1"
rand = "0.8"
tokio = { version = "1", features = ["fs", "time", "rt"] }
futures = "0.3" # so the doctest for wrap_stream is nice

[target.'cfg(target_arch = "wasm32")'.dependencies]
instant = "0.1"
Expand All @@ -35,6 +37,7 @@ instant = "0.1"
default = ["unicode-width", "console/unicode-width"]
improved_unicode = ["unicode-segmentation", "unicode-width", "console/unicode-width"]
in_memory = ["vt100"]
futures = ["dep:futures-core"]

[package.metadata.docs.rs]
all-features = true
Expand Down
20 changes: 20 additions & 0 deletions src/iter.rs
Expand Up @@ -277,6 +277,26 @@ impl<W: tokio::io::AsyncBufRead + Unpin + tokio::io::AsyncRead> tokio::io::Async
}
}

#[cfg(feature = "futures")]
#[cfg_attr(docsrs, doc(cfg(feature = "futures")))]
impl<S: futures_core::Stream + Unpin> futures_core::Stream for ProgressBarIter<S> {
type Item = S::Item;

fn poll_next(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
let this = self.get_mut();
let item = std::pin::Pin::new(&mut this.it).poll_next(cx);
match &item {
std::task::Poll::Ready(Some(_)) => this.progress.inc(1),
std::task::Poll::Ready(None) => this.progress.finish_using_style(),
std::task::Poll::Pending => {}
}
item
}
}

impl<W: io::Write> io::Write for ProgressBarIter<W> {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
self.it.write(buf).map(|inc| {
Expand Down
22 changes: 22 additions & 0 deletions src/progress_bar.rs
Expand Up @@ -511,6 +511,28 @@ impl ProgressBar {
}
}

/// Wraps a [`futures::Stream`](https://docs.rs/futures/0.3/futures/stream/trait.StreamExt.html) with the progress bar
///
/// ```
/// # use indicatif::ProgressBar;
/// # futures::executor::block_on(async {
/// use futures::stream::{self, StreamExt};
/// let pb = ProgressBar::new(10);
/// let mut stream = pb.wrap_stream(stream::iter('a'..='z'));
///
/// assert_eq!(stream.next().await, Some('a'));
/// assert_eq!(stream.count().await, 25);
/// # }); // block_on
/// ```
#[cfg(feature = "futures")]
#[cfg_attr(docsrs, doc(cfg(feature = "futures")))]
pub fn wrap_stream<S: futures_core::Stream>(&self, stream: S) -> ProgressBarIter<S> {
ProgressBarIter {
progress: self.clone(),
it: stream,
}
}

/// Returns the current position
pub fn position(&self) -> u64 {
self.state().state.pos()
Expand Down

0 comments on commit 1cbfe20

Please sign in to comment.