From 54743e6b1061f8b31d4f56101883e20bda391499 Mon Sep 17 00:00:00 2001 From: Felipe Lema <1232306+FelipeLema@users.noreply.github.com> Date: Fri, 20 Aug 2021 15:46:46 -0400 Subject: [PATCH 1/9] fill with boilerplate code --- futures-util/src/io/buf_writer.rs | 39 ++++++++- futures-util/src/io/line_writer.rs | 127 +++++++++++++++++++++++++++++ futures-util/src/io/mod.rs | 3 + 3 files changed, 168 insertions(+), 1 deletion(-) create mode 100644 futures-util/src/io/line_writer.rs diff --git a/futures-util/src/io/buf_writer.rs b/futures-util/src/io/buf_writer.rs index f292b871f1..44d35cee62 100644 --- a/futures-util/src/io/buf_writer.rs +++ b/futures-util/src/io/buf_writer.rs @@ -6,6 +6,7 @@ use pin_project_lite::pin_project; use std::fmt; use std::io::{self, Write}; use std::pin::Pin; +use std::ptr; pin_project! { /// Wraps a writer and buffers its output. @@ -49,7 +50,7 @@ impl BufWriter { Self { inner, buf: Vec::with_capacity(cap), written: 0 } } - fn flush_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + pub(super) fn flush_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let mut this = self.project(); let len = this.buf.len(); @@ -83,6 +84,42 @@ impl BufWriter { pub fn buffer(&self) -> &[u8] { &self.buf } + + /// TODO WIP + pub(super) fn capacity(&self) -> usize { + self.buf.capacity() + } + + #[inline] + /// TODO WIP + pub(super) fn spare_capacity(&self) -> usize { + self.buf.capacity() - self.buf.len() + } + + /// TODO WIP + pub(super) fn write_to_buf(mut self: Pin<&mut Self>, buf: &[u8]) -> usize { + let available = self.spare_capacity(); + let amt_to_buffer = available.min(buf.len()); + + // SAFETY: `amt_to_buffer` is <= buffer's spare capacity by construction. + unsafe { + self.write_to_buffer_unchecked(&buf[..amt_to_buffer]); + } + + amt_to_buffer + } + + #[inline] + /// TODO WIP + unsafe fn write_to_buffer_unchecked(mut self: Pin<&mut Self>, buf: &[u8]) { + debug_assert!(buf.len() <= self.spare_capacity()); + let old_len = self.buf.len(); + let buf_len = buf.len(); + let src = buf.as_ptr(); + let dst = self.buf.as_mut_ptr().add(old_len); + ptr::copy_nonoverlapping(src, dst, buf_len); + self.buf.set_len(old_len + buf_len); + } } impl AsyncWrite for BufWriter { diff --git a/futures-util/src/io/line_writer.rs b/futures-util/src/io/line_writer.rs new file mode 100644 index 0000000000..7e801df8c8 --- /dev/null +++ b/futures-util/src/io/line_writer.rs @@ -0,0 +1,127 @@ +use super::buf_writer::BufWriter; +use futures_core::ready; +use futures_core::task::{Context, Poll}; +use futures_io::AsyncWrite; +use pin_project_lite::pin_project; +use pin_utils::pin_mut; +use std::io; +use std::pin::Pin; + +pin_project! { +#[derive(Debug)] +struct LineWriterShim { + #[pin] + buffer: BufWriter, // TODO HELP what's this field's type suppossed to be? +} +} + +impl LineWriterShim { + /// TODO WIP + fn buffered(&self) -> &[u8] { + self.buffer.buffer() + } + /// TODO WIP + fn flush_if_completed_line(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let this = self.project(); + //let this = &mut *self; + match self.buffered().last().copied() { + Some(b'\n') => this.buffer.flush_buf(cx), + _ => Poll::Ready(Ok(())), + } + } +} + +impl AsyncWrite for LineWriterShim { + fn poll_write( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + let this = self.project(); + let newline_index = match memchr::memrchr(b'\n', buf) { + None => { + ready!(self.flush_if_completed_line(cx)?); + return this.buffer.poll_write(cx, buf); + } + Some(newline_index) => newline_index + 1, + }; + + this.buffer.poll_flush(cx)?; + + let lines = &buf[..newline_index]; + + let flushed = ready!(this.buffer.as_mut().poll_write(cx, lines))?; + + if flushed == 0 { + return Poll::Ready(Ok(0)); + } + + let tail = if flushed >= newline_index { + &buf[flushed..] + } else if newline_index - flushed <= self.buffer.capacity() { + &buf[flushed..newline_index] + } else { + let scan_area = &buf[flushed..]; + let scan_area = &scan_area[..self.buffer.capacity()]; + match memchr::memrchr(b'\n', scan_area) { + Some(newline_index) => &scan_area[..newline_index + 1], + None => scan_area, + } + }; + + let buffered = self.buffer.write_to_buf(tail); + Poll::Ready(Ok(flushed + buffered)) + } + fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.project().buffer.poll_flush(cx) + } + fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.project().buffer.poll_close(cx) + } +} +pin_project! { +/// TODO: WIP +#[derive(Debug)] +pub struct LineWriter { + #[pin] + inner: BufWriter, +} +} + +impl LineWriter { + /// TODO: WIP + pub fn new(inner: W) -> LineWriter { + // 1024 is taken from std::io::buffered::LineWriter + LineWriter::with_capacity(1024, inner) + } + /// TODO: WIP + pub fn with_capacity(capacity: usize, inner: W) -> LineWriter { + LineWriter { inner: BufWriter::with_capacity(capacity, inner) } + } +} + +impl AsyncWrite for LineWriter { + fn poll_write( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + let lws = LineWriterShim { buffer: Box::new(&self.inner).as_mut() }; + pin_mut!(lws); + lws.poll_write(cx, buf) + + //lws.poll_write(cx, buf) + + //Pin::new(&mut lws).poll_write(cx, buf) + + //Box::pin(&mut lws).poll_write(cx, buf) + + //Pin::new(&mut *lws).poll_write(cx, buf) + } + fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.project().inner.poll_flush(cx) + } + fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.project().inner.poll_close(cx) + } +} diff --git a/futures-util/src/io/mod.rs b/futures-util/src/io/mod.rs index b96223d1c1..b9377c4fe3 100644 --- a/futures-util/src/io/mod.rs +++ b/futures-util/src/io/mod.rs @@ -61,6 +61,9 @@ pub use self::buf_reader::BufReader; mod buf_writer; pub use self::buf_writer::BufWriter; +mod line_writer; +pub use self::line_writer::LineWriter; + mod chain; pub use self::chain::Chain; From 0fd0e5e3622e42d8c5f4df9a429429331d15622d Mon Sep 17 00:00:00 2001 From: Felipe Lema <1232306+FelipeLema@users.noreply.github.com> Date: Sat, 21 Aug 2021 16:52:39 -0400 Subject: [PATCH 2/9] apply recommended changes and comply with compiler see https://github.com/rust-lang/futures-rs/pull/2477#pullrequestreview-735474602 --- futures-util/src/io/buf_writer.rs | 11 ++++++----- futures-util/src/io/line_writer.rs | 20 ++++++++++---------- 2 files changed, 16 insertions(+), 15 deletions(-) diff --git a/futures-util/src/io/buf_writer.rs b/futures-util/src/io/buf_writer.rs index 44d35cee62..bbf1df4a0d 100644 --- a/futures-util/src/io/buf_writer.rs +++ b/futures-util/src/io/buf_writer.rs @@ -97,7 +97,7 @@ impl BufWriter { } /// TODO WIP - pub(super) fn write_to_buf(mut self: Pin<&mut Self>, buf: &[u8]) -> usize { + pub(super) fn write_to_buf(self: Pin<&mut Self>, buf: &[u8]) -> usize { let available = self.spare_capacity(); let amt_to_buffer = available.min(buf.len()); @@ -111,14 +111,15 @@ impl BufWriter { #[inline] /// TODO WIP - unsafe fn write_to_buffer_unchecked(mut self: Pin<&mut Self>, buf: &[u8]) { + unsafe fn write_to_buffer_unchecked(self: Pin<&mut Self>, buf: &[u8]) { debug_assert!(buf.len() <= self.spare_capacity()); - let old_len = self.buf.len(); + let this = self.project(); + let old_len = this.buf.len(); let buf_len = buf.len(); let src = buf.as_ptr(); - let dst = self.buf.as_mut_ptr().add(old_len); + let dst = this.buf.as_mut_ptr().add(old_len); ptr::copy_nonoverlapping(src, dst, buf_len); - self.buf.set_len(old_len + buf_len); + this.buf.set_len(old_len + buf_len); } } diff --git a/futures-util/src/io/line_writer.rs b/futures-util/src/io/line_writer.rs index 7e801df8c8..7aef2b575e 100644 --- a/futures-util/src/io/line_writer.rs +++ b/futures-util/src/io/line_writer.rs @@ -24,7 +24,7 @@ impl LineWriterShim { fn flush_if_completed_line(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let this = self.project(); //let this = &mut *self; - match self.buffered().last().copied() { + match this.buffer.buffer().last().copied() { Some(b'\n') => this.buffer.flush_buf(cx), _ => Poll::Ready(Ok(())), } @@ -37,16 +37,16 @@ impl AsyncWrite for LineWriterShim { cx: &mut Context<'_>, buf: &[u8], ) -> Poll> { - let this = self.project(); + let mut this = self.as_mut().project(); let newline_index = match memchr::memrchr(b'\n', buf) { None => { - ready!(self.flush_if_completed_line(cx)?); - return this.buffer.poll_write(cx, buf); + ready!(self.as_mut().flush_if_completed_line(cx)?); + return self.project().buffer.poll_write(cx, buf); } Some(newline_index) => newline_index + 1, }; - this.buffer.poll_flush(cx)?; + this.buffer.as_mut().poll_flush(cx)?; let lines = &buf[..newline_index]; @@ -58,24 +58,24 @@ impl AsyncWrite for LineWriterShim { let tail = if flushed >= newline_index { &buf[flushed..] - } else if newline_index - flushed <= self.buffer.capacity() { + } else if newline_index - flushed <= this.buffer.capacity() { &buf[flushed..newline_index] } else { let scan_area = &buf[flushed..]; - let scan_area = &scan_area[..self.buffer.capacity()]; + let scan_area = &scan_area[..this.buffer.capacity()]; match memchr::memrchr(b'\n', scan_area) { Some(newline_index) => &scan_area[..newline_index + 1], None => scan_area, } }; - let buffered = self.buffer.write_to_buf(tail); + let buffered = this.buffer.write_to_buf(tail); Poll::Ready(Ok(flushed + buffered)) } - fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { self.project().buffer.poll_flush(cx) } - fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { self.project().buffer.poll_close(cx) } } From 5667198c610e619d149ded79ee2a50e6f29262a0 Mon Sep 17 00:00:00 2001 From: Felipe Lema <1232306+FelipeLema@users.noreply.github.com> Date: Sun, 22 Aug 2021 01:09:50 -0400 Subject: [PATCH 3/9] get rid of LineWriterShim doing so saves me thinking on how to deal with pin and fields --- futures-util/src/io/line_writer.rs | 85 ++++++++---------------------- 1 file changed, 22 insertions(+), 63 deletions(-) diff --git a/futures-util/src/io/line_writer.rs b/futures-util/src/io/line_writer.rs index 7aef2b575e..a2f905e8d7 100644 --- a/futures-util/src/io/line_writer.rs +++ b/futures-util/src/io/line_writer.rs @@ -3,35 +3,40 @@ use futures_core::ready; use futures_core::task::{Context, Poll}; use futures_io::AsyncWrite; use pin_project_lite::pin_project; -use pin_utils::pin_mut; use std::io; use std::pin::Pin; pin_project! { +/// TODO: WIP #[derive(Debug)] -struct LineWriterShim { +pub struct LineWriter { #[pin] - buffer: BufWriter, // TODO HELP what's this field's type suppossed to be? + inner: BufWriter, } } -impl LineWriterShim { - /// TODO WIP - fn buffered(&self) -> &[u8] { - self.buffer.buffer() +impl LineWriter { + /// TODO: WIP + pub fn new(inner: W) -> LineWriter { + // 1024 is taken from std::io::buffered::LineWriter + LineWriter::with_capacity(1024, inner) + } + /// TODO: WIP + pub fn with_capacity(capacity: usize, inner: W) -> LineWriter { + LineWriter { inner: BufWriter::with_capacity(capacity, inner) } } /// TODO WIP fn flush_if_completed_line(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let this = self.project(); //let this = &mut *self; - match this.buffer.buffer().last().copied() { - Some(b'\n') => this.buffer.flush_buf(cx), + match this.inner.buffer().last().copied() { + Some(b'\n') => this.inner.flush_buf(cx), _ => Poll::Ready(Ok(())), } } } -impl AsyncWrite for LineWriterShim { +impl AsyncWrite for LineWriter { fn poll_write( mut self: Pin<&mut Self>, cx: &mut Context<'_>, @@ -41,16 +46,16 @@ impl AsyncWrite for LineWriterShim { let newline_index = match memchr::memrchr(b'\n', buf) { None => { ready!(self.as_mut().flush_if_completed_line(cx)?); - return self.project().buffer.poll_write(cx, buf); + return self.project().inner.poll_write(cx, buf); } Some(newline_index) => newline_index + 1, }; - this.buffer.as_mut().poll_flush(cx)?; + ready!(this.inner.as_mut().poll_flush(cx)?); let lines = &buf[..newline_index]; - let flushed = ready!(this.buffer.as_mut().poll_write(cx, lines))?; + let flushed = ready!(this.inner.as_mut().poll_write(cx, lines))?; if flushed == 0 { return Poll::Ready(Ok(0)); @@ -58,70 +63,24 @@ impl AsyncWrite for LineWriterShim { let tail = if flushed >= newline_index { &buf[flushed..] - } else if newline_index - flushed <= this.buffer.capacity() { + } else if newline_index - flushed <= this.inner.capacity() { &buf[flushed..newline_index] } else { let scan_area = &buf[flushed..]; - let scan_area = &scan_area[..this.buffer.capacity()]; + let scan_area = &scan_area[..this.inner.capacity()]; match memchr::memrchr(b'\n', scan_area) { Some(newline_index) => &scan_area[..newline_index + 1], None => scan_area, } }; - let buffered = this.buffer.write_to_buf(tail); + let buffered = this.inner.write_to_buf(tail); Poll::Ready(Ok(flushed + buffered)) } fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - self.project().buffer.poll_flush(cx) - } - fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - self.project().buffer.poll_close(cx) - } -} -pin_project! { -/// TODO: WIP -#[derive(Debug)] -pub struct LineWriter { - #[pin] - inner: BufWriter, -} -} - -impl LineWriter { - /// TODO: WIP - pub fn new(inner: W) -> LineWriter { - // 1024 is taken from std::io::buffered::LineWriter - LineWriter::with_capacity(1024, inner) - } - /// TODO: WIP - pub fn with_capacity(capacity: usize, inner: W) -> LineWriter { - LineWriter { inner: BufWriter::with_capacity(capacity, inner) } - } -} - -impl AsyncWrite for LineWriter { - fn poll_write( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &[u8], - ) -> Poll> { - let lws = LineWriterShim { buffer: Box::new(&self.inner).as_mut() }; - pin_mut!(lws); - lws.poll_write(cx, buf) - - //lws.poll_write(cx, buf) - - //Pin::new(&mut lws).poll_write(cx, buf) - - //Box::pin(&mut lws).poll_write(cx, buf) - - //Pin::new(&mut *lws).poll_write(cx, buf) - } - fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { self.project().inner.poll_flush(cx) } - fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { self.project().inner.poll_close(cx) } } From c2d43569275c69d47aa6e9356890ecdcac5bbaa9 Mon Sep 17 00:00:00 2001 From: Felipe Lema <1232306+FelipeLema@users.noreply.github.com> Date: Sun, 22 Aug 2021 01:29:49 -0400 Subject: [PATCH 4/9] add documentation, point to original code --- futures-util/src/io/buf_writer.rs | 15 +++++++++++---- futures-util/src/io/line_writer.rs | 31 ++++++++++++++++++++---------- 2 files changed, 32 insertions(+), 14 deletions(-) diff --git a/futures-util/src/io/buf_writer.rs b/futures-util/src/io/buf_writer.rs index bbf1df4a0d..91391f597a 100644 --- a/futures-util/src/io/buf_writer.rs +++ b/futures-util/src/io/buf_writer.rs @@ -85,18 +85,23 @@ impl BufWriter { &self.buf } - /// TODO WIP + /// Capacity of `buf`. how many chars can be held in buffer pub(super) fn capacity(&self) -> usize { self.buf.capacity() } + /// Remaining number of bytes to reach `buf` 's capacity #[inline] - /// TODO WIP pub(super) fn spare_capacity(&self) -> usize { self.buf.capacity() - self.buf.len() } - /// TODO WIP + /// Write a byte slice directly into buffer + /// + /// Will truncate the number of bytes written to `spare_capacity()` so you want to + /// calculate the size of your slice to avoid losing bytes + /// + /// Based on `std::io::BufWriter` pub(super) fn write_to_buf(self: Pin<&mut Self>, buf: &[u8]) -> usize { let available = self.spare_capacity(); let amt_to_buffer = available.min(buf.len()); @@ -109,8 +114,10 @@ impl BufWriter { amt_to_buffer } + /// Write byte slice directly into `self.buf` + /// + /// Based on `std::io::BufWriter` #[inline] - /// TODO WIP unsafe fn write_to_buffer_unchecked(self: Pin<&mut Self>, buf: &[u8]) { debug_assert!(buf.len() <= self.spare_capacity()); let this = self.project(); diff --git a/futures-util/src/io/line_writer.rs b/futures-util/src/io/line_writer.rs index a2f905e8d7..ab4c22f278 100644 --- a/futures-util/src/io/line_writer.rs +++ b/futures-util/src/io/line_writer.rs @@ -7,7 +7,13 @@ use std::io; use std::pin::Pin; pin_project! { -/// TODO: WIP +/// Wrap a writer, like [`BufWriter`] does, but prioritizes buffering lines +/// +/// This was written based on `std::io::LineWriter` which goes into further details +/// explaining the code. +/// +/// Buffering is actually done using `BufWriter`. This class will leverage `BufWriter` +/// to write on-each-line. #[derive(Debug)] pub struct LineWriter { #[pin] @@ -16,19 +22,20 @@ pub struct LineWriter { } impl LineWriter { - /// TODO: WIP + /// Create a new `LineWriter` with default buffer capacity. The default is currently 1KB + /// which was taken from `std::io::LineWriter` pub fn new(inner: W) -> LineWriter { - // 1024 is taken from std::io::buffered::LineWriter LineWriter::with_capacity(1024, inner) } - /// TODO: WIP + + /// Creates a new `LineWriter` with the specified buffer capacity. pub fn with_capacity(capacity: usize, inner: W) -> LineWriter { LineWriter { inner: BufWriter::with_capacity(capacity, inner) } } - /// TODO WIP + + /// Flush `inner` if last char is "new line" fn flush_if_completed_line(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let this = self.project(); - //let this = &mut *self; match this.inner.buffer().last().copied() { Some(b'\n') => this.inner.flush_buf(cx), _ => Poll::Ready(Ok(())), @@ -77,10 +84,14 @@ impl AsyncWrite for LineWriter { let buffered = this.inner.write_to_buf(tail); Poll::Ready(Ok(flushed + buffered)) } - fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - self.project().inner.poll_flush(cx) + + /// Forward to `inner` 's `BufWriter::poll_flush()` + fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.as_mut().project().inner.poll_flush(cx) } - fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - self.project().inner.poll_close(cx) + + /// Forward to `inner` 's `BufWriter::poll_close()` + fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.as_mut().project().inner.poll_close(cx) } } From d019027dafafde24783dec2d916e760fafb2d708 Mon Sep 17 00:00:00 2001 From: Felipe Lema <1232306+FelipeLema@users.noreply.github.com> Date: Mon, 23 Aug 2021 02:22:00 -0400 Subject: [PATCH 5/9] first round of unit tests - translate `std::io::LineWriter`'s unit test - rename `inner` field to `buf_writer` to avoid confusion with `BufWriter::inner` - the above lead me to re-write the logic accordingly - add methods to access `buf_writer`'s fields - sorting out types, cannot compile for now --- futures-util/src/io/buf_writer.rs | 2 +- futures-util/src/io/line_writer.rs | 41 +++++++++++++++++++----------- futures/tests/io_line_writer.rs | 26 +++++++++++++++++++ 3 files changed, 53 insertions(+), 16 deletions(-) create mode 100644 futures/tests/io_line_writer.rs diff --git a/futures-util/src/io/buf_writer.rs b/futures-util/src/io/buf_writer.rs index 91391f597a..88ed2e9fe4 100644 --- a/futures-util/src/io/buf_writer.rs +++ b/futures-util/src/io/buf_writer.rs @@ -32,7 +32,7 @@ pin_project! { // TODO: Examples pub struct BufWriter { #[pin] - inner: W, + pub(super) inner: W, buf: Vec, written: usize, } diff --git a/futures-util/src/io/line_writer.rs b/futures-util/src/io/line_writer.rs index ab4c22f278..f0cd810a9f 100644 --- a/futures-util/src/io/line_writer.rs +++ b/futures-util/src/io/line_writer.rs @@ -17,7 +17,7 @@ pin_project! { #[derive(Debug)] pub struct LineWriter { #[pin] - inner: BufWriter, + buf_writer: BufWriter, } } @@ -30,17 +30,27 @@ impl LineWriter { /// Creates a new `LineWriter` with the specified buffer capacity. pub fn with_capacity(capacity: usize, inner: W) -> LineWriter { - LineWriter { inner: BufWriter::with_capacity(capacity, inner) } + LineWriter { buf_writer: BufWriter::with_capacity(capacity, inner) } } - /// Flush `inner` if last char is "new line" + /// Flush `buf_writer` if last char is "new line" fn flush_if_completed_line(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let this = self.project(); - match this.inner.buffer().last().copied() { - Some(b'\n') => this.inner.flush_buf(cx), + match this.buf_writer.buffer().last().copied() { + Some(b'\n') => this.buf_writer.flush_buf(cx), _ => Poll::Ready(Ok(())), } } + + /// Returns a reference to `buf_writer`'s internally buffered data. + pub fn buffer(&self) -> &[u8] { + &self.buf_writer.buffer() + } + /// Acquires a reference to the underlying sink or stream that this combinator is + /// pulling from. + pub fn get_ref(&self) -> &W { + self.buf_writer.get_ref() + } } impl AsyncWrite for LineWriter { @@ -53,16 +63,17 @@ impl AsyncWrite for LineWriter { let newline_index = match memchr::memrchr(b'\n', buf) { None => { ready!(self.as_mut().flush_if_completed_line(cx)?); - return self.project().inner.poll_write(cx, buf); + return self.project().buf_writer.poll_write(cx, buf); } Some(newline_index) => newline_index + 1, }; - ready!(this.inner.as_mut().poll_flush(cx)?); + ready!(this.buf_writer.as_mut().poll_flush(cx)?); let lines = &buf[..newline_index]; - let flushed = ready!(this.inner.as_mut().poll_write(cx, lines))?; + let _buf_writer = this.buf_writer.project(); + let flushed = ready!(_buf_writer.inner.poll_write(cx, lines))?; if flushed == 0 { return Poll::Ready(Ok(0)); @@ -70,28 +81,28 @@ impl AsyncWrite for LineWriter { let tail = if flushed >= newline_index { &buf[flushed..] - } else if newline_index - flushed <= this.inner.capacity() { + } else if newline_index - flushed <= this.buf_writer.capacity() { &buf[flushed..newline_index] } else { let scan_area = &buf[flushed..]; - let scan_area = &scan_area[..this.inner.capacity()]; + let scan_area = &scan_area[..this.buf_writer.capacity()]; match memchr::memrchr(b'\n', scan_area) { Some(newline_index) => &scan_area[..newline_index + 1], None => scan_area, } }; - let buffered = this.inner.write_to_buf(tail); + let buffered = _buf_writer.write_to_buf(tail); // TODO crap! Poll::Ready(Ok(flushed + buffered)) } - /// Forward to `inner` 's `BufWriter::poll_flush()` + /// Forward to `buf_writer` 's `BufWriter::poll_flush()` fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - self.as_mut().project().inner.poll_flush(cx) + self.as_mut().project().buf_writer.poll_flush(cx) } - /// Forward to `inner` 's `BufWriter::poll_close()` + /// Forward to `buf_writer` 's `BufWriter::poll_close()` fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - self.as_mut().project().inner.poll_close(cx) + self.as_mut().project().buf_writer.poll_close(cx) } } diff --git a/futures/tests/io_line_writer.rs b/futures/tests/io_line_writer.rs new file mode 100644 index 0000000000..b51ba2ee3d --- /dev/null +++ b/futures/tests/io_line_writer.rs @@ -0,0 +1,26 @@ +use futures::executor::block_on; + +use futures::io::{AsyncWriteExt, LineWriter}; + +#[test] +fn line_writer() { + let mut writer = LineWriter::new(Vec::new()); + + block_on(writer.write(&[0])).unwrap(); + assert_eq!(*writer.get_ref(), []); + + block_on(writer.write(&[1])).unwrap(); + assert_eq!(*writer.get_ref(), []); + + block_on(writer.flush()).unwrap(); + assert_eq!(*writer.get_ref(), [0, 1]); + + block_on(writer.write(&[0, b'\n', 1, b'\n', 2])).unwrap(); + assert_eq!(*writer.get_ref(), [0, 1, 0, b'\n', 1, b'\n']); + + block_on(writer.flush()).unwrap(); + assert_eq!(*writer.get_ref(), [0, 1, 0, b'\n', 1, b'\n', 2]); + + block_on(writer.write(&[3, b'\n'])).unwrap(); + assert_eq!(*writer.get_ref(), [0, 1, 0, b'\n', 1, b'\n', 2, 3, b'\n']); +} From 50a609517e750aab5ad588cf1f3022dbde5e646f Mon Sep 17 00:00:00 2001 From: Felipe Lema <1232306+FelipeLema@users.noreply.github.com> Date: Tue, 24 Aug 2021 18:51:36 -0400 Subject: [PATCH 6/9] fix compilation --- futures-util/src/io/buf_writer.rs | 10 +++++++++- futures-util/src/io/line_writer.rs | 5 ++--- 2 files changed, 11 insertions(+), 4 deletions(-) diff --git a/futures-util/src/io/buf_writer.rs b/futures-util/src/io/buf_writer.rs index 88ed2e9fe4..c925b9377f 100644 --- a/futures-util/src/io/buf_writer.rs +++ b/futures-util/src/io/buf_writer.rs @@ -32,7 +32,7 @@ pin_project! { // TODO: Examples pub struct BufWriter { #[pin] - pub(super) inner: W, + inner: W, buf: Vec, written: usize, } @@ -128,6 +128,14 @@ impl BufWriter { ptr::copy_nonoverlapping(src, dst, buf_len); this.buf.set_len(old_len + buf_len); } + /// Write directly using `inner`, bypassing buffering + pub(super) fn inner_poll_write( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + self.project().inner.poll_write(cx, buf) + } } impl AsyncWrite for BufWriter { diff --git a/futures-util/src/io/line_writer.rs b/futures-util/src/io/line_writer.rs index f0cd810a9f..ba3e107d82 100644 --- a/futures-util/src/io/line_writer.rs +++ b/futures-util/src/io/line_writer.rs @@ -72,8 +72,7 @@ impl AsyncWrite for LineWriter { let lines = &buf[..newline_index]; - let _buf_writer = this.buf_writer.project(); - let flushed = ready!(_buf_writer.inner.poll_write(cx, lines))?; + let flushed = { ready!(this.buf_writer.as_mut().inner_poll_write(cx, lines))? }; if flushed == 0 { return Poll::Ready(Ok(0)); @@ -92,7 +91,7 @@ impl AsyncWrite for LineWriter { } }; - let buffered = _buf_writer.write_to_buf(tail); // TODO crap! + let buffered = this.buf_writer.as_mut().write_to_buf(tail); Poll::Ready(Ok(flushed + buffered)) } From 1163848af70c29cd905aaadac0e60df2c1e2e618 Mon Sep 17 00:00:00 2001 From: Felipe Lema <1232306+FelipeLema@users.noreply.github.com> Date: Sun, 29 Aug 2021 00:54:59 -0400 Subject: [PATCH 7/9] add `poll_write_vectored` - default `poll_write_vectored` behaves differently of `std::io::LineWriter::write_vectored` - ported from https://github.com/rust-lang/rust/blob/673d0db5e393e9c64897005b470bfeb6d5aec61b/library/std/src/io/buffered/tests.rs - that code has lots of comments, you may wanna check it out - translated tests for `std` 's `line_vectored` - added `inner_poll_write_vectored` to keep code explicit and verbose --- futures-util/src/io/buf_writer.rs | 10 +++++ futures-util/src/io/line_writer.rs | 48 +++++++++++++++++++++++ futures/tests/io_line_writer.rs | 63 ++++++++++++++++++++++++++++++ 3 files changed, 121 insertions(+) diff --git a/futures-util/src/io/buf_writer.rs b/futures-util/src/io/buf_writer.rs index c925b9377f..cb74863ad0 100644 --- a/futures-util/src/io/buf_writer.rs +++ b/futures-util/src/io/buf_writer.rs @@ -128,6 +128,7 @@ impl BufWriter { ptr::copy_nonoverlapping(src, dst, buf_len); this.buf.set_len(old_len + buf_len); } + /// Write directly using `inner`, bypassing buffering pub(super) fn inner_poll_write( self: Pin<&mut Self>, @@ -136,6 +137,15 @@ impl BufWriter { ) -> Poll> { self.project().inner.poll_write(cx, buf) } + + /// Write directly using `inner`, bypassing buffering + pub(super) fn inner_poll_write_vectored( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + bufs: &[IoSlice<'_>], + ) -> Poll> { + self.project().inner.poll_write_vectored(cx, bufs) + } } impl AsyncWrite for BufWriter { diff --git a/futures-util/src/io/line_writer.rs b/futures-util/src/io/line_writer.rs index ba3e107d82..a37b154190 100644 --- a/futures-util/src/io/line_writer.rs +++ b/futures-util/src/io/line_writer.rs @@ -2,6 +2,7 @@ use super::buf_writer::BufWriter; use futures_core::ready; use futures_core::task::{Context, Poll}; use futures_io::AsyncWrite; +use futures_io::IoSlice; use pin_project_lite::pin_project; use std::io; use std::pin::Pin; @@ -46,6 +47,7 @@ impl LineWriter { pub fn buffer(&self) -> &[u8] { &self.buf_writer.buffer() } + /// Acquires a reference to the underlying sink or stream that this combinator is /// pulling from. pub fn get_ref(&self) -> &W { @@ -95,6 +97,52 @@ impl AsyncWrite for LineWriter { Poll::Ready(Ok(flushed + buffered)) } + fn poll_write_vectored( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + bufs: &[IoSlice<'_>], + ) -> Poll> { + let mut this = self.as_mut().project(); + // `is_write_vectored()` is handled in original code, but not in this crate + // see https://github.com/rust-lang/rust/issues/70436 + + let last_newline_buf_idx = bufs + .iter() + .enumerate() + .rev() + .find_map(|(i, buf)| memchr::memchr(b'\n', buf).map(|_| i)); + let last_newline_buf_idx = match last_newline_buf_idx { + None => { + ready!(self.as_mut().flush_if_completed_line(cx)?); + return self.project().buf_writer.poll_write_vectored(cx, bufs); + } + Some(i) => i, + }; + + ready!(this.buf_writer.as_mut().poll_flush(cx)?); + + let (lines, tail) = bufs.split_at(last_newline_buf_idx + 1); + + let flushed = { ready!(this.buf_writer.as_mut().inner_poll_write_vectored(cx, lines))? }; + if flushed == 0 { + return Poll::Ready(Ok(0)); + } + + let lines_len = lines.iter().map(|buf| buf.len()).sum(); + if flushed < lines_len { + return Poll::Ready(Ok(flushed)); + } + + let buffered: usize = tail + .iter() + .filter(|buf| !buf.is_empty()) + .map(|buf| this.buf_writer.as_mut().write_to_buf(buf)) + .take_while(|&n| n > 0) + .sum(); + + Poll::Ready(Ok(flushed + buffered)) + } + /// Forward to `buf_writer` 's `BufWriter::poll_flush()` fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { self.as_mut().project().buf_writer.poll_flush(cx) diff --git a/futures/tests/io_line_writer.rs b/futures/tests/io_line_writer.rs index b51ba2ee3d..f91a094db6 100644 --- a/futures/tests/io_line_writer.rs +++ b/futures/tests/io_line_writer.rs @@ -1,4 +1,8 @@ use futures::executor::block_on; +use futures::io::AsyncWrite; +use futures_test::task::panic_context; +use std::io; +use std::{pin::Pin, task::Poll}; use futures::io::{AsyncWriteExt, LineWriter}; @@ -24,3 +28,62 @@ fn line_writer() { block_on(writer.write(&[3, b'\n'])).unwrap(); assert_eq!(*writer.get_ref(), [0, 1, 0, b'\n', 1, b'\n', 2, 3, b'\n']); } + +#[test] +fn line_vectored() { + let mut line_writer = LineWriter::new(Vec::new()); + let cx = &mut panic_context(); + { + let bufs = &mut [ + io::IoSlice::new(&[]), + io::IoSlice::new(b"\n"), + io::IoSlice::new(&[]), + io::IoSlice::new(b"a"), + ]; + let result = Pin::new(&mut line_writer).poll_write_vectored(cx, bufs); + let result = result.map_err(|e| e.kind()); + assert_eq!(result, Poll::Ready(Ok(2))) + } + assert_eq!(line_writer.get_ref(), b"\n"); + + { + let bufs = &mut [ + io::IoSlice::new(&[]), + io::IoSlice::new(b"b"), + io::IoSlice::new(&[]), + io::IoSlice::new(b"a"), + io::IoSlice::new(&[]), + io::IoSlice::new(b"c"), + ]; + let result = Pin::new(&mut line_writer).poll_write_vectored(cx, bufs); + let result = result.map_err(|e| e.kind()); + assert_eq!(result, Poll::Ready(Ok(3))) + } + assert_eq!(line_writer.get_ref(), b"\n"); + block_on(line_writer.flush()).unwrap(); + assert_eq!(line_writer.get_ref(), b"\nabac"); + { + let res = Pin::new(&mut line_writer).poll_write_vectored(cx, &[]); + let res = res.map_err(|e| e.kind()); + assert_eq!(res, Poll::Ready(Ok(0))); + } + + { + let bufs = &mut [ + io::IoSlice::new(&[]), + io::IoSlice::new(&[]), + io::IoSlice::new(&[]), + io::IoSlice::new(&[]), + ]; + let result = Pin::new(&mut line_writer).poll_write_vectored(cx, bufs); + let result = result.map_err(|e| e.kind()); + assert_eq!(result, Poll::Ready(Ok(0))) + } + { + let result = + Pin::new(&mut line_writer).poll_write_vectored(cx, &[io::IoSlice::new(b"a\nb")]); + let result = result.map_err(|e| e.kind()); + assert_eq!(result, Poll::Ready(Ok(3))) + } + assert_eq!(line_writer.get_ref(), b"\nabaca\nb"); +} From fadf6860b24ffd693722e5098e6eb18fc22c400d Mon Sep 17 00:00:00 2001 From: Felipe Lema <1232306+FelipeLema@users.noreply.github.com> Date: Sun, 29 Aug 2021 01:10:27 -0400 Subject: [PATCH 8/9] fix clippy --- futures-util/src/io/line_writer.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/futures-util/src/io/line_writer.rs b/futures-util/src/io/line_writer.rs index a37b154190..71cd668325 100644 --- a/futures-util/src/io/line_writer.rs +++ b/futures-util/src/io/line_writer.rs @@ -45,7 +45,7 @@ impl LineWriter { /// Returns a reference to `buf_writer`'s internally buffered data. pub fn buffer(&self) -> &[u8] { - &self.buf_writer.buffer() + self.buf_writer.buffer() } /// Acquires a reference to the underlying sink or stream that this combinator is From dbb3ec1bf58428eee356d52c5bafa2d0be5830bc Mon Sep 17 00:00:00 2001 From: Felipe Lema <1232306+FelipeLema@users.noreply.github.com> Date: Wed, 1 Sep 2021 22:07:14 -0400 Subject: [PATCH 9/9] simplify testing code --- futures/tests/io_line_writer.rs | 60 ++++++++++++--------------------- 1 file changed, 22 insertions(+), 38 deletions(-) diff --git a/futures/tests/io_line_writer.rs b/futures/tests/io_line_writer.rs index f91a094db6..b483e0ff77 100644 --- a/futures/tests/io_line_writer.rs +++ b/futures/tests/io_line_writer.rs @@ -1,10 +1,6 @@ use futures::executor::block_on; -use futures::io::AsyncWrite; -use futures_test::task::panic_context; -use std::io; -use std::{pin::Pin, task::Poll}; - use futures::io::{AsyncWriteExt, LineWriter}; +use std::io; #[test] fn line_writer() { @@ -32,58 +28,46 @@ fn line_writer() { #[test] fn line_vectored() { let mut line_writer = LineWriter::new(Vec::new()); - let cx = &mut panic_context(); - { - let bufs = &mut [ + assert_eq!( + block_on(line_writer.write_vectored(&[ io::IoSlice::new(&[]), io::IoSlice::new(b"\n"), io::IoSlice::new(&[]), io::IoSlice::new(b"a"), - ]; - let result = Pin::new(&mut line_writer).poll_write_vectored(cx, bufs); - let result = result.map_err(|e| e.kind()); - assert_eq!(result, Poll::Ready(Ok(2))) - } + ])) + .unwrap(), + 2 + ); assert_eq!(line_writer.get_ref(), b"\n"); - { - let bufs = &mut [ + assert_eq!( + block_on(line_writer.write_vectored(&[ io::IoSlice::new(&[]), io::IoSlice::new(b"b"), io::IoSlice::new(&[]), io::IoSlice::new(b"a"), io::IoSlice::new(&[]), io::IoSlice::new(b"c"), - ]; - let result = Pin::new(&mut line_writer).poll_write_vectored(cx, bufs); - let result = result.map_err(|e| e.kind()); - assert_eq!(result, Poll::Ready(Ok(3))) - } + ])) + .unwrap(), + 3 + ); assert_eq!(line_writer.get_ref(), b"\n"); block_on(line_writer.flush()).unwrap(); assert_eq!(line_writer.get_ref(), b"\nabac"); - { - let res = Pin::new(&mut line_writer).poll_write_vectored(cx, &[]); - let res = res.map_err(|e| e.kind()); - assert_eq!(res, Poll::Ready(Ok(0))); - } + assert_eq!(block_on(line_writer.write_vectored(&[])).unwrap(), 0); - { - let bufs = &mut [ + assert_eq!( + block_on(line_writer.write_vectored(&[ io::IoSlice::new(&[]), io::IoSlice::new(&[]), io::IoSlice::new(&[]), io::IoSlice::new(&[]), - ]; - let result = Pin::new(&mut line_writer).poll_write_vectored(cx, bufs); - let result = result.map_err(|e| e.kind()); - assert_eq!(result, Poll::Ready(Ok(0))) - } - { - let result = - Pin::new(&mut line_writer).poll_write_vectored(cx, &[io::IoSlice::new(b"a\nb")]); - let result = result.map_err(|e| e.kind()); - assert_eq!(result, Poll::Ready(Ok(3))) - } + ])) + .unwrap(), + 0 + ); + + assert_eq!(block_on(line_writer.write_vectored(&[io::IoSlice::new(b"a\nb")])).unwrap(), 3); assert_eq!(line_writer.get_ref(), b"\nabaca\nb"); }