Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

net: add poll_{recv,send}_ready methods to udp/uds_datagram #4131

Merged
merged 2 commits into from Sep 24, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
66 changes: 66 additions & 0 deletions tokio/src/net/udp.rs
Expand Up @@ -443,6 +443,39 @@ impl UdpSocket {
Ok(())
}

/// Polls for write/send readiness.
///
/// If the udp stream is not currently ready for sending, this method will
/// store a clone of the `Waker` from the provided `Context`. When the udp
/// stream becomes ready for sending, `Waker::wake` will be called on the
/// waker.
///
/// Note that on multiple calls to `poll_send_ready` or `poll_send`, only
/// the `Waker` from the `Context` passed to the most recent call is
/// scheduled to receive a wakeup. (However, `poll_recv_ready` retains a
/// second, independent waker.)
///
/// This function is intended for cases where creating and pinning a future
/// via [`writable`] is not feasible. Where possible, using [`writable`] is
/// preferred, as this supports polling from multiple tasks at once.
///
/// # Return value
///
/// The function returns:
///
/// * `Poll::Pending` if the udp stream is not ready for writing.
/// * `Poll::Ready(Ok(()))` if the udp stream is ready for writing.
/// * `Poll::Ready(Err(e))` if an error is encountered.
///
/// # Errors
///
/// This function may encounter any standard I/O error except `WouldBlock`.
///
/// [`writable`]: method@Self::writable
pub fn poll_send_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
self.io.registration().poll_write_ready(cx).map_ok(|_| ())
}

/// Sends data on the socket to the remote address that the socket is
/// connected to.
///
Expand Down Expand Up @@ -630,6 +663,39 @@ impl UdpSocket {
Ok(())
}

/// Polls for read/receive readiness.
///
/// If the udp stream is not currently ready for receiving, this method will
/// store a clone of the `Waker` from the provided `Context`. When the udp
/// socket becomes ready for reading, `Waker::wake` will be called on the
/// waker.
///
/// Note that on multiple calls to `poll_recv_ready`, `poll_recv` or
/// `poll_peek`, only the `Waker` from the `Context` passed to the most
/// recent call is scheduled to receive a wakeup. (However,
/// `poll_send_ready` retains a second, independent waker.)
///
/// This function is intended for cases where creating and pinning a future
/// via [`readable`] is not feasible. Where possible, using [`readable`] is
/// preferred, as this supports polling from multiple tasks at once.
///
/// # Return value
///
/// The function returns:
///
/// * `Poll::Pending` if the udp stream is not ready for reading.
/// * `Poll::Ready(Ok(()))` if the udp stream is ready for reading.
/// * `Poll::Ready(Err(e))` if an error is encountered.
///
/// # Errors
///
/// This function may encounter any standard I/O error except `WouldBlock`.
///
/// [`readable`]: method@Self::readable
pub fn poll_recv_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
self.io.registration().poll_read_ready(cx).map_ok(|_| ())
}

/// Receives a single datagram message on the socket from the remote address
/// to which it is connected. On success, returns the number of bytes read.
///
Expand Down
66 changes: 66 additions & 0 deletions tokio/src/net/unix/datagram/socket.rs
Expand Up @@ -226,6 +226,39 @@ impl UnixDatagram {
Ok(())
}

/// Polls for write/send readiness.
///
/// If the socket is not currently ready for sending, this method will
/// store a clone of the `Waker` from the provided `Context`. When the socket
/// becomes ready for sending, `Waker::wake` will be called on the
/// waker.
///
/// Note that on multiple calls to `poll_send_ready` or `poll_send`, only
/// the `Waker` from the `Context` passed to the most recent call is
/// scheduled to receive a wakeup. (However, `poll_recv_ready` retains a
/// second, independent waker.)
///
/// This function is intended for cases where creating and pinning a future
/// via [`writable`] is not feasible. Where possible, using [`writable`] is
/// preferred, as this supports polling from multiple tasks at once.
///
/// # Return value
///
/// The function returns:
///
/// * `Poll::Pending` if the socket is not ready for writing.
/// * `Poll::Ready(Ok(()))` if the socket is ready for writing.
/// * `Poll::Ready(Err(e))` if an error is encountered.
///
/// # Errors
///
/// This function may encounter any standard I/O error except `WouldBlock`.
///
/// [`writable`]: method@Self::writable
pub fn poll_send_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
self.io.registration().poll_write_ready(cx).map_ok(|_| ())
}

