diff --git a/tokio/src/fs/read_dir.rs b/tokio/src/fs/read_dir.rs index 219a0a305be..7b21c9ccec0 100644 --- a/tokio/src/fs/read_dir.rs +++ b/tokio/src/fs/read_dir.rs @@ -52,7 +52,25 @@ impl ReadDir { poll_fn(|cx| self.poll_next_entry(cx)).await } - fn poll_next_entry(&mut self, cx: &mut Context<'_>) -> Poll>> { + /// Polls for the next directory entry in the stream. + /// + /// This method returns: + /// + /// * `Poll::Pending` if the next directory entry is not yet available. + /// * `Poll::Ready(Ok(Some(entry)))` if the next directory entry is available. + /// * `Poll::Ready(Ok(None))` if there are no more directory entries in this + /// stream. + /// * `Poll::Ready(Err(err))` if an IO error occurred while reading the next + /// directory entry. + /// + /// When the method returns `Poll::Pending`, the `Waker` in the provided + /// `Context` is scheduled to receive a wakeup when the next directory entry + /// becomes available on the underlying IO resource. + /// + /// Note that on multiple calls to `poll_next_entry`, only the `Waker` from + /// the `Context` passed to the most recent call is scheduled to receive a + /// wakeup. + pub fn poll_next_entry(&mut self, cx: &mut Context<'_>) -> Poll>> { loop { match self.0 { State::Idle(ref mut std) => { diff --git a/tokio/src/io/util/lines.rs b/tokio/src/io/util/lines.rs index 5ce249c0ed8..25df78e99f1 100644 --- a/tokio/src/io/util/lines.rs +++ b/tokio/src/io/util/lines.rs @@ -83,7 +83,23 @@ impl Lines where R: AsyncBufRead, { - fn poll_next_line( + /// Polls for the next line in the stream. + /// + /// This method returns: + /// + /// * `Poll::Pending` if the next line is not yet available. + /// * `Poll::Ready(Ok(Some(line)))` if the next line is available. + /// * `Poll::Ready(Ok(None))` if there are no more lines in this stream. + /// * `Poll::Ready(Err(err))` if an IO error occurred while reading the next line. + /// + /// When the method returns `Poll::Pending`, the `Waker` in the provided + /// `Context` is scheduled to receive a wakeup when more bytes become + /// available on the underlying IO resource. + /// + /// Note that on multiple calls to `poll_next_line`, only the `Waker` from + /// the `Context` passed to the most recent call is scheduled to receive a + /// wakeup. + pub fn poll_next_line( self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll>> { diff --git a/tokio/src/io/util/split.rs b/tokio/src/io/util/split.rs index 75115aa08b9..eb828659e08 100644 --- a/tokio/src/io/util/split.rs +++ b/tokio/src/io/util/split.rs @@ -65,7 +65,24 @@ impl Split where R: AsyncBufRead, { - fn poll_next_segment( + /// Polls for the next segment in the stream. + /// + /// This method returns: + /// + /// * `Poll::Pending` if the next segment is not yet available. + /// * `Poll::Ready(Ok(Some(segment)))` if the next segment is available. + /// * `Poll::Ready(Ok(None))` if there are no more segments in this stream. + /// * `Poll::Ready(Err(err))` if an IO error occurred while reading the + /// next segment. + /// + /// When the method returns `Poll::Pending`, the `Waker` in the provided + /// `Context` is scheduled to receive a wakeup when more bytes become + /// available on the underlying IO resource. + /// + /// Note that on multiple calls to `poll_next_segment`, only the `Waker` + /// from the `Context` passed to the most recent call is scheduled to + /// receive a wakeup. + pub fn poll_next_segment( self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll>>> { diff --git a/tokio/src/lib.rs b/tokio/src/lib.rs index 0cc004dd094..946006bb5d1 100644 --- a/tokio/src/lib.rs +++ b/tokio/src/lib.rs @@ -409,7 +409,7 @@ mod util; /// release, most of the Tokio stream utilities have been moved into the [`tokio-stream`] /// crate. /// -/// # Why was `Stream` no included in Tokio 1.0? +/// # Why was `Stream` not included in Tokio 1.0? /// /// Originally, we had planned to ship Tokio 1.0 with a stable `Stream` type /// but unfortunetly the [RFC] had not been merged in time for `Stream` to @@ -424,6 +424,7 @@ mod util; /// to create a `impl Stream` from `async fn` using the [`async-stream`] crate. /// /// [`tokio-stream`]: https://docs.rs/tokio-stream +/// [`async-stream`]: https://docs.rs/async-stream /// [RFC]: https://github.com/rust-lang/rfcs/pull/2996 /// /// # Example diff --git a/tokio/src/net/tcp/listener.rs b/tokio/src/net/tcp/listener.rs index 9ac1b3e83db..a2a8637ecff 100644 --- a/tokio/src/net/tcp/listener.rs +++ b/tokio/src/net/tcp/listener.rs @@ -155,11 +155,9 @@ impl TcpListener { /// Polls to accept a new incoming connection to this listener. /// /// If there is no connection to accept, `Poll::Pending` is returned and the - /// current task will be notified by a waker. - /// - /// When ready, the most recent task that called `poll_accept` is notified. - /// The caller is responsible to ensure that `poll_accept` is called from a - /// single task. Failing to do this could result in tasks hanging. + /// current task will be notified by a waker. Note that on multiple calls + /// to `poll_accept`, only the `Waker` from the `Context` passed to the most + /// recent call is scheduled to receive a wakeup. pub fn poll_accept(&self, cx: &mut Context<'_>) -> Poll> { loop { let ev = ready!(self.io.registration().poll_read_ready(cx))?; diff --git a/tokio/src/net/unix/listener.rs b/tokio/src/net/unix/listener.rs index 7d23a2e3149..9ed4ce175b0 100644 --- a/tokio/src/net/unix/listener.rs +++ b/tokio/src/net/unix/listener.rs @@ -109,12 +109,10 @@ impl UnixListener { /// Polls to accept a new incoming connection to this listener. /// - /// If there is no connection to accept, `Poll::Pending` is returned and - /// the current task will be notified by a waker. - /// - /// When ready, the most recent task that called `poll_accept` is notified. - /// The caller is responsible to ensure that `poll_accept` is called from a - /// single task. Failing to do this could result in tasks hanging. + /// If there is no connection to accept, `Poll::Pending` is returned and the + /// current task will be notified by a waker. Note that on multiple calls + /// to `poll_accept`, only the `Waker` from the `Context` passed to the most + /// recent call is scheduled to receive a wakeup. pub fn poll_accept(&self, cx: &mut Context<'_>) -> Poll> { let (sock, addr) = ready!(self.io.registration().poll_read_io(cx, || self.io.accept()))?; let addr = SocketAddr(addr); diff --git a/tokio/src/sync/mpsc/bounded.rs b/tokio/src/sync/mpsc/bounded.rs index dbe4559330a..2dae7e26fe4 100644 --- a/tokio/src/sync/mpsc/bounded.rs +++ b/tokio/src/sync/mpsc/bounded.rs @@ -11,7 +11,6 @@ cfg_time! { } use std::fmt; -#[cfg(any(feature = "signal", feature = "process"))] use std::task::{Context, Poll}; /// Send values to the associated `Receiver`. @@ -147,11 +146,6 @@ impl Receiver { poll_fn(|cx| self.chan.recv(cx)).await } - #[cfg(any(feature = "signal", feature = "process"))] - pub(crate) fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll> { - self.chan.recv(cx) - } - /// Blocking receive to call outside of asynchronous contexts. /// /// # Panics @@ -243,6 +237,25 @@ impl Receiver { pub fn close(&mut self) { self.chan.close(); } + + /// Polls to receive the next message on this channel. + /// + /// This method returns: + /// + /// * `Poll::Pending` if no messages are available but the channel is not + /// closed. + /// * `Poll::Ready(Some(message))` if a message is available. + /// * `Poll::Ready(None)` if the channel has been closed and all messages + /// sent before it was closed have been received. + /// + /// When the method returns `Poll::Pending`, the `Waker` in the provided + /// `Context` is scheduled to receive a wakeup when a message is sent on any + /// receiver, or when the channel is closed. Note that on multiple calls to + /// `poll_recv`, only the `Waker` from the `Context` passed to the most + /// recent call is scheduled to receive a wakeup. + pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll> { + self.chan.recv(cx) + } } impl fmt::Debug for Receiver { diff --git a/tokio/src/sync/mpsc/unbounded.rs b/tokio/src/sync/mpsc/unbounded.rs index e8e3faccb30..38953b8f978 100644 --- a/tokio/src/sync/mpsc/unbounded.rs +++ b/tokio/src/sync/mpsc/unbounded.rs @@ -73,10 +73,6 @@ impl UnboundedReceiver { UnboundedReceiver { chan } } - fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll> { - self.chan.recv(cx) - } - /// Receives the next value for this receiver. /// /// `None` is returned when all `Sender` halves have dropped, indicating @@ -159,6 +155,25 @@ impl UnboundedReceiver { pub fn close(&mut self) { self.chan.close(); } + + /// Polls to receive the next message on this channel. + /// + /// This method returns: + /// + /// * `Poll::Pending` if no messages are available but the channel is not + /// closed. + /// * `Poll::Ready(Some(message))` if a message is available. + /// * `Poll::Ready(None)` if the channel has been closed and all messages + /// sent before it was closed have been received. + /// + /// When the method returns `Poll::Pending`, the `Waker` in the provided + /// `Context` is scheduled to receive a wakeup when a message is sent on any + /// receiver, or when the channel is closed. Note that on multiple calls to + /// `poll_recv`, only the `Waker` from the `Context` passed to the most + /// recent call is scheduled to receive a wakeup. + pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll> { + self.chan.recv(cx) + } } impl UnboundedSender { diff --git a/tokio/tests/support/mpsc_stream.rs b/tokio/tests/support/mpsc_stream.rs index 3df541ff75c..aa385a39dcf 100644 --- a/tokio/tests/support/mpsc_stream.rs +++ b/tokio/tests/support/mpsc_stream.rs @@ -1,29 +1,42 @@ #![allow(dead_code)] -use async_stream::stream; -use tokio::sync::mpsc::{self, Sender, UnboundedSender}; +use std::pin::Pin; +use std::task::{Context, Poll}; +use tokio::sync::mpsc::{self, Receiver, Sender, UnboundedReceiver, UnboundedSender}; use tokio_stream::Stream; +struct UnboundedStream { + recv: UnboundedReceiver, +} +impl Stream for UnboundedStream { + type Item = T; + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Pin::into_inner(self).recv.poll_recv(cx) + } +} + pub fn unbounded_channel_stream() -> (UnboundedSender, impl Stream) { - let (tx, mut rx) = mpsc::unbounded_channel(); + let (tx, rx) = mpsc::unbounded_channel(); - let stream = stream! { - while let Some(item) = rx.recv().await { - yield item; - } - }; + let stream = UnboundedStream { recv: rx }; (tx, stream) } +struct BoundedStream { + recv: Receiver, +} +impl Stream for BoundedStream { + type Item = T; + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Pin::into_inner(self).recv.poll_recv(cx) + } +} + pub fn channel_stream(size: usize) -> (Sender, impl Stream) { - let (tx, mut rx) = mpsc::channel(size); + let (tx, rx) = mpsc::channel(size); - let stream = stream! { - while let Some(item) = rx.recv().await { - yield item; - } - }; + let stream = BoundedStream { recv: rx }; (tx, stream) }