Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

net: add {Tcp, Unix}Stream::try_{read, write}_vectored #3761

Merged
merged 1 commit into from May 14, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
140 changes: 140 additions & 0 deletions tokio/src/net/tcp/stream.rs
Expand Up @@ -563,6 +563,84 @@ impl TcpStream {
.try_io(Interest::READABLE, || (&*self.io).read(buf))
}

/// Try to read data from the stream into the provided buffers, returning
/// how many bytes were read.
///
/// Data is copied to fill each buffer in order, with the final buffer
/// written to possibly being only partially filled. This method behaves
/// equivalently to a single call to [`try_read()`] with concatenated
/// buffers.
///
/// Receives any pending data from the socket but does not wait for new data
/// to arrive. On success, returns the number of bytes read. Because
/// `try_read_vectored()` is non-blocking, the buffer does not have to be
/// stored by the async task and can exist entirely on the stack.
///
/// Usually, [`readable()`] or [`ready()`] is used with this function.
///
/// [`try_read()`]: TcpStream::try_read()
/// [`readable()`]: TcpStream::readable()
/// [`ready()`]: TcpStream::ready()
///
/// # Return
///
/// If data is successfully read, `Ok(n)` is returned, where `n` is the
/// number of bytes read. `Ok(0)` indicates the stream's read half is closed
/// and will no longer yield data. If the stream is not ready to read data
/// `Err(io::ErrorKind::WouldBlock)` is returned.
///
/// # Examples
///
/// ```no_run
/// use tokio::net::TcpStream;
/// use std::error::Error;
/// use std::io::{self, IoSliceMut};
///
/// #[tokio::main]
/// async fn main() -> Result<(), Box<dyn Error>> {
/// // Connect to a peer
/// let stream = TcpStream::connect("127.0.0.1:8080").await?;
///
/// loop {
/// // Wait for the socket to be readable
/// stream.readable().await?;
///
/// // Creating the buffer **after** the `await` prevents it from
/// // being stored in the async task.
/// let mut buf_a = [0; 512];
/// let mut buf_b = [0; 1024];
/// let mut bufs = [
/// IoSliceMut::new(&mut buf_a),
/// IoSliceMut::new(&mut buf_b),
/// ];
///
/// // Try to read data, this may still fail with `WouldBlock`
/// // if the readiness event is a false positive.
/// match stream.try_read_vectored(&mut bufs) {
/// Ok(0) => break,
/// Ok(n) => {
/// println!("read {} bytes", n);
/// }
/// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
/// continue;
/// }
/// Err(e) => {
/// return Err(e.into());
/// }
/// }
/// }
///
/// Ok(())
/// }
/// ```
pub fn try_read_vectored(&self, bufs: &mut [io::IoSliceMut<'_>]) -> io::Result<usize> {
use std::io::Read;

self.io
.registration()
.try_io(Interest::READABLE, || (&*self.io).read_vectored(bufs))
}

cfg_io_util! {
/// Try to read data from the stream into the provided buffer, advancing the
/// buffer's internal cursor, returning how many bytes were read.
Expand Down Expand Up @@ -775,6 +853,68 @@ impl TcpStream {
.try_io(Interest::WRITABLE, || (&*self.io).write(buf))
}

/// Try to write several buffers to the stream, returning how many bytes
/// were written.
///
/// Data is written from each buffer in order, with the final buffer read
/// from possible being only partially consumed. This method behaves
/// equivalently to a single call to [`try_write()`] with concatenated
/// buffers.
///
/// This function is usually paired with `writable()`.
///
/// [`try_write()`]: TcpStream::try_write()
///
/// # Return
///
/// If data is successfully written, `Ok(n)` is returned, where `n` is the
/// number of bytes written. If the stream is not ready to write data,
/// `Err(io::ErrorKind::WouldBlock)` is returned.
///
/// # Examples
///
/// ```no_run
/// use tokio::net::TcpStream;
/// use std::error::Error;
/// use std::io;
///
/// #[tokio::main]
/// async fn main() -> Result<(), Box<dyn Error>> {
/// // Connect to a peer
/// let stream = TcpStream::connect("127.0.0.1:8080").await?;
///
/// let bufs = [io::IoSlice::new(b"hello "), io::IoSlice::new(b"world")];
///
/// loop {
/// // Wait for the socket to be writable
/// stream.writable().await?;
///
/// // Try to write data, this may still fail with `WouldBlock`
/// // if the readiness event is a false positive.
/// match stream.try_write_vectored(&bufs) {
/// Ok(n) => {
/// break;
/// }
/// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
/// continue;
/// }
/// Err(e) => {
/// return Err(e.into());
/// }
/// }
/// }
///
/// Ok(())
/// }
/// ```
pub fn try_write_vectored(&self, bufs: &[io::IoSlice<'_>]) -> io::Result<usize> {
use std::io::Write;

self.io
.registration()
.try_io(Interest::WRITABLE, || (&*self.io).write_vectored(bufs))
}

/// Receives data on the socket from the remote address to which it is
/// connected, without removing that data from the queue. On success,
/// returns the number of bytes peeked.
Expand Down
140 changes: 140 additions & 0 deletions tokio/src/net/unix/stream.rs
Expand Up @@ -271,6 +271,84 @@ impl UnixStream {
.try_io(Interest::READABLE, || (&*self.io).read(buf))
}

/// Try to read data from the stream into the provided buffers, returning
/// how many bytes were read.
///
/// Data is copied to fill each buffer in order, with the final buffer
/// written to possibly being only partially filled. This method behaves
/// equivalently to a single call to [`try_read()`] with concatenated
/// buffers.
///
/// Receives any pending data from the socket but does not wait for new data
/// to arrive. On success, returns the number of bytes read. Because
/// `try_read_vectored()` is non-blocking, the buffer does not have to be
/// stored by the async task and can exist entirely on the stack.
///
/// Usually, [`readable()`] or [`ready()`] is used with this function.
///
/// [`try_read()`]: UnixStream::try_read()
/// [`readable()`]: UnixStream::readable()
/// [`ready()`]: UnixStream::ready()
///
/// # Return
///
/// If data is successfully read, `Ok(n)` is returned, where `n` is the
/// number of bytes read. `Ok(0)` indicates the stream's read half is closed
/// and will no longer yield data. If the stream is not ready to read data
/// `Err(io::ErrorKind::WouldBlock)` is returned.
///
/// # Examples
///
/// ```no_run
/// use tokio::net::UnixStream;
/// use std::error::Error;
/// use std::io::{self, IoSliceMut};
///
/// #[tokio::main]
/// async fn main() -> Result<(), Box<dyn Error>> {
/// // Connect to a peer
/// let dir = tempfile::tempdir().unwrap();
/// let bind_path = dir.path().join("bind_path");
/// let stream = UnixStream::connect(bind_path).await?;
///
/// loop {
/// // Wait for the socket to be readable
/// stream.readable().await?;
///
/// // Creating the buffer **after** the `await` prevents it from
/// // being stored in the async task.
/// let mut buf_a = [0; 512];
/// let mut buf_b = [0; 1024];
/// let mut bufs = [
/// IoSliceMut::new(&mut buf_a),
/// IoSliceMut::new(&mut buf_b),
/// ];
///
/// // Try to read data, this may still fail with `WouldBlock`
/// // if the readiness event is a false positive.
/// match stream.try_read_vectored(&mut bufs) {
/// Ok(0) => break,
/// Ok(n) => {
/// println!("read {} bytes", n);
/// }
/// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
/// continue;
/// }
/// Err(e) => {
/// return Err(e.into());
/// }
/// }
/// }
///
/// Ok(())
/// }
/// ```
pub fn try_read_vectored(&self, bufs: &mut [io::IoSliceMut<'_>]) -> io::Result<usize> {
self.io
.registration()
.try_io(Interest::READABLE, || (&*self.io).read_vectored(bufs))
}

cfg_io_util! {
/// Try to read data from the stream into the provided buffer, advancing the
/// buffer's internal cursor, returning how many bytes were read.
Expand Down Expand Up @@ -487,6 +565,68 @@ impl UnixStream {
.try_io(Interest::WRITABLE, || (&*self.io).write(buf))
}

/// Try to write several buffers to the stream, returning how many bytes
/// were written.
///
/// Data is written from each buffer in order, with the final buffer read
/// from possible being only partially consumed. This method behaves
/// equivalently to a single call to [`try_write()`] with concatenated
/// buffers.
///
/// This function is usually paired with `writable()`.
///
/// [`try_write()`]: UnixStream::try_write()
///
/// # Return
///
/// If data is successfully written, `Ok(n)` is returned, where `n` is the
/// number of bytes written. If the stream is not ready to write data,
/// `Err(io::ErrorKind::WouldBlock)` is returned.
///
/// # Examples
///
/// ```no_run
/// use tokio::net::UnixStream;
/// use std::error::Error;
/// use std::io;
///
/// #[tokio::main]
/// async fn main() -> Result<(), Box<dyn Error>> {
/// // Connect to a peer
/// let dir = tempfile::tempdir().unwrap();
/// let bind_path = dir.path().join("bind_path");
/// let stream = UnixStream::connect(bind_path).await?;
///
/// let bufs = [io::IoSlice::new(b"hello "), io::IoSlice::new(b"world")];
///
/// loop {
/// // Wait for the socket to be writable
/// stream.writable().await?;
///
/// // Try to write data, this may still fail with `WouldBlock`
/// // if the readiness event is a false positive.
/// match stream.try_write_vectored(&bufs) {
/// Ok(n) => {
/// break;
/// }
/// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
/// continue;
/// }
/// Err(e) => {
/// return Err(e.into());
/// }
/// }
/// }
///
/// Ok(())
/// }
/// ```
pub fn try_write_vectored(&self, buf: &[io::IoSlice<'_>]) -> io::Result<usize> {
self.io
.registration()
.try_io(Interest::WRITABLE, || (&*self.io).write_vectored(buf))
}

/// Creates new `UnixStream` from a `std::os::unix::net::UnixStream`.
///
/// This function is intended to be used to wrap a UnixStream from the
Expand Down
49 changes: 47 additions & 2 deletions tokio/tests/tcp_stream.rs
Expand Up @@ -55,7 +55,7 @@ async fn try_read_write() {
tokio::task::yield_now().await;
}

// Fill the write buffer
// Fill the write buffer using non-vectored I/O
loop {
// Still ready
let mut writable = task::spawn(client.writable());
Expand All @@ -75,7 +75,7 @@ async fn try_read_write() {
let mut writable = task::spawn(client.writable());
assert_pending!(writable.poll());

// Drain the socket from the server end
// Drain the socket from the server end using non-vectored I/O
let mut read = vec![0; written.len()];
let mut i = 0;

Expand All @@ -92,6 +92,51 @@ async fn try_read_write() {
assert_eq!(read, written);
}

written.clear();
client.writable().await.unwrap();

// Fill the write buffer using vectored I/O
let data_bufs: Vec<_> = DATA.chunks(10).map(io::IoSlice::new).collect();
loop {
// Still ready
let mut writable = task::spawn(client.writable());
assert_ready_ok!(writable.poll());

match client.try_write_vectored(&data_bufs) {
Ok(n) => written.extend(&DATA[..n]),
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
break;
}
Err(e) => panic!("error = {:?}", e),
}
}

{
// Write buffer full
let mut writable = task::spawn(client.writable());
assert_pending!(writable.poll());

// Drain the socket from the server end using vectored I/O
let mut read = vec![0; written.len()];
let mut i = 0;

while i < read.len() {
server.readable().await.unwrap();

let mut bufs: Vec<_> = read[i..]
.chunks_mut(0x10000)
.map(io::IoSliceMut::new)
.collect();
match server.try_read_vectored(&mut bufs) {
Ok(n) => i += n,
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue,
Err(e) => panic!("error = {:?}", e),
}
}

assert_eq!(read, written);
}

// Now, we listen for shutdown
drop(client);

Expand Down