Skip to content

Commit

Permalink
net: add poll_{write,read}_ready methods to udp
Browse files Browse the repository at this point in the history
similar to unix and tcp sockets, this commits brings `poll_read_ready` and
`poll_write_ready` methods.

Signed-off-by: Arthur Gautier <baloo@superbaloo.net>
  • Loading branch information
baloo committed Sep 23, 2021
1 parent 7ce8f05 commit d67a388
Show file tree
Hide file tree
Showing 2 changed files with 110 additions and 0 deletions.
66 changes: 66 additions & 0 deletions tokio/src/net/udp.rs
Expand Up @@ -443,6 +443,39 @@ impl UdpSocket {
Ok(())
}

/// Polls for write readiness.
///
/// If the udp stream is not currently ready for writing, this method will
/// store a clone of the `Waker` from the provided `Context`. When the udp
/// stream becomes ready for writing, `Waker::wake` will be called on the
/// waker.
///
/// Note that on multiple calls to `poll_write_ready` or `poll_write`, only
/// the `Waker` from the `Context` passed to the most recent call is
/// scheduled to receive a wakeup. (However, `poll_read_ready` retains a
/// second, independent waker.)
///
/// This function is intended for cases where creating and pinning a future
/// via [`writable`] is not feasible. Where possible, using [`writable`] is
/// preferred, as this supports polling from multiple tasks at once.
///
/// # Return value
///
/// The function returns:
///
/// * `Poll::Pending` if the udp stream is not ready for writing.
/// * `Poll::Ready(Ok(()))` if the udp stream is ready for writing.
/// * `Poll::Ready(Err(e))` if an error is encountered.
///
/// # Errors
///
/// This function may encounter any standard I/O error except `WouldBlock`.
///
/// [`writable`]: method@Self::writable
pub fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
self.io.registration().poll_write_ready(cx).map_ok(|_| ())
}

/// Sends data on the socket to the remote address that the socket is
/// connected to.
///
Expand Down Expand Up @@ -630,6 +663,39 @@ impl UdpSocket {
Ok(())
}

/// Polls for read readiness.
///
/// If the udp stream is not currently ready for reading, this method will
/// store a clone of the `Waker` from the provided `Context`. When the udp
/// socket becomes ready for reading, `Waker::wake` will be called on the
/// waker.
///
/// Note that on multiple calls to `poll_read_ready`, `poll_read` or
/// `poll_peek`, only the `Waker` from the `Context` passed to the most
/// recent call is scheduled to receive a wakeup. (However,
/// `poll_write_ready` retains a second, independent waker.)
///
/// This function is intended for cases where creating and pinning a future
/// via [`readable`] is not feasible. Where possible, using [`readable`] is
/// preferred, as this supports polling from multiple tasks at once.
///
/// # Return value
///
/// The function returns:
///
/// * `Poll::Pending` if the udp stream is not ready for reading.
/// * `Poll::Ready(Ok(()))` if the udp stream is ready for reading.
/// * `Poll::Ready(Err(e))` if an error is encountered.
///
/// # Errors
///
/// This function may encounter any standard I/O error except `WouldBlock`.
///
/// [`readable`]: method@Self::readable
pub fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
self.io.registration().poll_read_ready(cx).map_ok(|_| ())
}

/// Receives a single datagram message on the socket from the remote address
/// to which it is connected. On success, returns the number of bytes read.
///
Expand Down
44 changes: 44 additions & 0 deletions tokio/tests/udp.rs
Expand Up @@ -5,6 +5,7 @@ use futures::future::poll_fn;
use std::io;
use std::sync::Arc;
use tokio::{io::ReadBuf, net::UdpSocket};
use tokio_test::assert_ok;

const MSG: &[u8] = b"hello";
const MSG_LEN: usize = MSG.len();
Expand Down Expand Up @@ -440,3 +441,46 @@ async fn try_recv_buf_from() {
}
}
}

#[tokio::test]
async fn poll_ready() {
// Create listener
let server = UdpSocket::bind("127.0.0.1:0").await.unwrap();
let saddr = server.local_addr().unwrap();

// Create socket pair
let client = UdpSocket::bind("127.0.0.1:0").await.unwrap();
let caddr = client.local_addr().unwrap();

for _ in 0..5 {
loop {
assert_ok!(poll_fn(|cx| client.poll_write_ready(cx)).await);

match client.try_send_to(b"hello world", saddr) {
Ok(n) => {
assert_eq!(n, 11);
break;
}
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue,
Err(e) => panic!("{:?}", e),
}
}

loop {
assert_ok!(poll_fn(|cx| server.poll_read_ready(cx)).await);

let mut buf = Vec::with_capacity(512);

match server.try_recv_buf_from(&mut buf) {
Ok((n, addr)) => {
assert_eq!(n, 11);
assert_eq!(addr, caddr);
assert_eq!(&buf[0..11], &b"hello world"[..]);
break;
}
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue,
Err(e) => panic!("{:?}", e),
}
}
}
}

0 comments on commit d67a388

Please sign in to comment.