From 1a6f7eca84707192eda646e2e0f72b22c7fe712c Mon Sep 17 00:00:00 2001 From: Taiki Endo Date: Sat, 28 Aug 2021 22:46:03 +0900 Subject: [PATCH] Add BufReader::seek_relative (#2489) --- futures-util/src/io/buf_reader.rs | 69 +++++++ futures-util/src/io/mod.rs | 2 +- futures/tests/auto_traits.rs | 6 + futures/tests/io_buf_reader.rs | 292 ++++++++++++++++++++---------- futures/tests/stream_peekable.rs | 5 + 5 files changed, 279 insertions(+), 95 deletions(-) diff --git a/futures-util/src/io/buf_reader.rs b/futures-util/src/io/buf_reader.rs index 5931edc1b2..2d585a9eb6 100644 --- a/futures-util/src/io/buf_reader.rs +++ b/futures-util/src/io/buf_reader.rs @@ -1,4 +1,5 @@ use super::DEFAULT_BUF_SIZE; +use futures_core::future::Future; use futures_core::ready; use futures_core::task::{Context, Poll}; #[cfg(feature = "read-initializer")] @@ -73,6 +74,40 @@ impl BufReader { } } +impl BufReader { + /// Seeks relative to the current position. If the new position lies within the buffer, + /// the buffer will not be flushed, allowing for more efficient seeks. + /// This method does not return the location of the underlying reader, so the caller + /// must track this information themselves if it is required. + pub fn seek_relative(self: Pin<&mut Self>, offset: i64) -> SeeKRelative<'_, R> { + SeeKRelative { inner: self, offset, first: true } + } + + /// Attempts to seek relative to the current position. If the new position lies within the buffer, + /// the buffer will not be flushed, allowing for more efficient seeks. + /// This method does not return the location of the underlying reader, so the caller + /// must track this information themselves if it is required. + pub fn poll_seek_relative( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + offset: i64, + ) -> Poll> { + let pos = self.pos as u64; + if offset < 0 { + if let Some(new_pos) = pos.checked_sub((-offset) as u64) { + *self.project().pos = new_pos as usize; + return Poll::Ready(Ok(())); + } + } else if let Some(new_pos) = pos.checked_add(offset as u64) { + if new_pos <= self.cap as u64 { + *self.project().pos = new_pos as usize; + return Poll::Ready(Ok(())); + } + } + self.poll_seek(cx, SeekFrom::Current(offset)).map(|res| res.map(|_| ())) + } +} + impl AsyncRead for BufReader { fn poll_read( mut self: Pin<&mut Self>, @@ -163,6 +198,10 @@ impl AsyncSeek for BufReader { /// `.into_inner()` immediately after a seek yields the underlying reader /// at the same position. /// + /// To seek without discarding the internal buffer, use + /// [`BufReader::seek_relative`](BufReader::seek_relative) or + /// [`BufReader::poll_seek_relative`](BufReader::poll_seek_relative). + /// /// See [`AsyncSeek`](futures_io::AsyncSeek) for more details. /// /// Note: In the edge case where you're seeking with `SeekFrom::Current(n)` @@ -200,3 +239,33 @@ impl AsyncSeek for BufReader { Poll::Ready(Ok(result)) } } + +/// Future for the [`BufReader::seek_relative`](self::BufReader::seek_relative) method. +#[derive(Debug)] +#[must_use = "futures do nothing unless polled"] +pub struct SeeKRelative<'a, R> { + inner: Pin<&'a mut BufReader>, + offset: i64, + first: bool, +} + +impl Future for SeeKRelative<'_, R> +where + R: AsyncRead + AsyncSeek, +{ + type Output = io::Result<()>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let offset = self.offset; + if self.first { + self.first = false; + self.inner.as_mut().poll_seek_relative(cx, offset) + } else { + self.inner + .as_mut() + .as_mut() + .poll_seek(cx, SeekFrom::Current(offset)) + .map(|res| res.map(|_| ())) + } + } +} diff --git a/futures-util/src/io/mod.rs b/futures-util/src/io/mod.rs index b96223d1c1..16cf5a7bab 100644 --- a/futures-util/src/io/mod.rs +++ b/futures-util/src/io/mod.rs @@ -56,7 +56,7 @@ mod allow_std; pub use self::allow_std::AllowStdIo; mod buf_reader; -pub use self::buf_reader::BufReader; +pub use self::buf_reader::{BufReader, SeeKRelative}; mod buf_writer; pub use self::buf_writer::BufWriter; diff --git a/futures/tests/auto_traits.rs b/futures/tests/auto_traits.rs index 63ac44fcfb..8021cc45e6 100644 --- a/futures/tests/auto_traits.rs +++ b/futures/tests/auto_traits.rs @@ -817,6 +817,12 @@ pub mod io { assert_impl!(Seek<'_, ()>: Unpin); assert_not_impl!(Seek<'_, PhantomPinned>: Unpin); + assert_impl!(SeeKRelative<'_, ()>: Send); + assert_not_impl!(SeeKRelative<'_, *const ()>: Send); + assert_impl!(SeeKRelative<'_, ()>: Sync); + assert_not_impl!(SeeKRelative<'_, *const ()>: Sync); + assert_impl!(SeeKRelative<'_, PhantomPinned>: Unpin); + assert_impl!(Sink: Send); assert_impl!(Sink: Sync); assert_impl!(Sink: Unpin); diff --git a/futures/tests/io_buf_reader.rs b/futures/tests/io_buf_reader.rs index d60df879c2..717297ccea 100644 --- a/futures/tests/io_buf_reader.rs +++ b/futures/tests/io_buf_reader.rs @@ -2,25 +2,17 @@ use futures::executor::block_on; use futures::future::{Future, FutureExt}; use futures::io::{ AllowStdIo, AsyncBufRead, AsyncBufReadExt, AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt, - BufReader, Cursor, SeekFrom, + BufReader, SeekFrom, }; +use futures::pin_mut; use futures::task::{Context, Poll}; use futures_test::task::noop_context; +use pin_project::pin_project; use std::cmp; use std::io; use std::pin::Pin; -macro_rules! run_fill_buf { - ($reader:expr) => {{ - let mut cx = noop_context(); - loop { - if let Poll::Ready(x) = Pin::new(&mut $reader).poll_fill_buf(&mut cx) { - break x; - } - } - }}; -} - +// helper for maybe_pending_* tests fn run(mut f: F) -> F::Output { let mut cx = noop_context(); loop { @@ -30,6 +22,49 @@ fn run(mut f: F) -> F::Output { } } +// https://github.com/rust-lang/futures-rs/pull/2489#discussion_r697865719 +#[pin_project(!Unpin)] +struct Cursor { + #[pin] + inner: futures::io::Cursor, +} + +impl Cursor { + fn new(inner: T) -> Self { + Self { inner: futures::io::Cursor::new(inner) } + } +} + +impl AsyncRead for Cursor<&[u8]> { + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut [u8], + ) -> Poll> { + self.project().inner.poll_read(cx, buf) + } +} + +impl AsyncBufRead for Cursor<&[u8]> { + fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.project().inner.poll_fill_buf(cx) + } + + fn consume(self: Pin<&mut Self>, amt: usize) { + self.project().inner.consume(amt) + } +} + +impl AsyncSeek for Cursor<&[u8]> { + fn poll_seek( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + pos: SeekFrom, + ) -> Poll> { + self.project().inner.poll_seek(cx, pos) + } +} + struct MaybePending<'a> { inner: &'a [u8], ready_read: bool, @@ -80,54 +115,119 @@ impl AsyncBufRead for MaybePending<'_> { #[test] fn test_buffered_reader() { - let inner: &[u8] = &[5, 6, 7, 0, 1, 2, 3, 4]; - let mut reader = BufReader::with_capacity(2, inner); - - let mut buf = [0, 0, 0]; - let nread = block_on(reader.read(&mut buf)); - assert_eq!(nread.unwrap(), 3); - assert_eq!(buf, [5, 6, 7]); - assert_eq!(reader.buffer(), []); - - let mut buf = [0, 0]; - let nread = block_on(reader.read(&mut buf)); - assert_eq!(nread.unwrap(), 2); - assert_eq!(buf, [0, 1]); - assert_eq!(reader.buffer(), []); - - let mut buf = [0]; - let nread = block_on(reader.read(&mut buf)); - assert_eq!(nread.unwrap(), 1); - assert_eq!(buf, [2]); - assert_eq!(reader.buffer(), [3]); + block_on(async { + let inner: &[u8] = &[5, 6, 7, 0, 1, 2, 3, 4]; + let mut reader = BufReader::with_capacity(2, inner); + + let mut buf = [0, 0, 0]; + let nread = reader.read(&mut buf).await.unwrap(); + assert_eq!(nread, 3); + assert_eq!(buf, [5, 6, 7]); + assert_eq!(reader.buffer(), []); + + let mut buf = [0, 0]; + let nread = reader.read(&mut buf).await.unwrap(); + assert_eq!(nread, 2); + assert_eq!(buf, [0, 1]); + assert_eq!(reader.buffer(), []); + + let mut buf = [0]; + let nread = reader.read(&mut buf).await.unwrap(); + assert_eq!(nread, 1); + assert_eq!(buf, [2]); + assert_eq!(reader.buffer(), [3]); + + let mut buf = [0, 0, 0]; + let nread = reader.read(&mut buf).await.unwrap(); + assert_eq!(nread, 1); + assert_eq!(buf, [3, 0, 0]); + assert_eq!(reader.buffer(), []); + + let nread = reader.read(&mut buf).await.unwrap(); + assert_eq!(nread, 1); + assert_eq!(buf, [4, 0, 0]); + assert_eq!(reader.buffer(), []); + + assert_eq!(reader.read(&mut buf).await.unwrap(), 0); + }); +} - let mut buf = [0, 0, 0]; - let nread = block_on(reader.read(&mut buf)); - assert_eq!(nread.unwrap(), 1); - assert_eq!(buf, [3, 0, 0]); - assert_eq!(reader.buffer(), []); +#[test] +fn test_buffered_reader_seek() { + block_on(async { + let inner: &[u8] = &[5, 6, 7, 0, 1, 2, 3, 4]; + let reader = BufReader::with_capacity(2, Cursor::new(inner)); + pin_mut!(reader); + + assert_eq!(reader.seek(SeekFrom::Start(3)).await.unwrap(), 3); + assert_eq!(reader.as_mut().fill_buf().await.unwrap(), &[0, 1][..]); + assert!(reader.seek(SeekFrom::Current(i64::MIN)).await.is_err()); + assert_eq!(reader.as_mut().fill_buf().await.unwrap(), &[0, 1][..]); + assert_eq!(reader.seek(SeekFrom::Current(1)).await.unwrap(), 4); + assert_eq!(reader.as_mut().fill_buf().await.unwrap(), &[1, 2][..]); + reader.as_mut().consume(1); + assert_eq!(reader.seek(SeekFrom::Current(-2)).await.unwrap(), 3); + }); +} - let nread = block_on(reader.read(&mut buf)); - assert_eq!(nread.unwrap(), 1); - assert_eq!(buf, [4, 0, 0]); - assert_eq!(reader.buffer(), []); +#[test] +fn test_buffered_reader_seek_relative() { + block_on(async { + let inner: &[u8] = &[5, 6, 7, 0, 1, 2, 3, 4]; + let reader = BufReader::with_capacity(2, Cursor::new(inner)); + pin_mut!(reader); + + assert!(reader.as_mut().seek_relative(3).await.is_ok()); + assert_eq!(reader.as_mut().fill_buf().await.unwrap(), &[0, 1][..]); + assert!(reader.as_mut().seek_relative(0).await.is_ok()); + assert_eq!(reader.as_mut().fill_buf().await.unwrap(), &[0, 1][..]); + assert!(reader.as_mut().seek_relative(1).await.is_ok()); + assert_eq!(reader.as_mut().fill_buf().await.unwrap(), &[1][..]); + assert!(reader.as_mut().seek_relative(-1).await.is_ok()); + assert_eq!(reader.as_mut().fill_buf().await.unwrap(), &[0, 1][..]); + assert!(reader.as_mut().seek_relative(2).await.is_ok()); + assert_eq!(reader.as_mut().fill_buf().await.unwrap(), &[2, 3][..]); + }); +} - assert_eq!(block_on(reader.read(&mut buf)).unwrap(), 0); +#[test] +fn test_buffered_reader_invalidated_after_read() { + block_on(async { + let inner: &[u8] = &[5, 6, 7, 0, 1, 2, 3, 4]; + let reader = BufReader::with_capacity(3, Cursor::new(inner)); + pin_mut!(reader); + + assert_eq!(reader.as_mut().fill_buf().await.unwrap(), &[5, 6, 7][..]); + reader.as_mut().consume(3); + + let mut buffer = [0, 0, 0, 0, 0]; + assert_eq!(reader.read(&mut buffer).await.unwrap(), 5); + assert_eq!(buffer, [0, 1, 2, 3, 4]); + + assert!(reader.as_mut().seek_relative(-2).await.is_ok()); + let mut buffer = [0, 0]; + assert_eq!(reader.read(&mut buffer).await.unwrap(), 2); + assert_eq!(buffer, [3, 4]); + }); } #[test] -fn test_buffered_reader_seek() { - let inner: &[u8] = &[5, 6, 7, 0, 1, 2, 3, 4]; - let mut reader = BufReader::with_capacity(2, Cursor::new(inner)); - - assert_eq!(block_on(reader.seek(SeekFrom::Start(3))).ok(), Some(3)); - assert_eq!(run_fill_buf!(reader).ok(), Some(&[0, 1][..])); - assert_eq!(run(reader.seek(SeekFrom::Current(i64::min_value()))).ok(), None); - assert_eq!(run_fill_buf!(reader).ok(), Some(&[0, 1][..])); - assert_eq!(block_on(reader.seek(SeekFrom::Current(1))).ok(), Some(4)); - assert_eq!(run_fill_buf!(reader).ok(), Some(&[1, 2][..])); - Pin::new(&mut reader).consume(1); - assert_eq!(block_on(reader.seek(SeekFrom::Current(-2))).ok(), Some(3)); +fn test_buffered_reader_invalidated_after_seek() { + block_on(async { + let inner: &[u8] = &[5, 6, 7, 0, 1, 2, 3, 4]; + let reader = BufReader::with_capacity(3, Cursor::new(inner)); + pin_mut!(reader); + + assert_eq!(reader.as_mut().fill_buf().await.unwrap(), &[5, 6, 7][..]); + reader.as_mut().consume(3); + + assert!(reader.seek(SeekFrom::Current(5)).await.is_ok()); + + assert!(reader.as_mut().seek_relative(-2).await.is_ok()); + let mut buffer = [0, 0]; + assert_eq!(reader.read(&mut buffer).await.unwrap(), 2); + assert_eq!(buffer, [3, 4]); + }); } #[test] @@ -156,24 +256,27 @@ fn test_buffered_reader_seek_underflow() { self.pos = self.pos.wrapping_add(n as u64); } SeekFrom::End(n) => { - self.pos = u64::max_value().wrapping_add(n as u64); + self.pos = u64::MAX.wrapping_add(n as u64); } } Ok(self.pos) } } - let mut reader = BufReader::with_capacity(5, AllowStdIo::new(PositionReader { pos: 0 })); - assert_eq!(run_fill_buf!(reader).ok(), Some(&[0, 1, 2, 3, 4][..])); - assert_eq!(block_on(reader.seek(SeekFrom::End(-5))).ok(), Some(u64::max_value() - 5)); - assert_eq!(run_fill_buf!(reader).ok().map(|s| s.len()), Some(5)); - // the following seek will require two underlying seeks - let expected = 9_223_372_036_854_775_802; - assert_eq!(block_on(reader.seek(SeekFrom::Current(i64::min_value()))).ok(), Some(expected)); - assert_eq!(run_fill_buf!(reader).ok().map(|s| s.len()), Some(5)); - // seeking to 0 should empty the buffer. - assert_eq!(block_on(reader.seek(SeekFrom::Current(0))).ok(), Some(expected)); - assert_eq!(reader.get_ref().get_ref().pos, expected); + block_on(async { + let reader = BufReader::with_capacity(5, AllowStdIo::new(PositionReader { pos: 0 })); + pin_mut!(reader); + assert_eq!(reader.as_mut().fill_buf().await.unwrap(), &[0, 1, 2, 3, 4][..]); + assert_eq!(reader.seek(SeekFrom::End(-5)).await.unwrap(), u64::MAX - 5); + assert_eq!(reader.as_mut().fill_buf().await.unwrap().len(), 5); + // the following seek will require two underlying seeks + let expected = 9_223_372_036_854_775_802; + assert_eq!(reader.seek(SeekFrom::Current(i64::MIN)).await.unwrap(), expected); + assert_eq!(reader.as_mut().fill_buf().await.unwrap().len(), 5); + // seeking to 0 should empty the buffer. + assert_eq!(reader.seek(SeekFrom::Current(0)).await.unwrap(), expected); + assert_eq!(reader.get_ref().get_ref().pos, expected); + }); } #[test] @@ -193,16 +296,18 @@ fn test_short_reads() { } } - let inner = ShortReader { lengths: vec![0, 1, 2, 0, 1, 0] }; - let mut reader = BufReader::new(AllowStdIo::new(inner)); - let mut buf = [0, 0]; - assert_eq!(block_on(reader.read(&mut buf)).unwrap(), 0); - assert_eq!(block_on(reader.read(&mut buf)).unwrap(), 1); - assert_eq!(block_on(reader.read(&mut buf)).unwrap(), 2); - assert_eq!(block_on(reader.read(&mut buf)).unwrap(), 0); - assert_eq!(block_on(reader.read(&mut buf)).unwrap(), 1); - assert_eq!(block_on(reader.read(&mut buf)).unwrap(), 0); - assert_eq!(block_on(reader.read(&mut buf)).unwrap(), 0); + block_on(async { + let inner = ShortReader { lengths: vec![0, 1, 2, 0, 1, 0] }; + let mut reader = BufReader::new(AllowStdIo::new(inner)); + let mut buf = [0, 0]; + assert_eq!(reader.read(&mut buf).await.unwrap(), 0); + assert_eq!(reader.read(&mut buf).await.unwrap(), 1); + assert_eq!(reader.read(&mut buf).await.unwrap(), 2); + assert_eq!(reader.read(&mut buf).await.unwrap(), 0); + assert_eq!(reader.read(&mut buf).await.unwrap(), 1); + assert_eq!(reader.read(&mut buf).await.unwrap(), 0); + assert_eq!(reader.read(&mut buf).await.unwrap(), 0); + }); } #[test] @@ -263,7 +368,9 @@ fn maybe_pending_buf_read() { // https://github.com/rust-lang/futures-rs/pull/1573#discussion_r281162309 #[test] fn maybe_pending_seek() { + #[pin_project] struct MaybePendingSeek<'a> { + #[pin] inner: Cursor<&'a [u8]>, ready: bool, } @@ -276,25 +383,21 @@ fn maybe_pending_seek() { impl AsyncRead for MaybePendingSeek<'_> { fn poll_read( - mut self: Pin<&mut Self>, + self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8], ) -> Poll> { - Pin::new(&mut self.inner).poll_read(cx, buf) + self.project().inner.poll_read(cx, buf) } } impl AsyncBufRead for MaybePendingSeek<'_> { - fn poll_fill_buf( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll> { - let this: *mut Self = &mut *self as *mut _; - Pin::new(&mut unsafe { &mut *this }.inner).poll_fill_buf(cx) + fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.project().inner.poll_fill_buf(cx) } - fn consume(mut self: Pin<&mut Self>, amt: usize) { - Pin::new(&mut self.inner).consume(amt) + fn consume(self: Pin<&mut Self>, amt: usize) { + self.project().inner.consume(amt) } } @@ -305,24 +408,25 @@ fn maybe_pending_seek() { pos: SeekFrom, ) -> Poll> { if self.ready { - self.ready = false; - Pin::new(&mut self.inner).poll_seek(cx, pos) + *self.as_mut().project().ready = false; + self.project().inner.poll_seek(cx, pos) } else { - self.ready = true; + *self.project().ready = true; Poll::Pending } } } let inner: &[u8] = &[5, 6, 7, 0, 1, 2, 3, 4]; - let mut reader = BufReader::with_capacity(2, MaybePendingSeek::new(inner)); + let reader = BufReader::with_capacity(2, MaybePendingSeek::new(inner)); + pin_mut!(reader); assert_eq!(run(reader.seek(SeekFrom::Current(3))).ok(), Some(3)); - assert_eq!(run_fill_buf!(reader).ok(), Some(&[0, 1][..])); - assert_eq!(run(reader.seek(SeekFrom::Current(i64::min_value()))).ok(), None); - assert_eq!(run_fill_buf!(reader).ok(), Some(&[0, 1][..])); + assert_eq!(run(reader.as_mut().fill_buf()).ok(), Some(&[0, 1][..])); + assert_eq!(run(reader.seek(SeekFrom::Current(i64::MIN))).ok(), None); + assert_eq!(run(reader.as_mut().fill_buf()).ok(), Some(&[0, 1][..])); assert_eq!(run(reader.seek(SeekFrom::Current(1))).ok(), Some(4)); - assert_eq!(run_fill_buf!(reader).ok(), Some(&[1, 2][..])); + assert_eq!(run(reader.as_mut().fill_buf()).ok(), Some(&[1, 2][..])); Pin::new(&mut reader).consume(1); assert_eq!(run(reader.seek(SeekFrom::Current(-2))).ok(), Some(3)); } diff --git a/futures/tests/stream_peekable.rs b/futures/tests/stream_peekable.rs index 2af20f4fa9..153fcc25b4 100644 --- a/futures/tests/stream_peekable.rs +++ b/futures/tests/stream_peekable.rs @@ -9,6 +9,11 @@ fn peekable() { pin_mut!(peekable); assert_eq!(peekable.as_mut().peek().await, Some(&1u8)); assert_eq!(peekable.collect::>().await, vec![1, 2, 3]); + + let s = stream::once(async { 1 }).peekable(); + pin_mut!(s); + assert_eq!(s.as_mut().peek().await, Some(&1u8)); + assert_eq!(s.collect::>().await, vec![1]); }); }