Skip to content

Commit

Permalink
uds: adds poll_{recv,send}_ready methods
Browse files Browse the repository at this point in the history
Signed-off-by: Arthur Gautier <baloo@superbaloo.net>
  • Loading branch information
baloo committed Sep 24, 2021
1 parent 1e820d3 commit 5291bc6
Show file tree
Hide file tree
Showing 2 changed files with 113 additions and 0 deletions.
66 changes: 66 additions & 0 deletions tokio/src/net/unix/datagram/socket.rs
Expand Up @@ -226,6 +226,39 @@ impl UnixDatagram {
Ok(())
}

/// Polls for write/send readiness.
///
/// If the socket is not currently ready for sending, this method will
/// store a clone of the `Waker` from the provided `Context`. When the socket
/// becomes ready for sending, `Waker::wake` will be called on the
/// waker.
///
/// Note that on multiple calls to `poll_send_ready` or `poll_send`, only
/// the `Waker` from the `Context` passed to the most recent call is
/// scheduled to receive a wakeup. (However, `poll_recv_ready` retains a
/// second, independent waker.)
///
/// This function is intended for cases where creating and pinning a future
/// via [`writable`] is not feasible. Where possible, using [`writable`] is
/// preferred, as this supports polling from multiple tasks at once.
///
/// # Return value
///
/// The function returns:
///
/// * `Poll::Pending` if the socket is not ready for writing.
/// * `Poll::Ready(Ok(()))` if the socket is ready for writing.
/// * `Poll::Ready(Err(e))` if an error is encountered.
///
/// # Errors
///
/// This function may encounter any standard I/O error except `WouldBlock`.
///
/// [`writable`]: method@Self::writable
pub fn poll_send_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
self.io.registration().poll_write_ready(cx).map_ok(|_| ())
}

/// Wait for the socket to become readable.
///
/// This function is equivalent to `ready(Interest::READABLE)` and is usually
Expand Down Expand Up @@ -289,6 +322,39 @@ impl UnixDatagram {
Ok(())
}

/// Polls for read/receive readiness.
///
/// If the socket is not currently ready for receiving, this method will
/// store a clone of the `Waker` from the provided `Context`. When the
/// socket becomes ready for reading, `Waker::wake` will be called on the
/// waker.
///
/// Note that on multiple calls to `poll_recv_ready`, `poll_recv` or
/// `poll_peek`, only the `Waker` from the `Context` passed to the most
/// recent call is scheduled to receive a wakeup. (However,
/// `poll_send_ready` retains a second, independent waker.)
///
/// This function is intended for cases where creating and pinning a future
/// via [`readable`] is not feasible. Where possible, using [`readable`] is
/// preferred, as this supports polling from multiple tasks at once.
///
/// # Return value
///
/// The function returns:
///
/// * `Poll::Pending` if the socket is not ready for reading.
/// * `Poll::Ready(Ok(()))` if the socket is ready for reading.
/// * `Poll::Ready(Err(e))` if an error is encountered.
///
/// # Errors
///
/// This function may encounter any standard I/O error except `WouldBlock`.
///
/// [`readable`]: method@Self::readable
pub fn poll_recv_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
self.io.registration().poll_read_ready(cx).map_ok(|_| ())
}

/// Creates a new `UnixDatagram` bound to the specified path.
///
/// # Examples
Expand Down
47 changes: 47 additions & 0 deletions tokio/tests/uds_datagram.rs
Expand Up @@ -328,3 +328,50 @@ async fn try_recv_buf_never_block() -> io::Result<()> {

Ok(())
}

#[tokio::test]
async fn poll_ready() -> io::Result<()> {
let dir = tempfile::tempdir().unwrap();
let server_path = dir.path().join("server.sock");
let client_path = dir.path().join("client.sock");

// Create listener
let server = UnixDatagram::bind(&server_path)?;

// Create socket pair
let client = UnixDatagram::bind(&client_path)?;

for _ in 0..5 {
loop {
poll_fn(|cx| client.poll_send_ready(cx)).await?;

match client.try_send_to(b"hello world", &server_path) {
Ok(n) => {
assert_eq!(n, 11);
break;
}
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue,
Err(e) => panic!("{:?}", e),
}
}

loop {
poll_fn(|cx| server.poll_recv_ready(cx)).await?;

let mut buf = Vec::with_capacity(512);

match server.try_recv_buf_from(&mut buf) {
Ok((n, addr)) => {
assert_eq!(n, 11);
assert_eq!(addr.as_pathname(), Some(client_path.as_ref()));
assert_eq!(&buf[0..11], &b"hello world"[..]);
break;
}
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue,
Err(e) => panic!("{:?}", e),
}
}
}

Ok(())
}

0 comments on commit 5291bc6

Please sign in to comment.