New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
io: Add write_all_buf to AsyncWriteExt #3737
Changes from 2 commits
19d4442
2d8d7bf
1fede65
84e9ea8
d09e4ca
2c3337f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -2,6 +2,7 @@ use crate::io::util::flush::{flush, Flush}; | |
use crate::io::util::shutdown::{shutdown, Shutdown}; | ||
use crate::io::util::write::{write, Write}; | ||
use crate::io::util::write_all::{write_all, WriteAll}; | ||
use crate::io::util::write_all_buf::{write_all_buf, WriteAllBuf}; | ||
use crate::io::util::write_buf::{write_buf, WriteBuf}; | ||
use crate::io::util::write_int::{ | ||
WriteI128, WriteI128Le, WriteI16, WriteI16Le, WriteI32, WriteI32Le, WriteI64, WriteI64Le, | ||
|
@@ -233,6 +234,52 @@ cfg_io_util! { | |
write_buf(self, src) | ||
} | ||
|
||
/// Attempts to write an entire buffer into this writer | ||
/// | ||
/// Equivalent to: | ||
/// | ||
/// ```ignore | ||
/// async fn write_all_buf(&mut self, buf: impl Buf) -> Result<(), io::Error> { | ||
/// while buf.has_remaining() { | ||
/// self.write_buf(&mut buf).await?; | ||
/// } | ||
/// } | ||
/// ``` | ||
/// | ||
/// This method will continuously call [`write`] until | ||
/// [`buf.has_remaining()`](bytes::Buf::has_remaining) returns false. This method will not | ||
/// return until the entire buffer has been successfully written or an error occurs. The | ||
/// first error generated will be returned. | ||
/// | ||
/// The buffer is advanced after each chunk is successfully written. After failure, | ||
/// `src.chunk()` will return the chunk that failed to write. | ||
/// | ||
/// # Examples | ||
/// | ||
/// [`File`] implements `Read` and [`Cursor<&[u8]>`] implements [`Buf`]: | ||
/// | ||
/// [`File`]: crate::fs::File | ||
/// [`Buf`]: bytes::Buf | ||
/// | ||
/// ```no_run | ||
/// use tokio::io::{self, AsyncWriteExt}; | ||
/// use tokio::fs::File; | ||
/// | ||
/// use std::io::Cursor; | ||
/// | ||
/// #[tokio::main] | ||
/// async fn main() -> io::Result<()> { | ||
/// let mut file = File::create("foo.txt").await?; | ||
/// let mut buffer = Cursor::new(b"data to write"); | ||
/// | ||
/// file.write_all_buf(&mut buffer).await?; | ||
/// Ok(()) | ||
/// } | ||
/// ``` | ||
fn write_all_buf<'a, B>(&'a mut self, src: &'a mut B) -> WriteAllBuf<'a, Self, B> where Self: Sized + Unpin, B: Buf { | ||
write_all_buf(self, src) | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please run this through rustfmt. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Using rustfmt from the commandline will not fix this due to the macro around the trait. You will need to paste it elsewhere. Correctly rustfmtd code does not have the |
||
|
||
/// Attempts to write an entire buffer into this writer. | ||
/// | ||
/// Equivalent to: | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,56 @@ | ||
use crate::io::AsyncWrite; | ||
|
||
use bytes::Buf; | ||
use pin_project_lite::pin_project; | ||
use std::future::Future; | ||
use std::io; | ||
use std::marker::PhantomPinned; | ||
use std::pin::Pin; | ||
use std::task::{Context, Poll}; | ||
|
||
pin_project! { | ||
/// A future to write some of the buffer to an `AsyncWrite`. | ||
#[derive(Debug)] | ||
#[must_use = "futures do nothing unless you `.await` or poll them"] | ||
pub struct WriteAllBuf<'a, W, B> { | ||
writer: &'a mut W, | ||
buf: &'a mut B, | ||
#[pin] | ||
_pin: PhantomPinned, | ||
} | ||
} | ||
|
||
/// Tries to write some bytes from the given `buf` to the writer in an | ||
/// asynchronous manner, returning a future. | ||
pub(crate) fn write_all_buf<'a, W, B>(writer: &'a mut W, buf: &'a mut B) -> WriteAllBuf<'a, W, B> | ||
where | ||
W: AsyncWrite + Unpin, | ||
B: Buf, | ||
{ | ||
WriteAllBuf { | ||
writer, | ||
buf, | ||
_pin: PhantomPinned, | ||
} | ||
} | ||
|
||
impl<W, B> Future for WriteAllBuf<'_, W, B> | ||
where | ||
W: AsyncWrite + Unpin, | ||
B: Buf, | ||
{ | ||
type Output = io::Result<()>; | ||
|
||
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> { | ||
let me = self.project(); | ||
while me.buf.has_remaining() { | ||
let n = ready!(Pin::new(&mut *me.writer).poll_write(cx, me.buf.chunk())?); | ||
me.buf.advance(n); | ||
if n == 0 { | ||
return Poll::Ready(Err(io::ErrorKind::WriteZero.into())); | ||
} | ||
} | ||
|
||
Poll::Ready(Ok(())) | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,96 @@ | ||
#![warn(rust_2018_idioms)] | ||
#![cfg(feature = "full")] | ||
|
||
use tokio::io::{AsyncWrite, AsyncWriteExt}; | ||
use tokio_test::{assert_err, assert_ok}; | ||
|
||
use bytes::{Buf, Bytes, BytesMut}; | ||
use std::cmp; | ||
use std::io; | ||
use std::pin::Pin; | ||
use std::task::{Context, Poll}; | ||
|
||
#[tokio::test] | ||
async fn write_all_buf() { | ||
struct Wr { | ||
buf: BytesMut, | ||
cnt: usize, | ||
} | ||
|
||
impl AsyncWrite for Wr { | ||
fn poll_write( | ||
mut self: Pin<&mut Self>, | ||
_cx: &mut Context<'_>, | ||
buf: &[u8], | ||
) -> Poll<io::Result<usize>> { | ||
let n = cmp::min(4, buf.len()); | ||
dbg!(buf); | ||
let buf = &buf[0..n]; | ||
|
||
self.cnt += 1; | ||
self.buf.extend(buf); | ||
Ok(buf.len()).into() | ||
} | ||
|
||
fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> { | ||
Ok(()).into() | ||
} | ||
|
||
fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> { | ||
Ok(()).into() | ||
} | ||
} | ||
|
||
let mut wr = Wr { | ||
buf: BytesMut::with_capacity(64), | ||
cnt: 0, | ||
}; | ||
|
||
let mut buf = Bytes::from_static(b"hello").chain(Bytes::from_static(b"world")); | ||
|
||
assert_ok!(wr.write_all_buf(&mut buf).await); | ||
assert_eq!(wr.buf, b"helloworld"[..]); | ||
// expect 4 writes, [hell],[o],[worl],[d] | ||
assert_eq!(wr.cnt, 4); | ||
assert_eq!(buf.has_remaining(), false); | ||
} | ||
|
||
#[tokio::test] | ||
async fn write_buf_err() { | ||
/// Error out after writing the first 4 bytes | ||
struct Wr { | ||
cnt: usize, | ||
} | ||
|
||
impl AsyncWrite for Wr { | ||
fn poll_write( | ||
mut self: Pin<&mut Self>, | ||
_cx: &mut Context<'_>, | ||
_buf: &[u8], | ||
) -> Poll<io::Result<usize>> { | ||
self.cnt += 1; | ||
if self.cnt == 2 { | ||
return Poll::Ready(Err(io::Error::new(io::ErrorKind::Other, "whoops"))); | ||
} | ||
Poll::Ready(Ok(4)) | ||
} | ||
|
||
fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> { | ||
Ok(()).into() | ||
} | ||
|
||
fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> { | ||
Ok(()).into() | ||
} | ||
} | ||
|
||
let mut wr = Wr { cnt: 0 }; | ||
|
||
let mut buf = Bytes::from_static(b"hello").chain(Bytes::from_static(b"world")); | ||
|
||
assert_err!(wr.write_all_buf(&mut buf).await); | ||
assert_eq!( | ||
buf.copy_to_bytes(buf.remaining()), | ||
Bytes::from_static(b"oworld") | ||
); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the cursor link here is broken.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
good catch. It was broken for
write_buf
as well (& it was just wrong, I think? I believe it meant to sayfile implements
AsyncWrite`).I fixed both.