Skip to content

Commit

Permalink
Add BufReader::seek_relative (#2489)
Browse files Browse the repository at this point in the history
  • Loading branch information
taiki-e committed Aug 30, 2021
1 parent 8b39aa1 commit efcdda8
Show file tree
Hide file tree
Showing 5 changed files with 279 additions and 95 deletions.
69 changes: 69 additions & 0 deletions futures-util/src/io/buf_reader.rs
@@ -1,4 +1,5 @@
use super::DEFAULT_BUF_SIZE;
use futures_core::future::Future;
use futures_core::ready;
use futures_core::task::{Context, Poll};
#[cfg(feature = "read-initializer")]
Expand Down Expand Up @@ -73,6 +74,40 @@ impl<R: AsyncRead> BufReader<R> {
}
}

impl<R: AsyncRead + AsyncSeek> BufReader<R> {
/// Seeks relative to the current position. If the new position lies within the buffer,
/// the buffer will not be flushed, allowing for more efficient seeks.
/// This method does not return the location of the underlying reader, so the caller
/// must track this information themselves if it is required.
pub fn seek_relative(self: Pin<&mut Self>, offset: i64) -> SeeKRelative<'_, R> {
SeeKRelative { inner: self, offset, first: true }
}

/// Attempts to seek relative to the current position. If the new position lies within the buffer,
/// the buffer will not be flushed, allowing for more efficient seeks.
/// This method does not return the location of the underlying reader, so the caller
/// must track this information themselves if it is required.
pub fn poll_seek_relative(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
offset: i64,
) -> Poll<io::Result<()>> {
let pos = self.pos as u64;
if offset < 0 {
if let Some(new_pos) = pos.checked_sub((-offset) as u64) {
*self.project().pos = new_pos as usize;
return Poll::Ready(Ok(()));
}
} else if let Some(new_pos) = pos.checked_add(offset as u64) {
if new_pos <= self.cap as u64 {
*self.project().pos = new_pos as usize;
return Poll::Ready(Ok(()));
}
}
self.poll_seek(cx, SeekFrom::Current(offset)).map(|res| res.map(|_| ()))
}
}

impl<R: AsyncRead> AsyncRead for BufReader<R> {
fn poll_read(
mut self: Pin<&mut Self>,
Expand Down Expand Up @@ -163,6 +198,10 @@ impl<R: AsyncRead + AsyncSeek> AsyncSeek for BufReader<R> {
/// `.into_inner()` immediately after a seek yields the underlying reader
/// at the same position.
///
/// To seek without discarding the internal buffer, use
/// [`BufReader::seek_relative`](BufReader::seek_relative) or
/// [`BufReader::poll_seek_relative`](BufReader::poll_seek_relative).
///
/// See [`AsyncSeek`](futures_io::AsyncSeek) for more details.
///
/// Note: In the edge case where you're seeking with `SeekFrom::Current(n)`
Expand Down Expand Up @@ -200,3 +239,33 @@ impl<R: AsyncRead + AsyncSeek> AsyncSeek for BufReader<R> {
Poll::Ready(Ok(result))
}
}

/// Future for the [`BufReader::seek_relative`](self::BufReader::seek_relative) method.
#[derive(Debug)]
#[must_use = "futures do nothing unless polled"]
pub struct SeeKRelative<'a, R> {
inner: Pin<&'a mut BufReader<R>>,
offset: i64,
first: bool,
}

impl<R> Future for SeeKRelative<'_, R>
where
R: AsyncRead + AsyncSeek,
{
type Output = io::Result<()>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let offset = self.offset;
if self.first {
self.first = false;
self.inner.as_mut().poll_seek_relative(cx, offset)
} else {
self.inner
.as_mut()
.as_mut()
.poll_seek(cx, SeekFrom::Current(offset))
.map(|res| res.map(|_| ()))
}
}
}
2 changes: 1 addition & 1 deletion futures-util/src/io/mod.rs
Expand Up @@ -56,7 +56,7 @@ mod allow_std;
pub use self::allow_std::AllowStdIo;

mod buf_reader;
pub use self::buf_reader::BufReader;
pub use self::buf_reader::{BufReader, SeeKRelative};

mod buf_writer;
pub use self::buf_writer::BufWriter;
Expand Down
6 changes: 6 additions & 0 deletions futures/tests/auto_traits.rs
Expand Up @@ -817,6 +817,12 @@ pub mod io {
assert_impl!(Seek<'_, ()>: Unpin);
assert_not_impl!(Seek<'_, PhantomPinned>: Unpin);

assert_impl!(SeeKRelative<'_, ()>: Send);
assert_not_impl!(SeeKRelative<'_, *const ()>: Send);
assert_impl!(SeeKRelative<'_, ()>: Sync);
assert_not_impl!(SeeKRelative<'_, *const ()>: Sync);
assert_impl!(SeeKRelative<'_, PhantomPinned>: Unpin);

assert_impl!(Sink: Send);
assert_impl!(Sink: Sync);
assert_impl!(Sink: Unpin);
Expand Down

0 comments on commit efcdda8

Please sign in to comment.