Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Expose poll_next replacement on Split, Lines, DirEntry, Receiver and UnboundedReceiver #3308

Merged
merged 7 commits into from Dec 22, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
20 changes: 19 additions & 1 deletion tokio/src/fs/read_dir.rs
Expand Up @@ -52,7 +52,25 @@ impl ReadDir {
poll_fn(|cx| self.poll_next_entry(cx)).await
}

fn poll_next_entry(&mut self, cx: &mut Context<'_>) -> Poll<io::Result<Option<DirEntry>>> {
/// Polls for the next directory entry in the stream.
///
/// This method returns:
///
/// * `Poll::Pending` if the next directory entry is not yet available.
/// * `Poll::Ready(Ok(Some(entry)))` if the next directory entry is available.
/// * `Poll::Ready(Ok(None))` if there are no more directory entries in this
/// stream.
/// * `Poll::Ready(Err(err))` if an IO error occurred while reading the next
/// directory entry.
///
/// When the method returns `Poll::Pending`, the `Waker` in the provided
/// `Context` is scheduled to receive a wakeup when the next directory entry
/// becomes available on the underlying IO resource.
///
/// Note that on multiple calls to `poll_next_entry`, only the `Waker` from
/// the `Context` passed to the most recent call is scheduled to receive a
/// wakeup.
pub fn poll_next_entry(&mut self, cx: &mut Context<'_>) -> Poll<io::Result<Option<DirEntry>>> {
loop {
match self.0 {
State::Idle(ref mut std) => {
Expand Down
18 changes: 17 additions & 1 deletion tokio/src/io/util/lines.rs
Expand Up @@ -83,7 +83,23 @@ impl<R> Lines<R>
where
R: AsyncBufRead,
{
fn poll_next_line(
/// Polls for the next line in the stream.
///
/// This method returns:
///
/// * `Poll::Pending` if the next line is not yet available.
/// * `Poll::Ready(Ok(Some(line)))` if the next line is available.
/// * `Poll::Ready(Ok(None))` if there are no more lines in this stream.
/// * `Poll::Ready(Err(err))` if an IO error occurred while reading the next line.
///
/// When the method returns `Poll::Pending`, the `Waker` in the provided
/// `Context` is scheduled to receive a wakeup when more bytes become
/// available on the underlying IO resource.
///
/// Note that on multiple calls to `poll_next_line`, only the `Waker` from
/// the `Context` passed to the most recent call is scheduled to receive a
/// wakeup.
pub fn poll_next_line(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<io::Result<Option<String>>> {
Expand Down
19 changes: 18 additions & 1 deletion tokio/src/io/util/split.rs
Expand Up @@ -65,7 +65,24 @@ impl<R> Split<R>
where
R: AsyncBufRead,
{
fn poll_next_segment(
/// Polls for the next segment in the stream.
///
/// This method returns:
///
/// * `Poll::Pending` if the next segment is not yet available.
/// * `Poll::Ready(Ok(Some(segment)))` if the next segment is available.
/// * `Poll::Ready(Ok(None))` if there are no more segments in this stream.
/// * `Poll::Ready(Err(err))` if an IO error occurred while reading the
/// next segment.
///
/// When the method returns `Poll::Pending`, the `Waker` in the provided
/// `Context` is scheduled to receive a wakeup when more bytes become
/// available on the underlying IO resource.
///
/// Note that on multiple calls to `poll_next_segment`, only the `Waker`
/// from the `Context` passed to the most recent call is scheduled to
/// receive a wakeup.
pub fn poll_next_segment(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<io::Result<Option<Vec<u8>>>> {
Expand Down
3 changes: 2 additions & 1 deletion tokio/src/lib.rs
Expand Up @@ -409,7 +409,7 @@ mod util;
/// release, most of the Tokio stream utilities have been moved into the [`tokio-stream`]
/// crate.
///
/// # Why was `Stream` no included in Tokio 1.0?
/// # Why was `Stream` not included in Tokio 1.0?
///
/// Originally, we had planned to ship Tokio 1.0 with a stable `Stream` type
/// but unfortunetly the [RFC] had not been merged in time for `Stream` to
Expand All @@ -424,6 +424,7 @@ mod util;
/// to create a `impl Stream` from `async fn` using the [`async-stream`] crate.
///
/// [`tokio-stream`]: https://docs.rs/tokio-stream
/// [`async-stream`]: https://docs.rs/async-stream
/// [RFC]: https://github.com/rust-lang/rfcs/pull/2996
///
/// # Example
Expand Down
8 changes: 3 additions & 5 deletions tokio/src/net/tcp/listener.rs
Expand Up @@ -155,11 +155,9 @@ impl TcpListener {
/// Polls to accept a new incoming connection to this listener.
///
/// If there is no connection to accept, `Poll::Pending` is returned and the
/// current task will be notified by a waker.
///
/// When ready, the most recent task that called `poll_accept` is notified.
/// The caller is responsible to ensure that `poll_accept` is called from a
/// single task. Failing to do this could result in tasks hanging.
/// current task will be notified by a waker. Note that on multiple calls
/// to `poll_accept`, only the `Waker` from the `Context` passed to the most
/// recent call is scheduled to receive a wakeup.
pub fn poll_accept(&self, cx: &mut Context<'_>) -> Poll<io::Result<(TcpStream, SocketAddr)>> {
loop {
let ev = ready!(self.io.registration().poll_read_ready(cx))?;
Expand Down
10 changes: 4 additions & 6 deletions tokio/src/net/unix/listener.rs
Expand Up @@ -109,12 +109,10 @@ impl UnixListener {

/// Polls to accept a new incoming connection to this listener.
///
/// If there is no connection to accept, `Poll::Pending` is returned and
/// the current task will be notified by a waker.
///
/// When ready, the most recent task that called `poll_accept` is notified.
/// The caller is responsible to ensure that `poll_accept` is called from a
/// single task. Failing to do this could result in tasks hanging.
/// If there is no connection to accept, `Poll::Pending` is returned and the
/// current task will be notified by a waker. Note that on multiple calls
/// to `poll_accept`, only the `Waker` from the `Context` passed to the most
/// recent call is scheduled to receive a wakeup.
pub fn poll_accept(&self, cx: &mut Context<'_>) -> Poll<io::Result<(UnixStream, SocketAddr)>> {
let (sock, addr) = ready!(self.io.registration().poll_read_io(cx, || self.io.accept()))?;
let addr = SocketAddr(addr);
Expand Down
25 changes: 19 additions & 6 deletions tokio/src/sync/mpsc/bounded.rs
Expand Up @@ -11,7 +11,6 @@ cfg_time! {
}

use std::fmt;
#[cfg(any(feature = "signal", feature = "process"))]
use std::task::{Context, Poll};

/// Send values to the associated `Receiver`.
Expand Down Expand Up @@ -147,11 +146,6 @@ impl<T> Receiver<T> {
poll_fn(|cx| self.chan.recv(cx)).await
}

#[cfg(any(feature = "signal", feature = "process"))]
pub(crate) fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> {
self.chan.recv(cx)
}

/// Blocking receive to call outside of asynchronous contexts.
///
/// # Panics
Expand Down Expand Up @@ -243,6 +237,25 @@ impl<T> Receiver<T> {
pub fn close(&mut self) {
self.chan.close();
}

/// Polls to receive the next message on this channel.
///
/// This method returns:
///
/// * `Poll::Pending` if no messages are available but the channel is not
/// closed.
/// * `Poll::Ready(Some(message))` if a message is available.
/// * `Poll::Ready(None)` if the channel has been closed and all messages
/// sent before it was closed have been received.
///
/// When the method returns `Poll::Pending`, the `Waker` in the provided
/// `Context` is scheduled to receive a wakeup when a message is sent on any
/// receiver, or when the channel is closed. Note that on multiple calls to
/// `poll_recv`, only the `Waker` from the `Context` passed to the most
/// recent call is scheduled to receive a wakeup.
pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> {
self.chan.recv(cx)
}
}

impl<T> fmt::Debug for Receiver<T> {
Expand Down
23 changes: 19 additions & 4 deletions tokio/src/sync/mpsc/unbounded.rs
Expand Up @@ -73,10 +73,6 @@ impl<T> UnboundedReceiver<T> {
UnboundedReceiver { chan }
}

fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> {
self.chan.recv(cx)
}

/// Receives the next value for this receiver.
///
/// `None` is returned when all `Sender` halves have dropped, indicating
Expand Down Expand Up @@ -159,6 +155,25 @@ impl<T> UnboundedReceiver<T> {
pub fn close(&mut self) {
self.chan.close();
}

/// Polls to receive the next message on this channel.
///
/// This method returns:
///
/// * `Poll::Pending` if no messages are available but the channel is not
/// closed.
/// * `Poll::Ready(Some(message))` if a message is available.
/// * `Poll::Ready(None)` if the channel has been closed and all messages
/// sent before it was closed have been received.
///
/// When the method returns `Poll::Pending`, the `Waker` in the provided
/// `Context` is scheduled to receive a wakeup when a message is sent on any
/// receiver, or when the channel is closed. Note that on multiple calls to
/// `poll_recv`, only the `Waker` from the `Context` passed to the most
/// recent call is scheduled to receive a wakeup.
pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> {
self.chan.recv(cx)
}
}

impl<T> UnboundedSender<T> {
Expand Down
41 changes: 27 additions & 14 deletions tokio/tests/support/mpsc_stream.rs
@@ -1,29 +1,42 @@
#![allow(dead_code)]

use async_stream::stream;
use tokio::sync::mpsc::{self, Sender, UnboundedSender};
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::sync::mpsc::{self, Receiver, Sender, UnboundedReceiver, UnboundedSender};
use tokio_stream::Stream;

struct UnboundedStream<T> {
recv: UnboundedReceiver<T>,
}
impl<T> Stream for UnboundedStream<T> {
type Item = T;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
Pin::into_inner(self).recv.poll_recv(cx)
}
}

pub fn unbounded_channel_stream<T: Unpin>() -> (UnboundedSender<T>, impl Stream<Item = T>) {
let (tx, mut rx) = mpsc::unbounded_channel();
let (tx, rx) = mpsc::unbounded_channel();

let stream = stream! {
while let Some(item) = rx.recv().await {
yield item;
}
};
let stream = UnboundedStream { recv: rx };

(tx, stream)
}

struct BoundedStream<T> {
recv: Receiver<T>,
}
impl<T> Stream for BoundedStream<T> {
type Item = T;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
Pin::into_inner(self).recv.poll_recv(cx)
}
}

pub fn channel_stream<T: Unpin>(size: usize) -> (Sender<T>, impl Stream<Item = T>) {
let (tx, mut rx) = mpsc::channel(size);
let (tx, rx) = mpsc::channel(size);

let stream = stream! {
while let Some(item) = rx.recv().await {
yield item;
}
};
let stream = BoundedStream { recv: rx };

(tx, stream)
}