Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use first nonempty buffer in vectored I/O #1998

Merged
merged 1 commit into from Dec 10, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
30 changes: 16 additions & 14 deletions futures-io/src/lib.rs
Expand Up @@ -103,8 +103,8 @@ mod if_std {
/// `cx.waker().wake_by_ref()`) to receive a notification when the object becomes
/// readable or is closed.
/// By default, this method delegates to using `poll_read` on the first
/// buffer in `bufs`. Objects which support vectored IO should override
/// this method.
/// nonempty buffer in `bufs`, or an empty one if none exists. Objects which
/// support vectored IO should override this method.
///
/// # Implementation
///
Expand All @@ -115,12 +115,13 @@ mod if_std {
fn poll_read_vectored(self: Pin<&mut Self>, cx: &mut Context<'_>, bufs: &mut [IoSliceMut<'_>])
-> Poll<Result<usize>>
{
if let Some(first_iovec) = bufs.get_mut(0) {
self.poll_read(cx, &mut **first_iovec)
} else {
// `bufs` is empty.
Poll::Ready(Ok(0))
for b in bufs {
if !b.is_empty() {
return self.poll_read(cx, b);
}
}

self.poll_read(cx, &mut [])
}
}

Expand Down Expand Up @@ -164,8 +165,8 @@ mod if_std {
/// writable or is closed.
///
/// By default, this method delegates to using `poll_write` on the first
/// buffer in `bufs`. Objects which support vectored IO should override
/// this method.
/// nonempty buffer in `bufs`, or an empty one if none exists. Objects which
/// support vectored IO should override this method.
///
/// # Implementation
///
Expand All @@ -176,12 +177,13 @@ mod if_std {
fn poll_write_vectored(self: Pin<&mut Self>, cx: &mut Context<'_>, bufs: &[IoSlice<'_>])
-> Poll<Result<usize>>
{
if let Some(first_iovec) = bufs.get(0) {
self.poll_write(cx, &**first_iovec)
} else {
// `bufs` is empty.
Poll::Ready(Ok(0))
for b in bufs {
if !b.is_empty() {
return self.poll_write(cx, b);
}
}

self.poll_write(cx, &[])
}

/// Attempt to flush the object, ensuring that any buffered data reach
Expand Down
65 changes: 65 additions & 0 deletions futures/tests/io_read.rs
@@ -0,0 +1,65 @@
use futures::io::AsyncRead;
use futures_test::task::panic_context;
use std::io;
use std::pin::Pin;
use std::task::{Context, Poll};

struct MockReader {
fun: Box<dyn FnMut(&mut [u8]) -> Poll<io::Result<usize>>>,
}

impl MockReader {
pub fn new(fun: impl FnMut(&mut [u8]) -> Poll<io::Result<usize>> + 'static) -> Self {
MockReader { fun: Box::new(fun) }
}
}

impl AsyncRead for MockReader {
fn poll_read(
self: Pin<&mut Self>,
_cx: &mut Context<'_>,
buf: &mut [u8]
) -> Poll<io::Result<usize>> {
(self.get_mut().fun)(buf)
}
}

/// Verifies that the default implementation of `poll_read_vectored`
/// calls `poll_read` with an empty slice if no buffers are provided.
#[test]
fn read_vectored_no_buffers() {
let mut reader = MockReader::new(|buf| {
assert_eq!(buf, b"");
Err(io::ErrorKind::BrokenPipe.into()).into()
});
let cx = &mut panic_context();
let bufs = &mut [];

let res = Pin::new(&mut reader).poll_read_vectored(cx, bufs);
let res = res.map_err(|e| e.kind());
assert_eq!(res, Poll::Ready(Err(io::ErrorKind::BrokenPipe)))
}

/// Verifies that the default implementation of `poll_read_vectored`
/// calls `poll_read` with the first non-empty buffer.
#[test]
fn read_vectored_first_non_empty() {
let mut reader = MockReader::new(|buf| {
assert_eq!(buf.len(), 4);
buf.copy_from_slice(b"four");
Poll::Ready(Ok(4))
});
let cx = &mut panic_context();
let mut buf = [0; 4];
let bufs = &mut [
io::IoSliceMut::new(&mut []),
io::IoSliceMut::new(&mut []),
io::IoSliceMut::new(&mut buf),
];

let res = Pin::new(&mut reader).poll_read_vectored(cx, bufs);
let res = res.map_err(|e| e.kind());
assert_eq!(res, Poll::Ready(Ok(4)));
assert_eq!(buf, b"four"[..]);
}

70 changes: 70 additions & 0 deletions futures/tests/io_write.rs
@@ -0,0 +1,70 @@
use futures::io::AsyncWrite;
use futures_test::task::panic_context;
use std::io;
use std::pin::Pin;
use std::task::{Context, Poll};

struct MockWriter {
fun: Box<dyn FnMut(&[u8]) -> Poll<io::Result<usize>>>,
}

impl MockWriter {
pub fn new(fun: impl FnMut(&[u8]) -> Poll<io::Result<usize>> + 'static) -> Self {
MockWriter { fun: Box::new(fun) }
}
}

impl AsyncWrite for MockWriter {
fn poll_write(
self: Pin<&mut Self>,
_cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<io::Result<usize>> {
(self.get_mut().fun)(buf)
}

fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
panic!()
}

fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
panic!()
}
}

/// Verifies that the default implementation of `poll_write_vectored`
/// calls `poll_write` with an empty slice if no buffers are provided.
#[test]
fn write_vectored_no_buffers() {
let mut writer = MockWriter::new(|buf| {
assert_eq!(buf, b"");
Err(io::ErrorKind::BrokenPipe.into()).into()
});
let cx = &mut panic_context();
let bufs = &mut [];

let res = Pin::new(&mut writer).poll_write_vectored(cx, bufs);
let res = res.map_err(|e| e.kind());
assert_eq!(res, Poll::Ready(Err(io::ErrorKind::BrokenPipe)))
}

/// Verifies that the default implementation of `poll_write_vectored`
/// calls `poll_write` with the first non-empty buffer.
#[test]
fn write_vectored_first_non_empty() {
let mut writer = MockWriter::new(|buf| {
assert_eq!(buf, b"four");
Poll::Ready(Ok(4))
});
let cx = &mut panic_context();
let bufs = &mut [
io::IoSlice::new(&[]),
io::IoSlice::new(&[]),
io::IoSlice::new(b"four")
];

let res = Pin::new(&mut writer).poll_write_vectored(cx, bufs);
let res = res.map_err(|e| e.kind());
assert_eq!(res, Poll::Ready(Ok(4)));
}