Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: Add try methods to SendHalf and RecvHalf. #2021

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
114 changes: 114 additions & 0 deletions tokio/src/net/unix/datagram.rs
@@ -1,5 +1,6 @@
use crate::future::poll_fn;
use crate::io::PollEvented;
use super::split::{split_dgram, RecvHalf, SendHalf};

use std::convert::TryFrom;
use std::fmt;
Expand Down Expand Up @@ -75,6 +76,90 @@ impl UnixDatagram {
poll_fn(|cx| self.poll_send_priv(cx, buf)).await
}

/// Try to send a datagram to the peer without waiting.
///
/// ```
/// # #[tokio::main]
/// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
/// use tokio::net::UnixDatagram;
///
/// let bytes = b"bytes";
/// // We use a socket pair so that they are assigned
/// // each other as a peer.
/// let (mut first, mut second) = UnixDatagram::pair()?;
///
/// let size = first.try_send(bytes)?;
/// assert_eq!(size, bytes.len());
///
/// let mut buffer = vec![0u8; 24];
/// let size = second.try_recv(&mut buffer)?;
///
/// let dgram = &buffer.as_slice()[..size];
/// assert_eq!(dgram, bytes);
/// # Ok(())
/// # }
/// ```
pub fn try_send(&mut self, buf: &[u8]) -> io::Result<usize> {
self.io.get_ref().send(buf)
}

// Poll IO functions that takes `&self` are provided for the split API.
//
// See `poll_send_priv` for more info.
pub(crate) fn try_send_priv(&self, buf: &[u8]) -> io::Result<usize> {
self.io.get_ref().send(buf)
}

/// Try to send a datagram to the peer without waiting.
///
/// ```
/// # #[tokio::main]
/// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
/// use {
/// tokio::net::UnixDatagram,
/// tempfile::tempdir,
/// };
///
/// let bytes = b"bytes";
/// // We use a temporary directory so that the socket
/// // files left by the bound sockets will get cleaned up.
/// let tmp = tempdir().unwrap();
///
/// let server_path = tmp.path().join("server");
/// let mut server = UnixDatagram::bind(&server_path)?;
///
/// let client_path = tmp.path().join("client");
/// let mut client = UnixDatagram::bind(&client_path)?;
///
/// let size = client.try_send_to(bytes, &server_path)?;
/// assert_eq!(size, bytes.len());
///
/// let mut buffer = vec![0u8; 24];
/// let (size, addr) = server.try_recv_from(&mut buffer)?;
///
/// let dgram = &buffer.as_slice()[..size];
/// assert_eq!(dgram, bytes);
/// assert_eq!(addr.as_pathname().unwrap(), &client_path);
/// # Ok(())
/// # }
/// ```
pub fn try_send_to<P>(&mut self, buf: &[u8], target: P) -> io::Result<usize>
where
P: AsRef<Path>
{
self.io.get_ref().send_to(buf, target)
}

// Poll IO functions that takes `&self` are provided for the split API.
//
// See `poll_send_priv` for more info.
pub(crate) fn try_send_to_priv<P>(&self, buf: &[u8], target: P) -> io::Result<usize>
where
P: AsRef<Path>
{
self.io.get_ref().send_to(buf, target)
}

// Poll IO functions that takes `&self` are provided for the split API.
//
// They are not public because (taken from the doc of `PollEvented`):
Expand Down Expand Up @@ -106,6 +191,18 @@ impl UnixDatagram {
poll_fn(|cx| self.poll_recv_priv(cx, buf)).await
}

/// Try to receive a datagram from the peer without waiting.
pub fn try_recv(&mut self, buf: &mut [u8]) -> io::Result<usize> {
self.io.get_ref().recv(buf)
}

// Poll IO functions that takes `&self` are provided for the split API.
//
// See `poll_send_priv` for more info.
pub(crate) fn try_recv_priv(&self, buf: &mut [u8]) -> io::Result<usize> {
self.io.get_ref().recv(buf)
}

