Skip to content

Commit

Permalink
net: add try_*, readable, writable, ready, and peer_addr me…
Browse files Browse the repository at this point in the history
…thods to split halves (tokio-rs#4120)
  • Loading branch information
Noah-Kennedy authored and suikammd committed Oct 7, 2021
1 parent cb646b1 commit 4a9cf71
Show file tree
Hide file tree
Showing 5 changed files with 859 additions and 6 deletions.
213 changes: 211 additions & 2 deletions tokio/src/net/tcp/split.rs
Expand Up @@ -9,14 +9,18 @@
//! level.

use crate::future::poll_fn;
use crate::io::{AsyncRead, AsyncWrite, ReadBuf};
use crate::io::{AsyncRead, AsyncWrite, Interest, ReadBuf, Ready};
use crate::net::TcpStream;

use std::io;
use std::net::Shutdown;
use std::net::{Shutdown, SocketAddr};
use std::pin::Pin;
use std::task::{Context, Poll};

cfg_io_util! {
use bytes::BufMut;
}

/// Borrowed read half of a [`TcpStream`], created by [`split`].
///
/// Reading from a `ReadHalf` is usually done using the convenience methods found on the
Expand Down Expand Up @@ -134,6 +138,211 @@ impl ReadHalf<'_> {
let mut buf = ReadBuf::new(buf);
poll_fn(|cx| self.poll_peek(cx, &mut buf)).await
}

/// Wait for any of the requested ready states.
///
/// This function is usually paired with `try_read()` or `try_write()`. It
/// can be used to concurrently read / write to the same socket on a single
/// task without splitting the socket.
///
/// This function is equivalent to [`TcpStream::ready`].
///
/// # Cancel safety
///
/// This method is cancel safe. Once a readiness event occurs, the method
/// will continue to return immediately until the readiness event is
/// consumed by an attempt to read or write that fails with `WouldBlock` or
/// `Poll::Pending`.
pub async fn ready(&self, interest: Interest) -> io::Result<Ready> {
self.0.ready(interest).await
}

/// Wait for the socket to become readable.
///
/// This function is equivalent to `ready(Interest::READABLE)` and is usually
/// paired with `try_read()`.
///
/// This function is also equivalent to [`TcpStream::ready`].
///
/// # Cancel safety
///
/// This method is cancel safe. Once a readiness event occurs, the method
/// will continue to return immediately until the readiness event is
/// consumed by an attempt to read that fails with `WouldBlock` or
/// `Poll::Pending`.
pub async fn readable(&self) -> io::Result<()> {
self.0.readable().await
}

/// Try to read data from the stream into the provided buffer, returning how
/// many bytes were read.
///
/// Receives any pending data from the socket but does not wait for new data
/// to arrive. On success, returns the number of bytes read. Because
/// `try_read()` is non-blocking, the buffer does not have to be stored by
/// the async task and can exist entirely on the stack.
///
/// Usually, [`readable()`] or [`ready()`] is used with this function.
///
/// [`readable()`]: Self::readable()
/// [`ready()`]: Self::ready()
///
/// # Return
///
/// If data is successfully read, `Ok(n)` is returned, where `n` is the
/// number of bytes read. `Ok(0)` indicates the stream's read half is closed
/// and will no longer yield data. If the stream is not ready to read data
/// `Err(io::ErrorKind::WouldBlock)` is returned.
pub fn try_read(&self, buf: &mut [u8]) -> io::Result<usize> {
self.0.try_read(buf)
}

