Skip to content

Commit

Permalink
net: Add try_read_buf and try_recv_buf
Browse files Browse the repository at this point in the history
  • Loading branch information
cssivision committed Dec 27, 2020
1 parent 2330edc commit a78f371
Show file tree
Hide file tree
Showing 8 changed files with 782 additions and 1 deletion.
85 changes: 85 additions & 0 deletions tokio/src/net/tcp/stream.rs
Expand Up @@ -17,6 +17,8 @@ use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::Duration;

use bytes::BufMut;

cfg_net! {
/// A TCP stream between a local and a remote socket.
///
Expand Down Expand Up @@ -559,6 +561,89 @@ impl TcpStream {
.try_io(Interest::READABLE, || (&*self.io).read(buf))
}

/// Try to read data from the stream into the provided buffer, advancing the
/// buffer's internal cursor, returning how many bytes were read.
///
/// Receives any pending data from the socket but does not wait for new data
/// to arrive. On success, returns the number of bytes read. Because
/// `try_read_buf()` is non-blocking, the buffer does not have to be stored by
/// the async task and can exist entirely on the stack.
///
/// Usually, [`readable()`] or [`ready()`] is used with this function.
///
/// [`readable()`]: TcpStream::readable()
/// [`ready()`]: TcpStream::ready()
///
/// # Return
///
/// If data is successfully read, `Ok(n)` is returned, where `n` is the
/// number of bytes read. `Ok(0)` indicates the stream's read half is closed
/// and will no longer yield data. If the stream is not ready to read data
/// `Err(io::ErrorKind::WouldBlock)` is returned.
///
/// # Examples
///
/// ```no_run
/// use tokio::net::TcpStream;
/// use std::error::Error;
/// use std::io;
///
/// #[tokio::main]
/// async fn main() -> Result<(), Box<dyn Error>> {
/// // Connect to a peer
/// let stream = TcpStream::connect("127.0.0.1:8080").await?;
///
/// loop {
/// // Wait for the socket to be readable
/// stream.readable().await?;
///
/// // Creating the buffer **after** the `await` prevents it from
/// // being stored in the async task.
/// let mut buf = Vec::with_capacity(4096);
///
/// // Try to read data, this may still fail with `WouldBlock`
/// // if the readiness event is a false positive.
/// match stream.try_read_buf(&mut buf) {
/// Ok(0) => break,
/// Ok(n) => {
/// println!("read {} bytes", n);
/// }
/// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
/// continue;
/// }
/// Err(e) => {
/// return Err(e.into());
/// }
/// }
/// }
///
/// Ok(())
/// }
/// ```
pub fn try_read_buf<B: BufMut>(&self, buf: &mut B) -> io::Result<usize> {
if !buf.has_remaining_mut() {
return Ok(0);
}

self.io.registration().try_io(Interest::READABLE, || {
use std::io::Read;

let dst = buf.chunk_mut();
let dst =
unsafe { &mut *(dst as *mut _ as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]) };

// Safety: We trust `TcpStream::read` to have filled up `n` bytes in the
// buffer.
let n = (&*self.io).read(dst)?;

unsafe {
buf.advance_mut(n);
}

Ok(n)
})
}

/// Wait for the socket to become writable.
///
/// This function is equivalent to `ready(Interest::WRITABLE)` and is usually
Expand Down
140 changes: 139 additions & 1 deletion tokio/src/net/udp.rs
Expand Up @@ -7,6 +7,8 @@ use std::io;
use std::net::{self, Ipv4Addr, Ipv6Addr, SocketAddr};
use std::task::{Context, Poll};

use bytes::BufMut;

