From 5cd5228ec7164d5cdaa176999d587e6b19ff0353 Mon Sep 17 00:00:00 2001 From: Zahari Dichev Date: Fri, 25 Sep 2020 16:26:17 +0300 Subject: [PATCH 1/5] io: remove `Poll` from the `AsyncSeek::start_seek` return value Signed-off-by: Zahari Dichev --- tokio-util/src/either.rs | 12 ++++----- tokio/src/fs/file.rs | 45 ++++++++++++++++--------------- tokio/src/io/async_seek.rs | 54 ++++++++++++++++++++++---------------- tokio/src/io/seek.rs | 8 +++--- 4 files changed, 65 insertions(+), 54 deletions(-) diff --git a/tokio-util/src/either.rs b/tokio-util/src/either.rs index 3c749c1b49d..88db84a2a5d 100644 --- a/tokio-util/src/either.rs +++ b/tokio-util/src/either.rs @@ -125,12 +125,12 @@ where L: AsyncSeek, R: AsyncSeek, { - fn start_seek( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - position: SeekFrom, - ) -> Poll> { - delegate_call!(self.start_seek(cx, position)) + fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + delegate_call!(self.poll_ready(cx)) + } + + fn start_seek(self: Pin<&mut Self>, position: SeekFrom) -> Result<()> { + delegate_call!(self.start_seek(position)) } fn poll_complete(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { diff --git a/tokio/src/fs/file.rs b/tokio/src/fs/file.rs index 319c2c7fc9f..f6b6e4416ff 100644 --- a/tokio/src/fs/file.rs +++ b/tokio/src/fs/file.rs @@ -537,13 +537,31 @@ impl AsyncRead for File { } impl AsyncSeek for File { - fn start_seek( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - mut pos: SeekFrom, - ) -> Poll> { + fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { loop { match self.state { + Idle(_) => return Poll::Ready(Ok(())), + Busy(ref mut rx) => { + let (op, buf) = ready!(Pin::new(rx).poll(cx))?; + self.state = Idle(Some(buf)); + match op { + Operation::Read(_) => {} + Operation::Write(Err(e)) => { + assert!(self.last_write_err.is_none()); + self.last_write_err = Some(e.kind()); + } + Operation::Write(_) => {} + Operation::Seek(_) => {} + } + } + } + } + } + + fn start_seek(mut self: Pin<&mut Self>, mut pos: SeekFrom) -> io::Result<()> { + loop { + match self.state { + Busy(_) => panic!("must wait for poll_ready before calling start_seek"), Idle(ref mut buf_cell) => { let mut buf = buf_cell.take().unwrap(); @@ -562,22 +580,7 @@ impl AsyncSeek for File { let res = (&*std).seek(pos); (Operation::Seek(res), buf) })); - - return Ready(Ok(())); - } - Busy(ref mut rx) => { - let (op, buf) = ready!(Pin::new(rx).poll(cx))?; - self.state = Idle(Some(buf)); - - match op { - Operation::Read(_) => {} - Operation::Write(Err(e)) => { - assert!(self.last_write_err.is_none()); - self.last_write_err = Some(e.kind()); - } - Operation::Write(_) => {} - Operation::Seek(_) => {} - } + return Ok(()); } } } diff --git a/tokio/src/io/async_seek.rs b/tokio/src/io/async_seek.rs index 32ed0a22ab9..5b4ae88794e 100644 --- a/tokio/src/io/async_seek.rs +++ b/tokio/src/io/async_seek.rs @@ -16,18 +16,26 @@ use std::task::{Context, Poll}; /// [`Seek::seek`]: std::io::Seek::seek() /// [`AsyncSeekExt`]: crate::io::AsyncSeekExt pub trait AsyncSeek { + /// Ensures that the `AsyncSeek` is ready for `start_seek` to be called. + /// + /// This method must be called and return `Poll::Ready(Ok(()))` prior to + /// each call to `start_seek`. + /// + /// If this method returns `Poll::Pending`, the current task + /// is registered to be notified (via `cx.waker().wake_by_ref()`) when `poll_ready` + /// should be called again. + fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll>; + /// Attempts to seek to an offset, in bytes, in a stream. /// /// A seek beyond the end of a stream is allowed, but behavior is defined /// by the implementation. /// + /// Each call to this function must be preceded by a successful call to + /// `poll_ready` which returned `Poll::Ready(Ok(()))`. /// If this function returns successfully, then the job has been submitted. /// To find out when it completes, call `poll_complete`. - fn start_seek( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - position: SeekFrom, - ) -> Poll>; + fn start_seek(self: Pin<&mut Self>, position: SeekFrom) -> io::Result<()>; /// Waits for a seek operation to complete. /// @@ -47,12 +55,12 @@ pub trait AsyncSeek { macro_rules! deref_async_seek { () => { - fn start_seek( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - pos: SeekFrom, - ) -> Poll> { - Pin::new(&mut **self).start_seek(cx, pos) + fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Pin::new(&mut **self).poll_ready(cx) + } + + fn start_seek(mut self: Pin<&mut Self>, pos: SeekFrom) -> io::Result<()> { + Pin::new(&mut **self).start_seek(pos) } fn poll_complete(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { @@ -74,12 +82,12 @@ where P: DerefMut + Unpin, P::Target: AsyncSeek, { - fn start_seek( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - pos: SeekFrom, - ) -> Poll> { - self.get_mut().as_mut().start_seek(cx, pos) + fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.get_mut().as_mut().poll_ready(cx) + } + + fn start_seek(self: Pin<&mut Self>, pos: SeekFrom) -> io::Result<()> { + self.get_mut().as_mut().start_seek(pos) } fn poll_complete(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { @@ -88,12 +96,12 @@ where } impl + Unpin> AsyncSeek for io::Cursor { - fn start_seek( - mut self: Pin<&mut Self>, - _: &mut Context<'_>, - pos: SeekFrom, - ) -> Poll> { - Poll::Ready(io::Seek::seek(&mut *self, pos).map(drop)) + fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } + + fn start_seek(mut self: Pin<&mut Self>, pos: SeekFrom) -> io::Result<()> { + io::Seek::seek(&mut *self, pos).map(drop) } fn poll_complete(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { Poll::Ready(Ok(self.get_mut().position())) diff --git a/tokio/src/io/seek.rs b/tokio/src/io/seek.rs index 8f071167f6c..001fa212e33 100644 --- a/tokio/src/io/seek.rs +++ b/tokio/src/io/seek.rs @@ -32,14 +32,14 @@ where fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let me = &mut *self; + ready!(Pin::new(&mut me.seek).poll_ready(cx))?; match me.pos { - Some(pos) => match Pin::new(&mut me.seek).start_seek(cx, pos) { - Poll::Ready(Ok(())) => { + Some(pos) => match Pin::new(&mut me.seek).start_seek(pos) { + Ok(()) => { me.pos = None; Pin::new(&mut me.seek).poll_complete(cx) } - Poll::Ready(Err(e)) => Poll::Ready(Err(e)), - Poll::Pending => Poll::Pending, + Err(e) => Poll::Ready(Err(e)), }, None => Pin::new(&mut me.seek).poll_complete(cx), } From b672c01372a749eb584e1030cfa328b65b3442b0 Mon Sep 17 00:00:00 2001 From: Zahari Dichev Date: Tue, 6 Oct 2020 11:26:19 +0000 Subject: [PATCH 2/5] Remove poll_ready --- tokio-util/src/either.rs | 4 ---- tokio/src/fs/file.rs | 43 ++++++++++++++++---------------------- tokio/src/io/async_seek.rs | 24 --------------------- tokio/src/io/seek.rs | 17 ++++++++------- 4 files changed, 28 insertions(+), 60 deletions(-) diff --git a/tokio-util/src/either.rs b/tokio-util/src/either.rs index 88db84a2a5d..f5246af27b2 100644 --- a/tokio-util/src/either.rs +++ b/tokio-util/src/either.rs @@ -125,10 +125,6 @@ where L: AsyncSeek, R: AsyncSeek, { - fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - delegate_call!(self.poll_ready(cx)) - } - fn start_seek(self: Pin<&mut Self>, position: SeekFrom) -> Result<()> { delegate_call!(self.start_seek(position)) } diff --git a/tokio/src/fs/file.rs b/tokio/src/fs/file.rs index f6b6e4416ff..5236ab74b68 100644 --- a/tokio/src/fs/file.rs +++ b/tokio/src/fs/file.rs @@ -86,6 +86,8 @@ pub struct File { /// error is observed while performing a read, it is saved until the next /// write / flush call. last_write_err: Option, + + pos: u64, } #[derive(Debug)] @@ -199,6 +201,7 @@ impl File { std: Arc::new(std), state: State::Idle(Some(Buf::with_capacity(0))), last_write_err: None, + pos: 0, } } @@ -332,7 +335,10 @@ impl File { self.state = Idle(Some(buf)); match op { - Operation::Seek(res) => res.map(|_| ()), + Operation::Seek(res) => res.map(|pos| { + self.pos = pos; + () + }), _ => unreachable!(), } } @@ -524,9 +530,12 @@ impl AsyncRead for File { self.last_write_err = Some(e.kind()); self.state = Idle(Some(buf)); } - Operation::Seek(_) => { + Operation::Seek(result) => { assert!(buf.is_empty()); self.state = Idle(Some(buf)); + if let Ok(pos) = result { + self.pos = pos; + } continue; } } @@ -537,27 +546,6 @@ impl AsyncRead for File { } impl AsyncSeek for File { - fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - loop { - match self.state { - Idle(_) => return Poll::Ready(Ok(())), - Busy(ref mut rx) => { - let (op, buf) = ready!(Pin::new(rx).poll(cx))?; - self.state = Idle(Some(buf)); - match op { - Operation::Read(_) => {} - Operation::Write(Err(e)) => { - assert!(self.last_write_err.is_none()); - self.last_write_err = Some(e.kind()); - } - Operation::Write(_) => {} - Operation::Seek(_) => {} - } - } - } - } - } - fn start_seek(mut self: Pin<&mut Self>, mut pos: SeekFrom) -> io::Result<()> { loop { match self.state { @@ -589,7 +577,7 @@ impl AsyncSeek for File { fn poll_complete(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { loop { match self.state { - Idle(_) => panic!("must call start_seek before calling poll_complete"), + Idle(_) => return Poll::Ready(Ok(self.pos)), Busy(ref mut rx) => { let (op, buf) = ready!(Pin::new(rx).poll(cx))?; self.state = Idle(Some(buf)); @@ -601,7 +589,12 @@ impl AsyncSeek for File { self.last_write_err = Some(e.kind()); } Operation::Write(_) => {} - Operation::Seek(res) => return Ready(res), + Operation::Seek(res) => { + if let Ok(pos) = res { + self.pos = pos; + } + return Ready(res); + } } } } diff --git a/tokio/src/io/async_seek.rs b/tokio/src/io/async_seek.rs index 5b4ae88794e..0a792b8ba0a 100644 --- a/tokio/src/io/async_seek.rs +++ b/tokio/src/io/async_seek.rs @@ -16,23 +16,11 @@ use std::task::{Context, Poll}; /// [`Seek::seek`]: std::io::Seek::seek() /// [`AsyncSeekExt`]: crate::io::AsyncSeekExt pub trait AsyncSeek { - /// Ensures that the `AsyncSeek` is ready for `start_seek` to be called. - /// - /// This method must be called and return `Poll::Ready(Ok(()))` prior to - /// each call to `start_seek`. - /// - /// If this method returns `Poll::Pending`, the current task - /// is registered to be notified (via `cx.waker().wake_by_ref()`) when `poll_ready` - /// should be called again. - fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll>; - /// Attempts to seek to an offset, in bytes, in a stream. /// /// A seek beyond the end of a stream is allowed, but behavior is defined /// by the implementation. /// - /// Each call to this function must be preceded by a successful call to - /// `poll_ready` which returned `Poll::Ready(Ok(()))`. /// If this function returns successfully, then the job has been submitted. /// To find out when it completes, call `poll_complete`. fn start_seek(self: Pin<&mut Self>, position: SeekFrom) -> io::Result<()>; @@ -55,10 +43,6 @@ pub trait AsyncSeek { macro_rules! deref_async_seek { () => { - fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - Pin::new(&mut **self).poll_ready(cx) - } - fn start_seek(mut self: Pin<&mut Self>, pos: SeekFrom) -> io::Result<()> { Pin::new(&mut **self).start_seek(pos) } @@ -82,10 +66,6 @@ where P: DerefMut + Unpin, P::Target: AsyncSeek, { - fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - self.get_mut().as_mut().poll_ready(cx) - } - fn start_seek(self: Pin<&mut Self>, pos: SeekFrom) -> io::Result<()> { self.get_mut().as_mut().start_seek(pos) } @@ -96,10 +76,6 @@ where } impl + Unpin> AsyncSeek for io::Cursor { - fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { - Poll::Ready(Ok(())) - } - fn start_seek(mut self: Pin<&mut Self>, pos: SeekFrom) -> io::Result<()> { io::Seek::seek(&mut *self, pos).map(drop) } diff --git a/tokio/src/io/seek.rs b/tokio/src/io/seek.rs index 6bc5cb22035..e64205d9cf6 100644 --- a/tokio/src/io/seek.rs +++ b/tokio/src/io/seek.rs @@ -39,15 +39,18 @@ where fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let me = self.project(); - ready!(Pin::new(&mut *me.seek).poll_ready(cx))?; match me.pos { - Some(pos) => match Pin::new(&mut *me.seek).start_seek(*pos) { - Ok(()) => { - *me.pos = None; - Pin::new(&mut *me.seek).poll_complete(cx) + Some(pos) => { + // ensure no seek in progress + ready!(Pin::new(&mut *me.seek).poll_complete(cx))?; + match Pin::new(&mut *me.seek).start_seek(*pos) { + Ok(()) => { + *me.pos = None; + Pin::new(&mut *me.seek).poll_complete(cx) + } + Err(e) => Poll::Ready(Err(e)), } - Err(e) => Poll::Ready(Err(e)), - }, + } None => Pin::new(&mut *me.seek).poll_complete(cx), } } From d5a79eacb8a20ed545ae22d407ae730f8a5705f7 Mon Sep 17 00:00:00 2001 From: Zahari Dichev Date: Tue, 6 Oct 2020 11:31:59 +0000 Subject: [PATCH 3/5] clippy + docs --- tokio/src/fs/file.rs | 1 - tokio/src/io/async_seek.rs | 15 ++++++++++----- 2 files changed, 10 insertions(+), 6 deletions(-) diff --git a/tokio/src/fs/file.rs b/tokio/src/fs/file.rs index 5236ab74b68..7efc65952d6 100644 --- a/tokio/src/fs/file.rs +++ b/tokio/src/fs/file.rs @@ -337,7 +337,6 @@ impl File { match op { Operation::Seek(res) => res.map(|pos| { self.pos = pos; - () }), _ => unreachable!(), } diff --git a/tokio/src/io/async_seek.rs b/tokio/src/io/async_seek.rs index 0a792b8ba0a..ee32eeb7028 100644 --- a/tokio/src/io/async_seek.rs +++ b/tokio/src/io/async_seek.rs @@ -23,21 +23,26 @@ pub trait AsyncSeek { /// /// If this function returns successfully, then the job has been submitted. /// To find out when it completes, call `poll_complete`. + /// + /// # Errors + /// + /// This function can return [`ErrorKind::Other`] in case there is another + /// seek in progress. To avoid this, it is advisable that any call to + /// `start_seek` is preceded by a call to `poll_complete` to ensure all + /// pending seeks have completed. fn start_seek(self: Pin<&mut Self>, position: SeekFrom) -> io::Result<()>; /// Waits for a seek operation to complete. /// /// If the seek operation completed successfully, /// this method returns the new position from the start of the stream. - /// That position can be used later with [`SeekFrom::Start`]. + /// That position can be used later with [`SeekFrom::Start`]. Repeatedly + /// calling this function without calling `start_seek` might return the + /// same result. /// /// # Errors /// /// Seeking to a negative offset is considered an error. - /// - /// # Panics - /// - /// Calling this method without calling `start_seek` first is an error. fn poll_complete(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll>; } From 7c3c0d02bd7b1a3dfac014b8e6bfccd7d9d546bb Mon Sep 17 00:00:00 2001 From: Zahari Dichev Date: Tue, 6 Oct 2020 12:16:24 +0000 Subject: [PATCH 4/5] fix docs link --- tokio/src/io/async_seek.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tokio/src/io/async_seek.rs b/tokio/src/io/async_seek.rs index ee32eeb7028..bd7a992e4da 100644 --- a/tokio/src/io/async_seek.rs +++ b/tokio/src/io/async_seek.rs @@ -26,9 +26,9 @@ pub trait AsyncSeek { /// /// # Errors /// - /// This function can return [`ErrorKind::Other`] in case there is another - /// seek in progress. To avoid this, it is advisable that any call to - /// `start_seek` is preceded by a call to `poll_complete` to ensure all + /// This function can return [`io::ErrorKind::Other`] in case there is + /// another seek in progress. To avoid this, it is advisable that any call + /// to `start_seek` is preceded by a call to `poll_complete` to ensure all /// pending seeks have completed. fn start_seek(self: Pin<&mut Self>, position: SeekFrom) -> io::Result<()>; From dad4b74d09e3ff47a4159b72c0f83d7e4428b812 Mon Sep 17 00:00:00 2001 From: Zahari Dichev Date: Wed, 7 Oct 2020 06:31:45 +0000 Subject: [PATCH 5/5] update panic message --- tokio/src/fs/file.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tokio/src/fs/file.rs b/tokio/src/fs/file.rs index 7efc65952d6..9556a22fe40 100644 --- a/tokio/src/fs/file.rs +++ b/tokio/src/fs/file.rs @@ -548,7 +548,7 @@ impl AsyncSeek for File { fn start_seek(mut self: Pin<&mut Self>, mut pos: SeekFrom) -> io::Result<()> { loop { match self.state { - Busy(_) => panic!("must wait for poll_ready before calling start_seek"), + Busy(_) => panic!("must wait for poll_complete before calling start_seek"), Idle(ref mut buf_cell) => { let mut buf = buf_cell.take().unwrap();