diff --git a/tokio/src/net/unix/datagram/socket.rs b/tokio/src/net/unix/datagram/socket.rs index dc7e04d0d27..99c39386ad6 100644 --- a/tokio/src/net/unix/datagram/socket.rs +++ b/tokio/src/net/unix/datagram/socket.rs @@ -1,4 +1,4 @@ -use crate::io::{Interest, PollEvented, ReadBuf}; +use crate::io::{Interest, PollEvented, ReadBuf, Ready}; use crate::net::unix::SocketAddr; use std::convert::TryFrom; @@ -84,6 +84,131 @@ cfg_net_unix! { } impl UnixDatagram { + /// Wait for any of the requested ready states. + /// + /// This function is usually paired with `try_recv()` or `try_send()`. It + /// can be used to concurrently recv / send to the same socket on a single + /// task without splitting the socket. + /// + /// The function may complete without the socket being ready. This is a + /// false-positive and attempting an operation will return with + /// `io::ErrorKind::WouldBlock`. + /// + /// # Examples + /// + /// Concurrently receive from and send to the socket on the same task + /// without splitting. + pub async fn ready(&self, interest: Interest) -> io::Result { + let event = self.io.registration().readiness(interest).await?; + Ok(event.ready) + } + + /// Wait for the socket to become writable. + /// + /// This function is equivalent to `ready(Interest::WRITABLE)` and is + /// usually paired with `try_send()` or `try_send_to()`. + /// + /// The function may complete without the socket being writable. This is a + /// false-positive and attempting a `try_send()` will return with + /// `io::ErrorKind::WouldBlock`. + /// + /// # Examples + /// + /// ```no_run + /// use tokio::net::UnixDatagram; + /// use std::io; + /// + /// #[tokio::main] + /// async fn main() -> io::Result<()> { + /// let dir = tempfile::tempdir().unwrap(); + /// let client_path = dir.path().join("client.sock"); + /// let server_path = dir.path().join("server.sock"); + /// let socket = UnixDatagram::bind(&client_path)?; + /// socket.connect(&server_path)?; + /// + /// loop { + /// // Wait for the socket to be writable + /// socket.writable().await?; + /// + /// // Try to send data, this may still fail with `WouldBlock` + /// // if the readiness event is a false positive. + /// match socket.try_send(b"hello world") { + /// Ok(n) => { + /// break; + /// } + /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { + /// continue; + /// } + /// Err(e) => { + /// return Err(e); + /// } + /// } + /// } + /// + /// Ok(()) + /// } + /// ``` + pub async fn writable(&self) -> io::Result<()> { + self.ready(Interest::WRITABLE).await?; + Ok(()) + } + + /// Wait for the socket to become readable. + /// + /// This function is equivalent to `ready(Interest::READABLE)` and is usually + /// paired with `try_recv()`. + /// + /// The function may complete without the socket being readable. This is a + /// false-positive and attempting a `try_recv()` will return with + /// `io::ErrorKind::WouldBlock`. + /// + /// # Examples + /// + /// ```no_run + /// use tokio::net::UnixDatagram; + /// use std::io; + /// + /// #[tokio::main] + /// async fn main() -> io::Result<()> { + /// // Connect to a peer + /// let dir = tempfile::tempdir().unwrap(); + /// let client_path = dir.path().join("client.sock"); + /// let server_path = dir.path().join("server.sock"); + /// let socket = UnixDatagram::bind(&client_path)?; + /// socket.connect(&server_path)?; + /// + /// loop { + /// // Wait for the socket to be readable + /// socket.readable().await?; + /// + /// // The buffer is **not** included in the async task and will + /// // only exist on the stack. + /// let mut buf = [0; 1024]; + /// + /// // Try to recv data, this may still fail with `WouldBlock` + /// // if the readiness event is a false positive. + /// match socket.try_recv(&mut buf) { + /// Ok(n) => { + /// println!("GOT {:?}", &buf[..n]); + /// break; + /// } + /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { + /// continue; + /// } + /// Err(e) => { + /// return Err(e); + /// } + /// } + /// } + /// + /// Ok(()) + /// } + /// ``` + pub async fn readable(&self) -> io::Result<()> { + self.ready(Interest::READABLE).await?; + Ok(()) + } + /// Creates a new `UnixDatagram` bound to the specified path. /// /// # Examples @@ -310,68 +435,91 @@ impl UnixDatagram { /// Try to send a datagram to the peer without waiting. /// /// # Examples - /// ``` - /// # #[tokio::main] - /// # async fn main() -> Result<(), Box> { - /// use tokio::net::UnixDatagram; - /// - /// let bytes = b"bytes"; - /// // We use a socket pair so that they are assigned - /// // each other as a peer. - /// let (first, second) = UnixDatagram::pair()?; - /// - /// let size = first.try_send(bytes)?; - /// assert_eq!(size, bytes.len()); /// - /// let mut buffer = vec![0u8; 24]; - /// let size = second.try_recv(&mut buffer)?; - /// - /// let dgram = &buffer[..size]; - /// assert_eq!(dgram, bytes); - /// # Ok(()) - /// # } + /// ```no_run + /// use tokio::net::UnixDatagram; + /// use std::io; + /// + /// #[tokio::main] + /// async fn main() -> io::Result<()> { + /// let dir = tempfile::tempdir().unwrap(); + /// let client_path = dir.path().join("client.sock"); + /// let server_path = dir.path().join("server.sock"); + /// let socket = UnixDatagram::bind(&client_path)?; + /// socket.connect(&server_path)?; + /// + /// loop { + /// // Wait for the socket to be writable + /// socket.writable().await?; + /// + /// // Try to send data, this may still fail with `WouldBlock` + /// // if the readiness event is a false positive. + /// match socket.try_send(b"hello world") { + /// Ok(n) => { + /// break; + /// } + /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { + /// continue; + /// } + /// Err(e) => { + /// return Err(e); + /// } + /// } + /// } + /// + /// Ok(()) + /// } /// ``` pub fn try_send(&self, buf: &[u8]) -> io::Result { - self.io.send(buf) + self.io + .registration() + .try_io(Interest::WRITABLE, || self.io.send(buf)) } /// Try to send a datagram to the peer without waiting. /// /// # Examples - /// ``` - /// # #[tokio::main] - /// # async fn main() -> Result<(), Box> { - /// use tokio::net::UnixDatagram; - /// use tempfile::tempdir; /// - /// let bytes = b"bytes"; - /// // We use a temporary directory so that the socket - /// // files left by the bound sockets will get cleaned up. - /// let tmp = tempdir().unwrap(); - /// - /// let server_path = tmp.path().join("server"); - /// let server = UnixDatagram::bind(&server_path)?; - /// - /// let client_path = tmp.path().join("client"); - /// let client = UnixDatagram::bind(&client_path)?; - /// - /// let size = client.try_send_to(bytes, &server_path)?; - /// assert_eq!(size, bytes.len()); - /// - /// let mut buffer = vec![0u8; 24]; - /// let (size, addr) = server.try_recv_from(&mut buffer)?; - /// - /// let dgram = &buffer[..size]; - /// assert_eq!(dgram, bytes); - /// assert_eq!(addr.as_pathname().unwrap(), &client_path); - /// # Ok(()) - /// # } + /// ```no_run + /// use tokio::net::UnixDatagram; + /// use std::io; + /// + /// #[tokio::main] + /// async fn main() -> io::Result<()> { + /// let dir = tempfile::tempdir().unwrap(); + /// let client_path = dir.path().join("client.sock"); + /// let server_path = dir.path().join("server.sock"); + /// let socket = UnixDatagram::bind(&client_path)?; + /// + /// loop { + /// // Wait for the socket to be writable + /// socket.writable().await?; + /// + /// // Try to send data, this may still fail with `WouldBlock` + /// // if the readiness event is a false positive. + /// match socket.try_send_to(b"hello world", &server_path) { + /// Ok(n) => { + /// break; + /// } + /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { + /// continue; + /// } + /// Err(e) => { + /// return Err(e); + /// } + /// } + /// } + /// + /// Ok(()) + /// } /// ``` pub fn try_send_to

(&self, buf: &[u8], target: P) -> io::Result where P: AsRef, { - self.io.send_to(buf, target) + self.io + .registration() + .try_io(Interest::WRITABLE, || self.io.send_to(buf, target)) } /// Receives data from the socket. @@ -410,29 +558,51 @@ impl UnixDatagram { /// Try to receive a datagram from the peer without waiting. /// /// # Examples - /// ``` - /// # #[tokio::main] - /// # async fn main() -> Result<(), Box> { - /// use tokio::net::UnixDatagram; - /// - /// let bytes = b"bytes"; - /// // We use a socket pair so that they are assigned - /// // each other as a peer. - /// let (first, second) = UnixDatagram::pair()?; - /// - /// let size = first.try_send(bytes)?; - /// assert_eq!(size, bytes.len()); - /// - /// let mut buffer = vec![0u8; 24]; - /// let size = second.try_recv(&mut buffer)?; /// - /// let dgram = &buffer[..size]; - /// assert_eq!(dgram, bytes); - /// # Ok(()) - /// # } + /// ```no_run + /// use tokio::net::UnixDatagram; + /// use std::io; + /// + /// #[tokio::main] + /// async fn main() -> io::Result<()> { + /// // Connect to a peer + /// let dir = tempfile::tempdir().unwrap(); + /// let client_path = dir.path().join("client.sock"); + /// let server_path = dir.path().join("server.sock"); + /// let socket = UnixDatagram::bind(&client_path)?; + /// socket.connect(&server_path)?; + /// + /// loop { + /// // Wait for the socket to be readable + /// socket.readable().await?; + /// + /// // The buffer is **not** included in the async task and will + /// // only exist on the stack. + /// let mut buf = [0; 1024]; + /// + /// // Try to recv data, this may still fail with `WouldBlock` + /// // if the readiness event is a false positive. + /// match socket.try_recv(&mut buf) { + /// Ok(n) => { + /// println!("GOT {:?}", &buf[..n]); + /// break; + /// } + /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { + /// continue; + /// } + /// Err(e) => { + /// return Err(e); + /// } + /// } + /// } + /// + /// Ok(()) + /// } /// ``` pub fn try_recv(&self, buf: &mut [u8]) -> io::Result { - self.io.recv(buf) + self.io + .registration() + .try_io(Interest::READABLE, || self.io.recv(buf)) } /// Sends data on the socket to the specified address. @@ -664,37 +834,52 @@ impl UnixDatagram { /// Try to receive data from the socket without waiting. /// /// # Examples - /// ``` - /// # #[tokio::main] - /// # async fn main() -> Result<(), Box> { - /// use tokio::net::UnixDatagram; - /// use tempfile::tempdir; - /// - /// let bytes = b"bytes"; - /// // We use a temporary directory so that the socket - /// // files left by the bound sockets will get cleaned up. - /// let tmp = tempdir().unwrap(); - /// - /// let server_path = tmp.path().join("server"); - /// let server = UnixDatagram::bind(&server_path)?; - /// - /// let client_path = tmp.path().join("client"); - /// let client = UnixDatagram::bind(&client_path)?; /// - /// let size = client.try_send_to(bytes, &server_path)?; - /// assert_eq!(size, bytes.len()); - /// - /// let mut buffer = vec![0u8; 24]; - /// let (size, addr) = server.try_recv_from(&mut buffer)?; - /// - /// let dgram = &buffer[..size]; - /// assert_eq!(dgram, bytes); - /// assert_eq!(addr.as_pathname().unwrap(), &client_path); - /// # Ok(()) - /// # } + /// ```no_run + /// use tokio::net::UnixDatagram; + /// use std::io; + /// + /// #[tokio::main] + /// async fn main() -> io::Result<()> { + /// // Connect to a peer + /// let dir = tempfile::tempdir().unwrap(); + /// let client_path = dir.path().join("client.sock"); + /// let server_path = dir.path().join("server.sock"); + /// let socket = UnixDatagram::bind(&client_path)?; + /// + /// loop { + /// // Wait for the socket to be readable + /// socket.readable().await?; + /// + /// // The buffer is **not** included in the async task and will + /// // only exist on the stack. + /// let mut buf = [0; 1024]; + /// + /// // Try to recv data, this may still fail with `WouldBlock` + /// // if the readiness event is a false positive. + /// match socket.try_recv_from(&mut buf) { + /// Ok((n, _addr)) => { + /// println!("GOT {:?}", &buf[..n]); + /// break; + /// } + /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { + /// continue; + /// } + /// Err(e) => { + /// return Err(e); + /// } + /// } + /// } + /// + /// Ok(()) + /// } /// ``` pub fn try_recv_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> { - let (n, addr) = self.io.recv_from(buf)?; + let (n, addr) = self + .io + .registration() + .try_io(Interest::READABLE, || self.io.recv_from(buf))?; + Ok((n, SocketAddr(addr))) } diff --git a/tokio/tests/uds_datagram.rs b/tokio/tests/uds_datagram.rs index 541bdc51dce..cdabd7b196e 100644 --- a/tokio/tests/uds_datagram.rs +++ b/tokio/tests/uds_datagram.rs @@ -84,6 +84,8 @@ async fn try_send_recv_never_block() -> io::Result<()> { // Send until we hit the OS `net.unix.max_dgram_qlen`. loop { + dgram1.writable().await.unwrap(); + match dgram1.try_send(payload) { Err(err) => match err.kind() { io::ErrorKind::WouldBlock | io::ErrorKind::Other => break, @@ -98,6 +100,7 @@ async fn try_send_recv_never_block() -> io::Result<()> { // Read every dgram we sent. while count > 0 { + dgram2.readable().await.unwrap(); let len = dgram2.try_recv(&mut recv_buf[..])?; assert_eq!(len, payload.len()); assert_eq!(payload, &recv_buf[..len]); @@ -180,3 +183,50 @@ async fn send_recv_poll() -> std::io::Result<()> { assert_eq!(read.filled(), msg); Ok(()) } + +#[tokio::test] +async fn try_send_to_recv_from() -> std::io::Result<()> { + let dir = tempfile::tempdir().unwrap(); + let server_path = dir.path().join("server.sock"); + let client_path = dir.path().join("client.sock"); + + // Create listener + let server = UnixDatagram::bind(&server_path)?; + + // Create socket pair + let client = UnixDatagram::bind(&client_path)?; + + for _ in 0..5 { + loop { + client.writable().await?; + + match client.try_send_to(b"hello world", &server_path) { + Ok(n) => { + assert_eq!(n, 11); + break; + } + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue, + Err(e) => panic!("{:?}", e), + } + } + + loop { + server.readable().await?; + + let mut buf = [0; 512]; + + match server.try_recv_from(&mut buf) { + Ok((n, addr)) => { + assert_eq!(n, 11); + assert_eq!(addr.as_pathname(), Some(client_path.as_ref())); + assert_eq!(&buf[0..11], &b"hello world"[..]); + break; + } + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue, + Err(e) => panic!("{:?}", e), + } + } + } + + Ok(()) +}