Skip to content

Commit

Permalink
Use first nonempty buffer in vectored I/O
Browse files Browse the repository at this point in the history
Previous implementation would use the first buffer even when it was
empty and other nonempty buffers are provided. This could result in
`Ok(0)` return value which has otherwise a special meaning. The new
implementation uses the first nonempty buffer instead to avoid the
issue.

Previous implementation would return early if no buffers are provided.
The new implementation delegates operation using an empty buffer. This
gives an implementation an ability to decide how an empty read / write
operation should behave.

This change aligns the implementation with synchronous version of those
traits.
  • Loading branch information
tmiasko authored and cramertj committed Dec 10, 2019
1 parent d7d6aa7 commit 06098e4
Show file tree
Hide file tree
Showing 3 changed files with 151 additions and 14 deletions.
30 changes: 16 additions & 14 deletions futures-io/src/lib.rs
Expand Up @@ -103,8 +103,8 @@ mod if_std {
/// `cx.waker().wake_by_ref()`) to receive a notification when the object becomes
/// readable or is closed.
/// By default, this method delegates to using `poll_read` on the first
/// buffer in `bufs`. Objects which support vectored IO should override
/// this method.
/// nonempty buffer in `bufs`, or an empty one if none exists. Objects which
/// support vectored IO should override this method.
///
/// # Implementation
///
Expand All @@ -115,12 +115,13 @@ mod if_std {
fn poll_read_vectored(self: Pin<&mut Self>, cx: &mut Context<'_>, bufs: &mut [IoSliceMut<'_>])
-> Poll<Result<usize>>
{
if let Some(first_iovec) = bufs.get_mut(0) {
self.poll_read(cx, &mut **first_iovec)
} else {
// `bufs` is empty.
Poll::Ready(Ok(0))
for b in bufs {
if !b.is_empty() {
return self.poll_read(cx, b);
}
}

self.poll_read(cx, &mut [])
}
}

Expand Down Expand Up @@ -164,8 +165,8 @@ mod if_std {
/// writable or is closed.
///
/// By default, this method delegates to using `poll_write` on the first
/// buffer in `bufs`. Objects which support vectored IO should override
/// this method.
/// nonempty buffer in `bufs`, or an empty one if none exists. Objects which
/// support vectored IO should override this method.
///
/// # Implementation
///
Expand All @@ -176,12 +177,13 @@ mod if_std {
fn poll_write_vectored(self: Pin<&mut Self>, cx: &mut Context<'_>, bufs: &[IoSlice<'_>])
-> Poll<Result<usize>>
{
if let Some(first_iovec) = bufs.get(0) {
self.poll_write(cx, &**first_iovec)
} else {
// `bufs` is empty.
Poll::Ready(Ok(0))
for b in bufs {
if !b.is_empty() {
return self.poll_write(cx, b);
}
}

self.poll_write(cx, &[])
}

/// Attempt to flush the object, ensuring that any buffered data reach
Expand Down
65 changes: 65 additions & 0 deletions futures/tests/io_read.rs
@@ -0,0 +1,65 @@
use futures::io::AsyncRead;
use futures_test::task::panic_context;
use std::io;
use std::pin::Pin;
use std::task::{Context, Poll};

struct MockReader {
fun: Box<dyn FnMut(&mut [u8]) -> Poll<io::Result<usize>>>,
}

impl MockReader {
pub fn new(fun: impl FnMut(&mut [u8]) -> Poll<io::Result<usize>> + 'static) -> Self {
MockReader { fun: Box::new(fun) }
}
}

impl AsyncRead for MockReader {
fn poll_read(
self: Pin<&mut Self>,
_cx: &mut Context<'_>,
buf: &mut [u8]
) -> Poll<io::Result<usize>> {
(self.get_mut().fun)(buf)
}
}

/// Verifies that the default implementation of `poll_read_vectored`
/// calls `poll_read` with an empty slice if no buffers are provided.
#[test]
fn read_vectored_no_buffers() {
let mut reader = MockReader::new(|buf| {
assert_eq!(buf, b"");
Err(io::ErrorKind::BrokenPipe.into()).into()
});
let cx = &mut panic_context();
let bufs = &mut [];

let res = Pin::new(&mut reader).poll_read_vectored(cx, bufs);
let res = res.map_err(|e| e.kind());
assert_eq!(res, Poll::Ready(Err(io::ErrorKind::BrokenPipe)))
}

/// Verifies that the default implementation of `poll_read_vectored`
/// calls `poll_read` with the first non-empty buffer.
#[test]
fn read_vectored_first_non_empty() {
let mut reader = MockReader::new(|buf| {
assert_eq!(buf.len(), 4);
buf.copy_from_slice(b"four");
Poll::Ready(Ok(4))
});
let cx = &mut panic_context();
let mut buf = [0; 4];
let bufs = &mut [
io::IoSliceMut::new(&mut []),
io::IoSliceMut::new(&mut []),
io::IoSliceMut::new(&mut buf),
];

let res = Pin::new(&mut reader).poll_read_vectored(cx, bufs);
let res = res.map_err(|e| e.kind());
assert_eq!(res, Poll::Ready(Ok(4)));
assert_eq!(buf, b"four"[..]);
}

70 changes: 70 additions & 0 deletions futures/tests/io_write.rs
@@ -0,0 +1,70 @@
use futures::io::AsyncWrite;
use futures_test::task::panic_context;
use std::io;
use std::pin::Pin;
use std::task::{Context, Poll};

struct MockWriter {
fun: Box<dyn FnMut(&[u8]) -> Poll<io::Result<usize>>>,
}

impl MockWriter {
pub fn new(fun: impl FnMut(&[u8]) -> Poll<io::Result<usize>> + 'static) -> Self {
MockWriter { fun: Box::new(fun) }
}
}

impl AsyncWrite for MockWriter {
fn poll_write(
self: Pin<&mut Self>,
_cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<io::Result<usize>> {
(self.get_mut().fun)(buf)
}

fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
panic!()
}

fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
panic!()
}
}

/// Verifies that the default implementation of `poll_write_vectored`
/// calls `poll_write` with an empty slice if no buffers are provided.
#[test]
fn write_vectored_no_buffers() {
let mut writer = MockWriter::new(|buf| {
assert_eq!(buf, b"");
Err(io::ErrorKind::BrokenPipe.into()).into()
});
let cx = &mut panic_context();
let bufs = &mut [];

let res = Pin::new(&mut writer).poll_write_vectored(cx, bufs);
let res = res.map_err(|e| e.kind());
assert_eq!(res, Poll::Ready(Err(io::ErrorKind::BrokenPipe)))
}

/// Verifies that the default implementation of `poll_write_vectored`
/// calls `poll_write` with the first non-empty buffer.
#[test]
fn write_vectored_first_non_empty() {
let mut writer = MockWriter::new(|buf| {
assert_eq!(buf, b"four");
Poll::Ready(Ok(4))
});
let cx = &mut panic_context();
let bufs = &mut [
io::IoSlice::new(&[]),
io::IoSlice::new(&[]),
io::IoSlice::new(b"four")
];

let res = Pin::new(&mut writer).poll_write_vectored(cx, bufs);
let res = res.map_err(|e| e.kind());
assert_eq!(res, Poll::Ready(Ok(4)));
}

0 comments on commit 06098e4

Please sign in to comment.