Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

io: remove Poll from the AsyncSeek::start_seek return value #2885

Merged
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
8 changes: 2 additions & 6 deletions tokio-util/src/either.rs
Expand Up @@ -125,12 +125,8 @@ where
L: AsyncSeek,
R: AsyncSeek,
{
fn start_seek(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
position: SeekFrom,
) -> Poll<Result<()>> {
delegate_call!(self.start_seek(cx, position))
fn start_seek(self: Pin<&mut Self>, position: SeekFrom) -> Result<()> {
delegate_call!(self.start_seek(position))
}

fn poll_complete(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<u64>> {
Expand Down
45 changes: 20 additions & 25 deletions tokio/src/fs/file.rs
Expand Up @@ -86,6 +86,8 @@ pub struct File {
/// error is observed while performing a read, it is saved until the next
/// write / flush call.
last_write_err: Option<io::ErrorKind>,

pos: u64,
}

#[derive(Debug)]
Expand Down Expand Up @@ -199,6 +201,7 @@ impl File {
std: Arc::new(std),
state: State::Idle(Some(Buf::with_capacity(0))),
last_write_err: None,
pos: 0,
}
}

Expand Down Expand Up @@ -332,7 +335,9 @@ impl File {
self.state = Idle(Some(buf));

match op {
Operation::Seek(res) => res.map(|_| ()),
Operation::Seek(res) => res.map(|pos| {
self.pos = pos;
}),
_ => unreachable!(),
}
}
Expand Down Expand Up @@ -524,9 +529,12 @@ impl AsyncRead for File {
self.last_write_err = Some(e.kind());
self.state = Idle(Some(buf));
}
Operation::Seek(_) => {
Operation::Seek(result) => {
assert!(buf.is_empty());
self.state = Idle(Some(buf));
if let Ok(pos) = result {
self.pos = pos;
}
continue;
}
}
Expand All @@ -537,13 +545,10 @@ impl AsyncRead for File {
}

impl AsyncSeek for File {
fn start_seek(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
mut pos: SeekFrom,
) -> Poll<io::Result<()>> {
fn start_seek(mut self: Pin<&mut Self>, mut pos: SeekFrom) -> io::Result<()> {
loop {
match self.state {
Busy(_) => panic!("must wait for poll_ready before calling start_seek"),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this panic message is out of date?

Idle(ref mut buf_cell) => {
let mut buf = buf_cell.take().unwrap();

Expand All @@ -562,22 +567,7 @@ impl AsyncSeek for File {
let res = (&*std).seek(pos);
(Operation::Seek(res), buf)
}));

return Ready(Ok(()));
}
Busy(ref mut rx) => {
let (op, buf) = ready!(Pin::new(rx).poll(cx))?;
self.state = Idle(Some(buf));

match op {
Operation::Read(_) => {}
Operation::Write(Err(e)) => {
assert!(self.last_write_err.is_none());
self.last_write_err = Some(e.kind());
}
Operation::Write(_) => {}
Operation::Seek(_) => {}
}
return Ok(());
}
}
}
Expand All @@ -586,7 +576,7 @@ impl AsyncSeek for File {
fn poll_complete(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<u64>> {
loop {
match self.state {
Idle(_) => panic!("must call start_seek before calling poll_complete"),
Idle(_) => return Poll::Ready(Ok(self.pos)),
Busy(ref mut rx) => {
let (op, buf) = ready!(Pin::new(rx).poll(cx))?;
self.state = Idle(Some(buf));
Expand All @@ -598,7 +588,12 @@ impl AsyncSeek for File {
self.last_write_err = Some(e.kind());
}
Operation::Write(_) => {}
Operation::Seek(res) => return Ready(res),
Operation::Seek(res) => {
if let Ok(pos) = res {
self.pos = pos;
}
return Ready(res);
}
}
}
}
Expand Down
45 changes: 17 additions & 28 deletions tokio/src/io/async_seek.rs
Expand Up @@ -23,36 +23,33 @@ pub trait AsyncSeek {
///
/// If this function returns successfully, then the job has been submitted.
/// To find out when it completes, call `poll_complete`.
fn start_seek(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
position: SeekFrom,
) -> Poll<io::Result<()>>;
///
/// # Errors
///
/// This function can return [`io::ErrorKind::Other`] in case there is
/// another seek in progress. To avoid this, it is advisable that any call
/// to `start_seek` is preceded by a call to `poll_complete` to ensure all
/// pending seeks have completed.
fn start_seek(self: Pin<&mut Self>, position: SeekFrom) -> io::Result<()>;

/// Waits for a seek operation to complete.
///
/// If the seek operation completed successfully,
/// this method returns the new position from the start of the stream.
/// That position can be used later with [`SeekFrom::Start`].
/// That position can be used later with [`SeekFrom::Start`]. Repeatedly
/// calling this function without calling `start_seek` might return the
/// same result.
///
/// # Errors
///
/// Seeking to a negative offset is considered an error.
///
/// # Panics
///
/// Calling this method without calling `start_seek` first is an error.
fn poll_complete(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<u64>>;
}

macro_rules! deref_async_seek {
() => {
fn start_seek(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
pos: SeekFrom,
) -> Poll<io::Result<()>> {
Pin::new(&mut **self).start_seek(cx, pos)
fn start_seek(mut self: Pin<&mut Self>, pos: SeekFrom) -> io::Result<()> {
Pin::new(&mut **self).start_seek(pos)
}

fn poll_complete(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<u64>> {
Expand All @@ -74,12 +71,8 @@ where
P: DerefMut + Unpin,
P::Target: AsyncSeek,
{
fn start_seek(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
pos: SeekFrom,
) -> Poll<io::Result<()>> {
self.get_mut().as_mut().start_seek(cx, pos)
fn start_seek(self: Pin<&mut Self>, pos: SeekFrom) -> io::Result<()> {
self.get_mut().as_mut().start_seek(pos)
}

fn poll_complete(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<u64>> {
Expand All @@ -88,12 +81,8 @@ where
}

impl<T: AsRef<[u8]> + Unpin> AsyncSeek for io::Cursor<T> {
fn start_seek(
mut self: Pin<&mut Self>,
_: &mut Context<'_>,
pos: SeekFrom,
) -> Poll<io::Result<()>> {
Poll::Ready(io::Seek::seek(&mut *self, pos).map(drop))
fn start_seek(mut self: Pin<&mut Self>, pos: SeekFrom) -> io::Result<()> {
io::Seek::seek(&mut *self, pos).map(drop)
}
fn poll_complete(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<u64>> {
Poll::Ready(Ok(self.get_mut().position()))
Expand Down
17 changes: 10 additions & 7 deletions tokio/src/io/seek.rs
Expand Up @@ -40,14 +40,17 @@ where
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let me = self.project();
match me.pos {
Some(pos) => match Pin::new(&mut *me.seek).start_seek(cx, *pos) {
Poll::Ready(Ok(())) => {
*me.pos = None;
Pin::new(&mut *me.seek).poll_complete(cx)
Some(pos) => {
// ensure no seek in progress
ready!(Pin::new(&mut *me.seek).poll_complete(cx))?;
match Pin::new(&mut *me.seek).start_seek(*pos) {
Ok(()) => {
*me.pos = None;
Pin::new(&mut *me.seek).poll_complete(cx)
}
Err(e) => Poll::Ready(Err(e)),
}
Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
Poll::Pending => Poll::Pending,
},
}
None => Pin::new(&mut *me.seek).poll_complete(cx),
}
}
Expand Down