Skip to content

Commit

Permalink
stream: update features and doc for broadcast/watch stream (#3504)
Browse files Browse the repository at this point in the history
  • Loading branch information
Darksonn committed Feb 5, 2021
1 parent fcb6d04 commit 0a04954
Show file tree
Hide file tree
Showing 7 changed files with 37 additions and 6 deletions.
3 changes: 2 additions & 1 deletion tokio-stream/Cargo.toml
Expand Up @@ -25,12 +25,13 @@ time = ["tokio/time"]
net = ["tokio/net"]
io-util = ["tokio/io-util"]
fs = ["tokio/fs"]
sync = ["tokio/sync", "tokio-util"]

[dependencies]
futures-core = { version = "0.3.0" }
pin-project-lite = "0.2.0"
tokio = { version = "1.0", features = ["sync"] }
tokio-util = { version = "0.6.3" }
tokio-util = { version = "0.6.3", optional = true }

[dev-dependencies]
tokio = { version = "1.0", features = ["full", "test-util"] }
Expand Down
10 changes: 10 additions & 0 deletions tokio-stream/src/macros.rs
Expand Up @@ -38,6 +38,16 @@ macro_rules! cfg_time {
}
}

macro_rules! cfg_sync {
($($item:item)*) => {
$(
#[cfg(feature = "sync")]
#[cfg_attr(docsrs, doc(cfg(feature = "sync")))]
$item
)*
}
}

macro_rules! ready {
($e:expr $(,)?) => {
match $e {
Expand Down
18 changes: 13 additions & 5 deletions tokio-stream/src/wrappers.rs
@@ -1,17 +1,25 @@
//! Wrappers for Tokio types that implement `Stream`.

/// Error types for the wrappers.
pub mod errors {
cfg_sync! {
pub use crate::wrappers::broadcast::BroadcastStreamRecvError;
}
}

mod mpsc_bounded;
pub use mpsc_bounded::ReceiverStream;

mod mpsc_unbounded;
pub use mpsc_unbounded::UnboundedReceiverStream;

mod broadcast;
pub use broadcast::BroadcastStream;
pub use broadcast::BroadcastStreamRecvError;
cfg_sync! {
mod broadcast;
pub use broadcast::BroadcastStream;

mod watch;
pub use watch::WatchStream;
mod watch;
pub use watch::WatchStream;
}

cfg_time! {
mod interval;
Expand Down
1 change: 1 addition & 0 deletions tokio-stream/src/wrappers/broadcast.rs
Expand Up @@ -12,6 +12,7 @@ use std::task::{Context, Poll};
///
/// [`tokio::sync::broadcast::Receiver`]: struct@tokio::sync::broadcast::Receiver
/// [`Stream`]: trait@crate::Stream
#[cfg_attr(docsrs, doc(cfg(feature = "sync")))]
pub struct BroadcastStream<T> {
inner: ReusableBoxFuture<(Result<T, RecvError>, Receiver<T>)>,
}
Expand Down
1 change: 1 addition & 0 deletions tokio-stream/src/wrappers/watch.rs
Expand Up @@ -12,6 +12,7 @@ use tokio::sync::watch::error::RecvError;
///
/// [`tokio::sync::watch::Receiver`]: struct@tokio::sync::watch::Receiver
/// [`Stream`]: trait@crate::Stream
#[cfg_attr(docsrs, doc(cfg(feature = "sync")))]
pub struct WatchStream<T> {
inner: ReusableBoxFuture<(Result<(), RecvError>, Receiver<T>)>,
}
Expand Down
5 changes: 5 additions & 0 deletions tokio/src/sync/broadcast.rs
Expand Up @@ -163,6 +163,11 @@ pub struct Sender<T> {
/// Must not be used concurrently. Messages may be retrieved using
/// [`recv`][Receiver::recv].
///
/// To turn this receiver into a `Stream`, you can use the [`BroadcastStream`]
/// wrapper.
///
/// [`BroadcastStream`]: https://docs.rs/tokio-stream/0.1/tokio_stream/wrappers/struct.BroadcastStream.html
///
/// # Examples
///
/// ```
Expand Down
5 changes: 5 additions & 0 deletions tokio/src/sync/watch.rs
Expand Up @@ -61,6 +61,11 @@ use std::ops;
/// Receives values from the associated [`Sender`](struct@Sender).
///
/// Instances are created by the [`channel`](fn@channel) function.
///
/// To turn this receiver into a `Stream`, you can use the [`WatchStream`]
/// wrapper.
///
/// [`WatchStream`]: https://docs.rs/tokio-stream/0.1/tokio_stream/wrappers/struct.WatchStream.html
#[derive(Debug)]
pub struct Receiver<T> {
/// Pointer to the shared state
Expand Down

0 comments on commit 0a04954

Please sign in to comment.