Skip to content

Commit

Permalink
net: expose poll_* methods on UnixDatagram
Browse files Browse the repository at this point in the history
  • Loading branch information
cssivision committed Dec 7, 2020
1 parent 0707f4c commit cd13b95
Show file tree
Hide file tree
Showing 2 changed files with 188 additions and 1 deletion.
143 changes: 142 additions & 1 deletion tokio/src/net/unix/datagram/socket.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::io::{Interest, PollEvented};
use crate::io::{Interest, PollEvented, ReadBuf};
use crate::net::unix::SocketAddr;

use std::convert::TryFrom;
Expand All @@ -8,6 +8,7 @@ use std::net::Shutdown;
use std::os::unix::io::{AsRawFd, RawFd};
use std::os::unix::net;
use std::path::Path;
use std::task::{Context, Poll};

cfg_net_unix! {
/// An I/O object representing a Unix datagram socket.
Expand Down Expand Up @@ -520,6 +521,146 @@ impl UnixDatagram {
Ok((n, SocketAddr(addr)))
}

/// Attempts to receive a single datagram on the specified address.
///
/// Note that on multiple calls to a `poll_*` method in the recv direction, only the
/// `Waker` from the `Context` passed to the most recent call will be scheduled to
/// receive a wakeup.
///
/// # Return value
///
/// The function returns:
///
/// * `Poll::Pending` if the socket is not ready to read
/// * `Poll::Ready(Ok(addr))` reads data from `addr` into `ReadBuf` if the socket is ready
/// * `Poll::Ready(Err(e))` if an error is encountered.
///
/// # Errors
///
/// This function may encounter any standard I/O error except `WouldBlock`.
pub fn poll_recv_from(
&self,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<io::Result<SocketAddr>> {
let (n, addr) = ready!(self.io.registration().poll_read_io(cx, || {
// Safety: will not read the maybe uinitialized bytes.
let b = unsafe {
&mut *(buf.unfilled_mut() as *mut [std::mem::MaybeUninit<u8>] as *mut [u8])
};

self.io.recv_from(b)
}))?;

// Safety: We trust `recv` to have filled up `n` bytes in the buffer.
unsafe {
buf.assume_init(n);
}
buf.advance(n);
Poll::Ready(Ok(SocketAddr(addr)))
}

/// Attempts to send data to the specified address.
///
/// Note that on multiple calls to a `poll_*` method in the send direction, only the
/// `Waker` from the `Context` passed to the most recent call will be scheduled to
/// receive a wakeup.
///
/// # Return value
///
/// The function returns:
///
/// * `Poll::Pending` if the socket is not ready to write
/// * `Poll::Ready(Ok(n))` `n` is the number of bytes sent.
/// * `Poll::Ready(Err(e))` if an error is encountered.
///
/// # Errors
///
/// This function may encounter any standard I/O error except `WouldBlock`.
pub fn poll_send_to<P>(
&self,
cx: &mut Context<'_>,
buf: &[u8],
target: P,
) -> Poll<io::Result<usize>>
where
P: AsRef<Path>,
{
self.io
.registration()
.poll_write_io(cx, || self.io.send_to(buf, target.as_ref()))
}

/// Attempts to send data on the socket to the remote address to which it
/// was previously `connect`ed.
///
/// The [`connect`] method will connect this socket to a remote address.
/// This method will fail if the socket is not connected.
///
/// Note that on multiple calls to a `poll_*` method in the send direction,
/// only the `Waker` from the `Context` passed to the most recent call will
/// be scheduled to receive a wakeup.
///
/// # Return value
///
/// The function returns:
///
/// * `Poll::Pending` if the socket is not available to write
/// * `Poll::Ready(Ok(n))` `n` is the number of bytes sent
/// * `Poll::Ready(Err(e))` if an error is encountered.
///
/// # Errors
///
/// This function may encounter any standard I/O error except `WouldBlock`.
///
/// [`connect`]: method@Self::connect
pub fn poll_send(&self, cx: &mut Context<'_>, buf: &[u8]) -> Poll<io::Result<usize>> {
self.io
.registration()
.poll_write_io(cx, || self.io.send(buf))
}

/// Attempts to receive a single datagram message on the socket from the remote
/// address to which it is `connect`ed.
///
/// The [`connect`] method will connect this socket to a remote address. This method
/// resolves to an error if the socket is not connected.
///
/// Note that on multiple calls to a `poll_*` method in the recv direction, only the
/// `Waker` from the `Context` passed to the most recent call will be scheduled to
/// receive a wakeup.
///
/// # Return value
///
/// The function returns:
///
/// * `Poll::Pending` if the socket is not ready to read
/// * `Poll::Ready(Ok(()))` reads data `ReadBuf` if the socket is ready
/// * `Poll::Ready(Err(e))` if an error is encountered.
///
/// # Errors
///
/// This function may encounter any standard I/O error except `WouldBlock`.
///
/// [`connect`]: method@Self::connect
pub fn poll_recv(&self, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>) -> Poll<io::Result<()>> {
let n = ready!(self.io.registration().poll_read_io(cx, || {
// Safety: will not read the maybe uinitialized bytes.
let b = unsafe {
&mut *(buf.unfilled_mut() as *mut [std::mem::MaybeUninit<u8>] as *mut [u8])
};

self.io.recv(b)
}))?;

// Safety: We trust `recv` to have filled up `n` bytes in the buffer.
unsafe {
buf.assume_init(n);
}
buf.advance(n);
Poll::Ready(Ok(()))
}

/// Try to receive data from the socket without waiting.
///
/// # Examples
Expand Down
46 changes: 46 additions & 0 deletions tokio/tests/uds_datagram.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
#![cfg(feature = "full")]
#![cfg(unix)]

use futures::future::poll_fn;
use tokio::io::ReadBuf;
use tokio::net::UnixDatagram;
use tokio::try_join;

Expand Down Expand Up @@ -134,3 +136,47 @@ async fn split() -> std::io::Result<()> {

Ok(())
}

#[tokio::test]
async fn send_to_recv_from_poll() -> std::io::Result<()> {
let dir = tempfile::tempdir().unwrap();
let sender_path = dir.path().join("sender.sock");
let receiver_path = dir.path().join("receiver.sock");

let sender = UnixDatagram::bind(&sender_path)?;
let receiver = UnixDatagram::bind(&receiver_path)?;

let msg = b"hello";
poll_fn(|cx| sender.poll_send_to(cx, msg, &receiver_path)).await?;

let mut recv_buf = [0u8; 32];
let mut read = ReadBuf::new(&mut recv_buf);
let addr = poll_fn(|cx| receiver.poll_recv_from(cx, &mut read)).await?;

assert_eq!(read.filled(), msg);
assert_eq!(addr.as_pathname(), Some(sender_path.as_ref()));
Ok(())
}

#[tokio::test]
async fn send_recv_poll() -> std::io::Result<()> {
let dir = tempfile::tempdir().unwrap();
let sender_path = dir.path().join("sender.sock");
let receiver_path = dir.path().join("receiver.sock");

let sender = UnixDatagram::bind(&sender_path)?;
let receiver = UnixDatagram::bind(&receiver_path)?;

sender.connect(&receiver_path)?;
receiver.connect(&sender_path)?;

let msg = b"hello";
poll_fn(|cx| sender.poll_send(cx, msg)).await?;

let mut recv_buf = [0u8; 32];
let mut read = ReadBuf::new(&mut recv_buf);
let _len = poll_fn(|cx| receiver.poll_recv(cx, &mut read)).await?;

assert_eq!(read.filled(), msg);
Ok(())
}

0 comments on commit cd13b95

Please sign in to comment.