/// Wait for the socket to become readable.
///
/// This function is equivalent to `ready(Interest::READABLE)` and is usually
Expand Down Expand Up @@ -289,6 +322,39 @@ impl UnixDatagram {
Ok(())
}

/// Polls for read/receive readiness.
///
/// If the socket is not currently ready for receiving, this method will
/// store a clone of the `Waker` from the provided `Context`. When the
/// socket becomes ready for reading, `Waker::wake` will be called on the
/// waker.
///
/// Note that on multiple calls to `poll_recv_ready`, `poll_recv` or
/// `poll_peek`, only the `Waker` from the `Context` passed to the most
/// recent call is scheduled to receive a wakeup. (However,
/// `poll_send_ready` retains a second, independent waker.)
///
/// This function is intended for cases where creating and pinning a future
/// via [`readable`] is not feasible. Where possible, using [`readable`] is
/// preferred, as this supports polling from multiple tasks at once.
///
/// # Return value
///
/// The function returns:
///
/// * `Poll::Pending` if the socket is not ready for reading.
/// * `Poll::Ready(Ok(()))` if the socket is ready for reading.
/// * `Poll::Ready(Err(e))` if an error is encountered.
///
/// # Errors
///
/// This function may encounter any standard I/O error except `WouldBlock`.
///
/// [`readable`]: method@Self::readable
pub fn poll_recv_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
self.io.registration().poll_read_ready(cx).map_ok(|_| ())
}

/// Creates a new `UnixDatagram` bound to the specified path.
///
/// # Examples
Expand Down
44 changes: 44 additions & 0 deletions tokio/tests/udp.rs
Expand Up @@ -5,6 +5,7 @@ use futures::future::poll_fn;
use std::io;
use std::sync::Arc;
use tokio::{io::ReadBuf, net::UdpSocket};
use tokio_test::assert_ok;

const MSG: &[u8] = b"hello";
const MSG_LEN: usize = MSG.len();
Expand Down Expand Up @@ -440,3 +441,46 @@ async fn try_recv_buf_from() {
}
}
}

#[tokio::test]
async fn poll_ready() {
// Create listener
let server = UdpSocket::bind("127.0.0.1:0").await.unwrap();
let saddr = server.local_addr().unwrap();

// Create socket pair
let client = UdpSocket::bind("127.0.0.1:0").await.unwrap();
let caddr = client.local_addr().unwrap();

for _ in 0..5 {
loop {
assert_ok!(poll_fn(|cx| client.poll_send_ready(cx)).await);

match client.try_send_to(b"hello world", saddr) {
Ok(n) => {
assert_eq!(n, 11);
break;
}
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue,
Err(e) => panic!("{:?}", e),
}
}

loop {
assert_ok!(poll_fn(|cx| server.poll_recv_ready(cx)).await);

let mut buf = Vec::with_capacity(512);

match server.try_recv_buf_from(&mut buf) {
Ok((n, addr)) => {
assert_eq!(n, 11);
assert_eq!(addr, caddr);
assert_eq!(&buf[0..11], &b"hello world"[..]);
break;
}
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue,
Err(e) => panic!("{:?}", e),
}
}
}
}
47 changes: 47 additions & 0 deletions tokio/tests/uds_datagram.rs
Expand Up @@ -328,3 +328,50 @@ async fn try_recv_buf_never_block() -> io::Result<()> {

Ok(())
}

#[tokio::test]
async fn poll_ready() -> io::Result<()> {
let dir = tempfile::tempdir().unwrap();
let server_path = dir.path().join("server.sock");
let client_path = dir.path().join("client.sock");

// Create listener
let server = UnixDatagram::bind(&server_path)?;

// Create socket pair
let client = UnixDatagram::bind(&client_path)?;

for _ in 0..5 {
loop {
poll_fn(|cx| client.poll_send_ready(cx)).await?;

match client.try_send_to(b"hello world", &server_path) {
Ok(n) => {
assert_eq!(n, 11);
break;
}
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue,
Err(e) => panic!("{:?}", e),
}
}

loop {
poll_fn(|cx| server.poll_recv_ready(cx)).await?;

let mut buf = Vec::with_capacity(512);

match server.try_recv_buf_from(&mut buf) {
Ok((n, addr)) => {
assert_eq!(n, 11);
assert_eq!(addr.as_pathname(), Some(client_path.as_ref()));
assert_eq!(&buf[0..11], &b"hello world"[..]);
break;
}
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue,
Err(e) => panic!("{:?}", e),
}
}
}

Ok(())
}