From fb50213352e563a31dff8d33632c8824242627c0 Mon Sep 17 00:00:00 2001 From: Alice Ryhl Date: Tue, 22 Dec 2020 11:18:09 +0100 Subject: [PATCH 1/7] Make Lines and Split poll_next public --- tokio/src/io/util/lines.rs | 18 +++++++++++++++++- tokio/src/io/util/split.rs | 19 ++++++++++++++++++- 2 files changed, 35 insertions(+), 2 deletions(-) diff --git a/tokio/src/io/util/lines.rs b/tokio/src/io/util/lines.rs index 5ce249c0ed8..25df78e99f1 100644 --- a/tokio/src/io/util/lines.rs +++ b/tokio/src/io/util/lines.rs @@ -83,7 +83,23 @@ impl Lines where R: AsyncBufRead, { - fn poll_next_line( + /// Polls for the next line in the stream. + /// + /// This method returns: + /// + /// * `Poll::Pending` if the next line is not yet available. + /// * `Poll::Ready(Ok(Some(line)))` if the next line is available. + /// * `Poll::Ready(Ok(None))` if there are no more lines in this stream. + /// * `Poll::Ready(Err(err))` if an IO error occurred while reading the next line. + /// + /// When the method returns `Poll::Pending`, the `Waker` in the provided + /// `Context` is scheduled to receive a wakeup when more bytes become + /// available on the underlying IO resource. + /// + /// Note that on multiple calls to `poll_next_line`, only the `Waker` from + /// the `Context` passed to the most recent call is scheduled to receive a + /// wakeup. + pub fn poll_next_line( self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll>> { diff --git a/tokio/src/io/util/split.rs b/tokio/src/io/util/split.rs index 75115aa08b9..eb828659e08 100644 --- a/tokio/src/io/util/split.rs +++ b/tokio/src/io/util/split.rs @@ -65,7 +65,24 @@ impl Split where R: AsyncBufRead, { - fn poll_next_segment( + /// Polls for the next segment in the stream. + /// + /// This method returns: + /// + /// * `Poll::Pending` if the next segment is not yet available. + /// * `Poll::Ready(Ok(Some(segment)))` if the next segment is available. + /// * `Poll::Ready(Ok(None))` if there are no more segments in this stream. + /// * `Poll::Ready(Err(err))` if an IO error occurred while reading the + /// next segment. + /// + /// When the method returns `Poll::Pending`, the `Waker` in the provided + /// `Context` is scheduled to receive a wakeup when more bytes become + /// available on the underlying IO resource. + /// + /// Note that on multiple calls to `poll_next_segment`, only the `Waker` + /// from the `Context` passed to the most recent call is scheduled to + /// receive a wakeup. + pub fn poll_next_segment( self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll>>> { From 763ab7dbede5c889d4f117247ab7e5e296dc5ab6 Mon Sep 17 00:00:00 2001 From: Alice Ryhl Date: Tue, 22 Dec 2020 11:18:22 +0100 Subject: [PATCH 2/7] Update poll_accept doc --- tokio/src/net/tcp/listener.rs | 8 +++----- tokio/src/net/unix/listener.rs | 10 ++++------ 2 files changed, 7 insertions(+), 11 deletions(-) diff --git a/tokio/src/net/tcp/listener.rs b/tokio/src/net/tcp/listener.rs index 9ac1b3e83db..a2a8637ecff 100644 --- a/tokio/src/net/tcp/listener.rs +++ b/tokio/src/net/tcp/listener.rs @@ -155,11 +155,9 @@ impl TcpListener { /// Polls to accept a new incoming connection to this listener. /// /// If there is no connection to accept, `Poll::Pending` is returned and the - /// current task will be notified by a waker. - /// - /// When ready, the most recent task that called `poll_accept` is notified. - /// The caller is responsible to ensure that `poll_accept` is called from a - /// single task. Failing to do this could result in tasks hanging. + /// current task will be notified by a waker. Note that on multiple calls + /// to `poll_accept`, only the `Waker` from the `Context` passed to the most + /// recent call is scheduled to receive a wakeup. pub fn poll_accept(&self, cx: &mut Context<'_>) -> Poll> { loop { let ev = ready!(self.io.registration().poll_read_ready(cx))?; diff --git a/tokio/src/net/unix/listener.rs b/tokio/src/net/unix/listener.rs index 7d23a2e3149..9ed4ce175b0 100644 --- a/tokio/src/net/unix/listener.rs +++ b/tokio/src/net/unix/listener.rs @@ -109,12 +109,10 @@ impl UnixListener { /// Polls to accept a new incoming connection to this listener. /// - /// If there is no connection to accept, `Poll::Pending` is returned and - /// the current task will be notified by a waker. - /// - /// When ready, the most recent task that called `poll_accept` is notified. - /// The caller is responsible to ensure that `poll_accept` is called from a - /// single task. Failing to do this could result in tasks hanging. + /// If there is no connection to accept, `Poll::Pending` is returned and the + /// current task will be notified by a waker. Note that on multiple calls + /// to `poll_accept`, only the `Waker` from the `Context` passed to the most + /// recent call is scheduled to receive a wakeup. pub fn poll_accept(&self, cx: &mut Context<'_>) -> Poll> { let (sock, addr) = ready!(self.io.registration().poll_read_io(cx, || self.io.accept()))?; let addr = SocketAddr(addr); From bd2068577a70d5958648e09c08ad0be16f4c232e Mon Sep 17 00:00:00 2001 From: Alice Ryhl Date: Tue, 22 Dec 2020 11:32:40 +0100 Subject: [PATCH 3/7] Expose poll_recv on mpsc --- tokio/src/sync/mpsc/bounded.rs | 24 +++++++++++++++++++----- tokio/src/sync/mpsc/unbounded.rs | 23 +++++++++++++++++++---- 2 files changed, 38 insertions(+), 9 deletions(-) diff --git a/tokio/src/sync/mpsc/bounded.rs b/tokio/src/sync/mpsc/bounded.rs index dbe4559330a..09709b05e73 100644 --- a/tokio/src/sync/mpsc/bounded.rs +++ b/tokio/src/sync/mpsc/bounded.rs @@ -147,11 +147,6 @@ impl Receiver { poll_fn(|cx| self.chan.recv(cx)).await } - #[cfg(any(feature = "signal", feature = "process"))] - pub(crate) fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll> { - self.chan.recv(cx) - } - /// Blocking receive to call outside of asynchronous contexts. /// /// # Panics @@ -243,6 +238,25 @@ impl Receiver { pub fn close(&mut self) { self.chan.close(); } + + /// Polls to receive the next message on this channel. + /// + /// This method returns: + /// + /// * `Poll::Pending` if no messages are available but the channel is not + /// closed. + /// * `Poll::Ready(Some(message))` if a message is available. + /// * `Poll::Ready(None)` if the channel has been closed and all messages + /// sent before it was closed have been received. + /// + /// When the method returns `Poll::Pending`, the `Waker` in the provided + /// `Context` is scheduled to receive a wakeup when a message is sent on any + /// receiver, or when the channel is closed. Note that on multiple calls to + /// `poll_recv`, only the `Waker` from the `Context` passed to the most + /// recent call is scheduled to receive a wakeup. + pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll> { + self.chan.recv(cx) + } } impl fmt::Debug for Receiver { diff --git a/tokio/src/sync/mpsc/unbounded.rs b/tokio/src/sync/mpsc/unbounded.rs index e8e3faccb30..38953b8f978 100644 --- a/tokio/src/sync/mpsc/unbounded.rs +++ b/tokio/src/sync/mpsc/unbounded.rs @@ -73,10 +73,6 @@ impl UnboundedReceiver { UnboundedReceiver { chan } } - fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll> { - self.chan.recv(cx) - } - /// Receives the next value for this receiver. /// /// `None` is returned when all `Sender` halves have dropped, indicating @@ -159,6 +155,25 @@ impl UnboundedReceiver { pub fn close(&mut self) { self.chan.close(); } + + /// Polls to receive the next message on this channel. + /// + /// This method returns: + /// + /// * `Poll::Pending` if no messages are available but the channel is not + /// closed. + /// * `Poll::Ready(Some(message))` if a message is available. + /// * `Poll::Ready(None)` if the channel has been closed and all messages + /// sent before it was closed have been received. + /// + /// When the method returns `Poll::Pending`, the `Waker` in the provided + /// `Context` is scheduled to receive a wakeup when a message is sent on any + /// receiver, or when the channel is closed. Note that on multiple calls to + /// `poll_recv`, only the `Waker` from the `Context` passed to the most + /// recent call is scheduled to receive a wakeup. + pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll> { + self.chan.recv(cx) + } } impl UnboundedSender { From 0f1c3fc0085998c3d8b147de6d6a627762b8a602 Mon Sep 17 00:00:00 2001 From: Alice Ryhl Date: Tue, 22 Dec 2020 11:47:31 +0100 Subject: [PATCH 4/7] Fix imports in mpsc --- tokio/src/sync/mpsc/bounded.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/tokio/src/sync/mpsc/bounded.rs b/tokio/src/sync/mpsc/bounded.rs index 09709b05e73..2dae7e26fe4 100644 --- a/tokio/src/sync/mpsc/bounded.rs +++ b/tokio/src/sync/mpsc/bounded.rs @@ -11,7 +11,6 @@ cfg_time! { } use std::fmt; -#[cfg(any(feature = "signal", feature = "process"))] use std::task::{Context, Poll}; /// Send values to the associated `Receiver`. From fbbb0f9f1c2731dd22bab4f2e84424ba6a826813 Mon Sep 17 00:00:00 2001 From: Alice Ryhl Date: Tue, 22 Dec 2020 11:48:55 +0100 Subject: [PATCH 5/7] Expose DirEntry::poll_next_entry --- tokio/src/fs/read_dir.rs | 20 +++++++++++++++++++- 1 file changed, 19 insertions(+), 1 deletion(-) diff --git a/tokio/src/fs/read_dir.rs b/tokio/src/fs/read_dir.rs index 219a0a305be..7b21c9ccec0 100644 --- a/tokio/src/fs/read_dir.rs +++ b/tokio/src/fs/read_dir.rs @@ -52,7 +52,25 @@ impl ReadDir { poll_fn(|cx| self.poll_next_entry(cx)).await } - fn poll_next_entry(&mut self, cx: &mut Context<'_>) -> Poll>> { + /// Polls for the next directory entry in the stream. + /// + /// This method returns: + /// + /// * `Poll::Pending` if the next directory entry is not yet available. + /// * `Poll::Ready(Ok(Some(entry)))` if the next directory entry is available. + /// * `Poll::Ready(Ok(None))` if there are no more directory entries in this + /// stream. + /// * `Poll::Ready(Err(err))` if an IO error occurred while reading the next + /// directory entry. + /// + /// When the method returns `Poll::Pending`, the `Waker` in the provided + /// `Context` is scheduled to receive a wakeup when the next directory entry + /// becomes available on the underlying IO resource. + /// + /// Note that on multiple calls to `poll_next_entry`, only the `Waker` from + /// the `Context` passed to the most recent call is scheduled to receive a + /// wakeup. + pub fn poll_next_entry(&mut self, cx: &mut Context<'_>) -> Poll>> { loop { match self.0 { State::Idle(ref mut std) => { From 7e072a0c9846c9482df32f361b9a75117a72e469 Mon Sep 17 00:00:00 2001 From: Alice Ryhl Date: Tue, 22 Dec 2020 12:00:18 +0100 Subject: [PATCH 6/7] Use mpsc' poll_recv in tests --- tokio/tests/support/mpsc_stream.rs | 41 ++++++++++++++++++++---------- 1 file changed, 27 insertions(+), 14 deletions(-) diff --git a/tokio/tests/support/mpsc_stream.rs b/tokio/tests/support/mpsc_stream.rs index 3df541ff75c..aa385a39dcf 100644 --- a/tokio/tests/support/mpsc_stream.rs +++ b/tokio/tests/support/mpsc_stream.rs @@ -1,29 +1,42 @@ #![allow(dead_code)] -use async_stream::stream; -use tokio::sync::mpsc::{self, Sender, UnboundedSender}; +use std::pin::Pin; +use std::task::{Context, Poll}; +use tokio::sync::mpsc::{self, Receiver, Sender, UnboundedReceiver, UnboundedSender}; use tokio_stream::Stream; +struct UnboundedStream { + recv: UnboundedReceiver, +} +impl Stream for UnboundedStream { + type Item = T; + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Pin::into_inner(self).recv.poll_recv(cx) + } +} + pub fn unbounded_channel_stream() -> (UnboundedSender, impl Stream) { - let (tx, mut rx) = mpsc::unbounded_channel(); + let (tx, rx) = mpsc::unbounded_channel(); - let stream = stream! { - while let Some(item) = rx.recv().await { - yield item; - } - }; + let stream = UnboundedStream { recv: rx }; (tx, stream) } +struct BoundedStream { + recv: Receiver, +} +impl Stream for BoundedStream { + type Item = T; + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Pin::into_inner(self).recv.poll_recv(cx) + } +} + pub fn channel_stream(size: usize) -> (Sender, impl Stream) { - let (tx, mut rx) = mpsc::channel(size); + let (tx, rx) = mpsc::channel(size); - let stream = stream! { - while let Some(item) = rx.recv().await { - yield item; - } - }; + let stream = BoundedStream { recv: rx }; (tx, stream) } From 8aa067aa83e312ba149490b472fdf820237a29c6 Mon Sep 17 00:00:00 2001 From: Alice Ryhl Date: Tue, 22 Dec 2020 12:04:24 +0100 Subject: [PATCH 7/7] Fix documentation typo --- tokio/src/lib.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tokio/src/lib.rs b/tokio/src/lib.rs index 0cc004dd094..946006bb5d1 100644 --- a/tokio/src/lib.rs +++ b/tokio/src/lib.rs @@ -409,7 +409,7 @@ mod util; /// release, most of the Tokio stream utilities have been moved into the [`tokio-stream`] /// crate. /// -/// # Why was `Stream` no included in Tokio 1.0? +/// # Why was `Stream` not included in Tokio 1.0? /// /// Originally, we had planned to ship Tokio 1.0 with a stable `Stream` type /// but unfortunetly the [RFC] had not been merged in time for `Stream` to @@ -424,6 +424,7 @@ mod util; /// to create a `impl Stream` from `async fn` using the [`async-stream`] crate. /// /// [`tokio-stream`]: https://docs.rs/tokio-stream +/// [`async-stream`]: https://docs.rs/async-stream /// [RFC]: https://github.com/rust-lang/rfcs/pull/2996 /// /// # Example