/// Try to read data from the stream into the provided buffers, returning
/// how many bytes were read.
///
/// Data is copied to fill each buffer in order, with the final buffer
/// written to possibly being only partially filled. This method behaves
/// equivalently to a single call to [`try_read()`] with concatenated
/// buffers.
///
/// Receives any pending data from the socket but does not wait for new data
/// to arrive. On success, returns the number of bytes read. Because
/// `try_read_vectored()` is non-blocking, the buffer does not have to be
/// stored by the async task and can exist entirely on the stack.
///
/// Usually, [`readable()`] or [`ready()`] is used with this function.
///
/// [`try_read()`]: Self::try_read()
/// [`readable()`]: Self::readable()
/// [`ready()`]: Self::ready()
///
/// # Return
///
/// If data is successfully read, `Ok(n)` is returned, where `n` is the
/// number of bytes read. `Ok(0)` indicates the stream's read half is closed
/// and will no longer yield data. If the stream is not ready to read data
/// `Err(io::ErrorKind::WouldBlock)` is returned.
pub fn try_read_vectored(&self, bufs: &mut [io::IoSliceMut<'_>]) -> io::Result<usize> {
self.0.try_read_vectored(bufs)
}

cfg_io_util! {
/// Try to read data from the stream into the provided buffer, advancing the
/// buffer's internal cursor, returning how many bytes were read.
///
/// Receives any pending data from the socket but does not wait for new data
/// to arrive. On success, returns the number of bytes read. Because
/// `try_read_buf()` is non-blocking, the buffer does not have to be stored by
/// the async task and can exist entirely on the stack.
///
/// Usually, [`readable()`] or [`ready()`] is used with this function.
///
/// [`readable()`]: Self::readable()
/// [`ready()`]: Self::ready()
///
/// # Return
///
/// If data is successfully read, `Ok(n)` is returned, where `n` is the
/// number of bytes read. `Ok(0)` indicates the stream's read half is closed
/// and will no longer yield data. If the stream is not ready to read data
/// `Err(io::ErrorKind::WouldBlock)` is returned.
pub fn try_read_buf<B: BufMut>(&self, buf: &mut B) -> io::Result<usize> {
self.0.try_read_buf(buf)
}
}

/// Returns the remote address that this stream is connected to.
pub fn peer_addr(&self) -> io::Result<SocketAddr> {
self.0.peer_addr()
}

/// Returns the local address that this stream is bound to.
pub fn local_addr(&self) -> io::Result<SocketAddr> {
self.0.local_addr()
}
}

impl WriteHalf<'_> {
/// Wait for any of the requested ready states.
///
/// This function is usually paired with `try_read()` or `try_write()`. It
/// can be used to concurrently read / write to the same socket on a single
/// task without splitting the socket.
///
/// This function is equivalent to [`TcpStream::ready`].
///
/// # Cancel safety
///
/// This method is cancel safe. Once a readiness event occurs, the method
/// will continue to return immediately until the readiness event is
/// consumed by an attempt to read or write that fails with `WouldBlock` or
/// `Poll::Pending`.
pub async fn ready(&self, interest: Interest) -> io::Result<Ready> {
self.0.ready(interest).await
}

/// Wait for the socket to become writable.
///
/// This function is equivalent to `ready(Interest::WRITABLE)` and is usually
/// paired with `try_write()`.
///
/// # Cancel safety
///
/// This method is cancel safe. Once a readiness event occurs, the method
/// will continue to return immediately until the readiness event is
/// consumed by an attempt to write that fails with `WouldBlock` or
/// `Poll::Pending`.
pub async fn writable(&self) -> io::Result<()> {
self.0.writable().await
}

/// Try to write a buffer to the stream, returning how many bytes were
/// written.
///
/// The function will attempt to write the entire contents of `buf`, but
/// only part of the buffer may be written.
///
/// This function is usually paired with `writable()`.
///
/// # Return
///
/// If data is successfully written, `Ok(n)` is returned, where `n` is the
/// number of bytes written. If the stream is not ready to write data,
/// `Err(io::ErrorKind::WouldBlock)` is returned.
pub fn try_write(&self, buf: &[u8]) -> io::Result<usize> {
self.0.try_write(buf)
}

/// Try to write several buffers to the stream, returning how many bytes
/// were written.
///
/// Data is written from each buffer in order, with the final buffer read
/// from possible being only partially consumed. This method behaves
/// equivalently to a single call to [`try_write()`] with concatenated
/// buffers.
///
/// This function is usually paired with `writable()`.
///
/// [`try_write()`]: Self::try_write()
///
/// # Return
///
/// If data is successfully written, `Ok(n)` is returned, where `n` is the
/// number of bytes written. If the stream is not ready to write data,
/// `Err(io::ErrorKind::WouldBlock)` is returned.
pub fn try_write_vectored(&self, bufs: &[io::IoSlice<'_>]) -> io::Result<usize> {
self.0.try_write_vectored(bufs)
}

/// Returns the remote address that this stream is connected to.
pub fn peer_addr(&self) -> io::Result<SocketAddr> {
self.0.peer_addr()
}

/// Returns the local address that this stream is bound to.
pub fn local_addr(&self) -> io::Result<SocketAddr> {
self.0.local_addr()
}
}

impl AsyncRead for ReadHalf<'_> {
Expand Down

0 comments on commit 4a9cf71

Please sign in to comment.