pub(crate) fn poll_recv_priv(
&self,
cx: &mut Context<'_>,
Expand Down Expand Up @@ -152,6 +249,18 @@ impl UnixDatagram {
poll_fn(|cx| self.poll_recv_from_priv(cx, buf)).await
}

/// Try to receive data from the socket without waiting.
pub fn try_recv_from(&mut self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> {
self.io.get_ref().recv_from(buf)
}

// Poll IO functions that takes `&self` are provided for the split API.
//
// See `poll_send_priv` for more info.
pub(crate) fn try_recv_from_priv(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> {
self.io.get_ref().recv_from(buf)
}

pub(crate) fn poll_recv_from_priv(
&self,
cx: &mut Context<'_>,
Expand Down Expand Up @@ -193,6 +302,11 @@ impl UnixDatagram {
pub fn shutdown(&self, how: Shutdown) -> io::Result<()> {
self.io.get_ref().shutdown(how)
}

/// Splits the socket into a `RecvHalf` and `SendHalf`.
pub fn split(&mut self) -> (RecvHalf<'_>, SendHalf<'_>) {
split_dgram(self)
}
}

impl TryFrom<UnixDatagram> for mio_uds::UnixDatagram {
Expand Down
2 changes: 1 addition & 1 deletion tokio/src/net/unix/mod.rs
Expand Up @@ -9,7 +9,7 @@ pub(crate) mod listener;
pub(crate) use listener::UnixListener;

mod split;
pub use split::{ReadHalf, WriteHalf};
pub use split::{ReadHalf, WriteHalf, RecvHalf, SendHalf};

pub(crate) mod stream;
pub(crate) use stream::UnixStream;
Expand Down
82 changes: 79 additions & 3 deletions tokio/src/net/unix/split.rs
@@ -1,4 +1,6 @@
//! `UnixStream` split support.
//! `UnixStream` and `UnixDatagram` split support.
//!
//! ## UnixStream
//!
//! A `UnixStream` can be split into a read half and a write half with
//! `UnixStream::split`. The read half implements `AsyncRead` while the write
Expand All @@ -7,13 +9,25 @@
//! Compared to the generic split of `AsyncRead + AsyncWrite`, this specialized
//! split has no associated overhead and enforces all invariants at the type
//! level.
//!
//! ## UnixDatagram
//!
//! A `UnixDatagram` can be split into a receive half and a send half with
//! `UnixDatagram::split`. The send half implements `send` and `send_to` and
//! the receiving one implements `recv` and `recv_from`.
//!
//! This split method has no overhead and enforces all invariants at the type
//! level.

use crate::io::{AsyncRead, AsyncWrite};
use crate::net::UnixStream;
use crate::net::{UnixDatagram, UnixStream};
use crate::future::poll_fn;

use std::io;
use std::mem::MaybeUninit;
use std::net::Shutdown;
use std::os::unix::net::SocketAddr;
use std::path::Path;
use std::pin::Pin;
use std::task::{Context, Poll};

Expand All @@ -25,10 +39,22 @@ pub struct ReadHalf<'a>(&'a UnixStream);
#[derive(Debug)]
pub struct WriteHalf<'a>(&'a UnixStream);

pub(crate) fn split(stream: &mut UnixStream) -> (ReadHalf<'_>, WriteHalf<'_>) {
/// Receiving half of a `UnixDatagram`.
#[derive(Debug)]
pub struct RecvHalf<'a>(&'a UnixDatagram);

/// Sending half of a `UnixDatagram`.
#[derive(Debug)]
pub struct SendHalf<'a>(&'a UnixDatagram);

pub(crate) fn split_stream(stream: &mut UnixStream) -> (ReadHalf<'_>, WriteHalf<'_>) {
(ReadHalf(stream), WriteHalf(stream))
}

pub(crate) fn split_dgram(dgram: &mut UnixDatagram) -> (RecvHalf<'_>, SendHalf<'_>) {
(RecvHalf(dgram), SendHalf(dgram))
}

impl AsyncRead for ReadHalf<'_> {
unsafe fn prepare_uninitialized_buffer(&self, _: &mut [MaybeUninit<u8>]) -> bool {
false
Expand Down Expand Up @@ -72,3 +98,53 @@ impl AsRef<UnixStream> for WriteHalf<'_> {
self.0
}
}

impl RecvHalf<'_> {
/// Receives a datagram from the socket.
pub async fn recv(&mut self, buf: &mut [u8]) -> io::Result<usize> {
poll_fn(|cx| self.0.poll_recv_priv(cx, buf)).await
}

/// Try to receive a datagram from the peer without waiting.
pub fn try_recv(&mut self, buf: &mut [u8]) -> io::Result<usize> {
self.0.try_recv_priv(buf)
}

/// Receives a datagram with the source address from the socket.
pub async fn recv_from(&mut self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> {
poll_fn(|cx| self.0.poll_recv_from_priv(cx, buf)).await
}

/// Try to receive data from the socket without waiting.
pub fn try_recv_from(&mut self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> {
self.0.try_recv_from_priv(buf)
}
}

impl SendHalf<'_> {
/// Sends a datagram to the socket's peer.
pub async fn send(&mut self, buf: &[u8]) -> io::Result<usize> {
poll_fn(|cx| self.0.poll_send_priv(cx, buf)).await
}

/// Try to send a datagram to the peer without waiting.
pub fn try_send(&mut self, buf: &[u8]) -> io::Result<usize> {
self.0.try_send_priv(buf)
}

/// Sends a datagram to the specified address.
pub async fn send_to<P>(&mut self, buf: &[u8], target: P) -> io::Result<usize>
where
P: AsRef<Path> + Unpin,
{
poll_fn(|cx| self.0.poll_send_to_priv(cx, buf, target.as_ref())).await
}

/// Try to send a datagram to the peer without waiting.
pub fn try_send_to<P>(&mut self, buf: &[u8], target: P) -> io::Result<usize>
where
P: AsRef<Path>,
{
self.0.try_send_to_priv(buf, target)
}
}
4 changes: 2 additions & 2 deletions tokio/src/net/unix/stream.rs
@@ -1,6 +1,6 @@
use crate::future::poll_fn;
use crate::io::{AsyncRead, AsyncWrite, PollEvented};
use crate::net::unix::split::{split, ReadHalf, WriteHalf};
use crate::net::unix::split::{split_stream, ReadHalf, WriteHalf};
use crate::net::unix::ucred::{self, UCred};

use std::convert::TryFrom;
Expand Down Expand Up @@ -107,7 +107,7 @@ impl UnixStream {
/// See the module level documenation of [`split`](super::split) for more
/// details.
pub fn split(&mut self) -> (ReadHalf<'_>, WriteHalf<'_>) {
split(self)
split_stream(self)
}
}

Expand Down
40 changes: 40 additions & 0 deletions tokio/tests/uds_datagram.rs
Expand Up @@ -41,3 +41,43 @@ async fn echo() -> io::Result<()> {

Ok(())
}

// Even though we use sync non-blocking io we still need a reactor.
#[tokio::test]
async fn try_send_recv_never_block() -> io::Result<()> {
let mut recv_buf = [0u8; 16];
let payload = b"PAYLOAD";
let mut count = 0;

let (mut dgram1, mut dgram2) = UnixDatagram::pair()?;

// Send until we hit the OS `net.unix.max_dgram_qlen`.
loop {
match dgram1.try_send(payload) {
Err(err) => match err.kind() {
io::ErrorKind::WouldBlock | io::ErrorKind::Other => break,
_ => unreachable!("unexpected error {:?}", err),
},
Ok(len) => {
assert_eq!(len, payload.len());
}
}
count += 1;
}

// Read every dgram we sent.
while count > 0 {
let len = dgram2.try_recv(&mut recv_buf[..])?;
assert_eq!(len, payload.len());
assert_eq!(payload, &recv_buf[..len]);
count -= 1;
}

let err = dgram2.try_recv(&mut recv_buf[..]).unwrap_err();
match err.kind() {
io::ErrorKind::WouldBlock => (),
_ => unreachable!("unexpected error {:?}", err),
}

Ok(())
}