cfg_net! {
/// A UDP socket
///
Expand Down Expand Up @@ -683,6 +685,77 @@ impl UdpSocket {
.try_io(Interest::READABLE, || self.io.recv(buf))
}

/// Try to receive data from the stream into the provided buffer, advancing the
/// buffer's internal cursor, returning how many bytes were read.
///
/// The function must be called with valid byte array buf of sufficient size
/// to hold the message bytes. If a message is too long to fit in the
/// supplied buffer, excess bytes may be discarded.
///
/// When there is no pending data, `Err(io::ErrorKind::WouldBlock)` is
/// returned. This function is usually paired with `readable()`.
///
/// # Examples
///
/// ```no_run
/// use tokio::net::UdpSocket;
/// use std::io;
///
/// #[tokio::main]
/// async fn main() -> io::Result<()> {
/// // Connect to a peer
/// let socket = UdpSocket::bind("127.0.0.1:8080").await?;
/// socket.connect("127.0.0.1:8081").await?;
///
/// loop {
/// // Wait for the socket to be readable
/// socket.readable().await?;
///
/// // The buffer is **not** included in the async task and will
/// // only exist on the stack.
/// let mut buf = Vec::with_capacity(1024);
///
/// // Try to recv data, this may still fail with `WouldBlock`
/// // if the readiness event is a false positive.
/// match socket.try_recv_from(&mut buf) {
/// Ok(n) => {
/// println!("GOT {:?}", &buf[..n]);
/// break;
/// }
/// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
/// continue;
/// }
/// Err(e) => {
/// return Err(e);
/// }
/// }
/// }
///
/// Ok(())
/// }
/// ```
pub fn try_recv_buf<B: BufMut>(&self, buf: &mut B) -> io::Result<usize> {
if !buf.has_remaining_mut() {
return Ok(0);
}

self.io.registration().try_io(Interest::READABLE, || {
let dst = buf.chunk_mut();
let dst =
unsafe { &mut *(dst as *mut _ as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]) };

// Safety: We trust `UdpSocket::recv` to have filled up `n` bytes in the
// buffer.
let n = (&*self.io).recv(dst)?;

unsafe {
buf.advance_mut(n);
}

Ok(n)
})
}

/// Sends data on the socket to the given address. On success, returns the
/// number of bytes written.
///
Expand Down Expand Up @@ -904,7 +977,6 @@ impl UdpSocket {
/// async fn main() -> io::Result<()> {
/// // Connect to a peer
/// let socket = UdpSocket::bind("127.0.0.1:8080").await?;
/// socket.connect("127.0.0.1:8081").await?;
///
/// loop {
/// // Wait for the socket to be readable
Expand Down Expand Up @@ -939,6 +1011,72 @@ impl UdpSocket {
.try_io(Interest::READABLE, || self.io.recv_from(buf))
}

/// Try to receive a single datagram message on the socket. On success,
/// returns the number of bytes read and the origin.
///
/// The function must be called with valid byte array buf of sufficient size
/// to hold the message bytes. If a message is too long to fit in the
/// supplied buffer, excess bytes may be discarded.
///
/// When there is no pending data, `Err(io::ErrorKind::WouldBlock)` is
/// returned. This function is usually paired with `readable()`.
///
/// # Examples
///
/// ```no_run
/// use tokio::net::UdpSocket;
/// use std::io;
///
/// #[tokio::main]
/// async fn main() -> io::Result<()> {
/// // Connect to a peer
/// let socket = UdpSocket::bind("127.0.0.1:8080").await?;
///
/// loop {
/// // Wait for the socket to be readable
/// socket.readable().await?;
///
/// // The buffer is **not** included in the async task and will
/// // only exist on the stack.
/// let mut buf = Vec::with_capacity(1024);
///
/// // Try to recv data, this may still fail with `WouldBlock`
/// // if the readiness event is a false positive.
/// match socket.try_recv_from(&mut buf) {
/// Ok((n, _addr)) => {
/// println!("GOT {:?}", &buf[..n]);
/// break;
/// }
/// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
/// continue;
/// }
/// Err(e) => {
/// return Err(e);
/// }
/// }
/// }
///
/// Ok(())
/// }
/// ```
pub fn try_recv_buf_from<B: BufMut>(&self, buf: &mut B) -> io::Result<(usize, SocketAddr)> {
self.io.registration().try_io(Interest::READABLE, || {
let dst = buf.chunk_mut();
let dst =
unsafe { &mut *(dst as *mut _ as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]) };

// Safety: We trust `UdpSocket::recv_from` to have filled up `n` bytes in the
// buffer.
let (n, addr) = (&*self.io).recv_from(dst)?;

unsafe {
buf.advance_mut(n);
}

Ok((n, addr))
})
}

/// Receives data from the socket, without removing it from the input queue.
/// On success, returns the number of bytes read and the address from whence
/// the data came.
Expand Down

0 comments on commit a78f371

Please sign in to comment.