Skip to content

Commit

Permalink
io: remove Poll from the AsyncSeek::start_seek return value (#2885)
Browse files Browse the repository at this point in the history
  • Loading branch information
zaharidichev committed Oct 8, 2020
1 parent d94ab62 commit 43bd11b
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 66 deletions.
8 changes: 2 additions & 6 deletions tokio-util/src/either.rs
Expand Up @@ -125,12 +125,8 @@ where
L: AsyncSeek,
R: AsyncSeek,
{
fn start_seek(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
position: SeekFrom,
) -> Poll<Result<()>> {
delegate_call!(self.start_seek(cx, position))
fn start_seek(self: Pin<&mut Self>, position: SeekFrom) -> Result<()> {
delegate_call!(self.start_seek(position))
}

fn poll_complete(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<u64>> {
Expand Down
45 changes: 20 additions & 25 deletions tokio/src/fs/file.rs
Expand Up @@ -86,6 +86,8 @@ pub struct File {
/// error is observed while performing a read, it is saved until the next
/// write / flush call.
last_write_err: Option<io::ErrorKind>,

pos: u64,
}

#[derive(Debug)]
Expand Down Expand Up @@ -199,6 +201,7 @@ impl File {
std: Arc::new(std),
state: State::Idle(Some(Buf::with_capacity(0))),
last_write_err: None,
pos: 0,
}
}

Expand Down Expand Up @@ -332,7 +335,9 @@ impl File {
self.state = Idle(Some(buf));

match op {
Operation::Seek(res) => res.map(|_| ()),
Operation::Seek(res) => res.map(|pos| {
self.pos = pos;
}),
_ => unreachable!(),
}
}
Expand Down Expand Up @@ -524,9 +529,12 @@ impl AsyncRead for File {
self.last_write_err = Some(e.kind());
self.state = Idle(Some(buf));
}
Operation::Seek(_) => {
Operation::Seek(result) => {
assert!(buf.is_empty());
self.state = Idle(Some(buf));
if let Ok(pos) = result {
self.pos = pos;
}
continue;
}
}
Expand All @@ -537,13 +545,10 @@ impl AsyncRead for File {
}

impl AsyncSeek for File {
fn start_seek(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
mut pos: SeekFrom,
) -> Poll<io::Result<()>> {
fn start_seek(mut self: Pin<&mut Self>, mut pos: SeekFrom) -> io::Result<()> {
loop {
match self.state {
Busy(_) => panic!("must wait for poll_complete before calling start_seek"),
Idle(ref mut buf_cell) => {
let mut buf = buf_cell.take().unwrap();

Expand All @@ -562,22 +567,7 @@ impl AsyncSeek for File {
let res = (&*std).seek(pos);
(Operation::Seek(res), buf)
}));

return Ready(Ok(()));
}
Busy(ref mut rx) => {
let (op, buf) = ready!(Pin::new(rx).poll(cx))?;
self.state = Idle(Some(buf));

match op {
Operation::Read(_) => {}
Operation::Write(Err(e)) => {
assert!(self.last_write_err.is_none());
self.last_write_err = Some(e.kind());
}
Operation::Write(_) => {}
Operation::Seek(_) => {}
}
return Ok(());
}
}
}
Expand All @@ -586,7 +576,7 @@ impl AsyncSeek for File {
fn poll_complete(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<u64>> {
loop {
match self.state {
Idle(_) => panic!("must call start_seek before calling poll_complete"),
Idle(_) => return Poll::Ready(Ok(self.pos)),
Busy(ref mut rx) => {
let (op, buf) = ready!(Pin::new(rx).poll(cx))?;
self.state = Idle(Some(buf));
Expand All @@ -598,7 +588,12 @@ impl AsyncSeek for File {
self.last_write_err = Some(e.kind());
}
Operation::Write(_) => {}
Operation::Seek(res) => return Ready(res),
Operation::Seek(res) => {
if let Ok(pos) = res {
self.pos = pos;
}
return Ready(res);
}
}
}
}
Expand Down
45 changes: 17 additions & 28 deletions tokio/src/io/async_seek.rs
Expand Up @@ -23,36 +23,33 @@ pub trait AsyncSeek {
///
/// If this function returns successfully, then the job has been submitted.
/// To find out when it completes, call `poll_complete`.
fn start_seek(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
position: SeekFrom,
) -> Poll<io::Result<()>>;
///
/// # Errors
///
/// This function can return [`io::ErrorKind::Other`] in case there is
/// another seek in progress. To avoid this, it is advisable that any call
/// to `start_seek` is preceded by a call to `poll_complete` to ensure all
/// pending seeks have completed.
fn start_seek(self: Pin<&mut Self>, position: SeekFrom) -> io::Result<()>;

/// Waits for a seek operation to complete.
///
/// If the seek operation completed successfully,
/// this method returns the new position from the start of the stream.
/// That position can be used later with [`SeekFrom::Start`].
/// That position can be used later with [`SeekFrom::Start`]. Repeatedly
/// calling this function without calling `start_seek` might return the
/// same result.
///
/// # Errors
///
/// Seeking to a negative offset is considered an error.
///
/// # Panics
///
/// Calling this method without calling `start_seek` first is an error.
fn poll_complete(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<u64>>;
}

macro_rules! deref_async_seek {
() => {
fn start_seek(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
pos: SeekFrom,
) -> Poll<io::Result<()>> {
Pin::new(&mut **self).start_seek(cx, pos)
fn start_seek(mut self: Pin<&mut Self>, pos: SeekFrom) -> io::Result<()> {
Pin::new(&mut **self).start_seek(pos)
}

fn poll_complete(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<u64>> {
Expand All @@ -74,12 +71,8 @@ where
P: DerefMut + Unpin,
P::Target: AsyncSeek,
{
fn start_seek(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
pos: SeekFrom,
) -> Poll<io::Result<()>> {
self.get_mut().as_mut().start_seek(cx, pos)
fn start_seek(self: Pin<&mut Self>, pos: SeekFrom) -> io::Result<()> {
self.get_mut().as_mut().start_seek(pos)
}

fn poll_complete(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<u64>> {
Expand All @@ -88,12 +81,8 @@ where
}

impl<T: AsRef<[u8]> + Unpin> AsyncSeek for io::Cursor<T> {
fn start_seek(
mut self: Pin<&mut Self>,
_: &mut Context<'_>,
pos: SeekFrom,
) -> Poll<io::Result<()>> {
Poll::Ready(io::Seek::seek(&mut *self, pos).map(drop))
fn start_seek(mut self: Pin<&mut Self>, pos: SeekFrom) -> io::Result<()> {
io::Seek::seek(&mut *self, pos).map(drop)
}
fn poll_complete(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<u64>> {
Poll::Ready(Ok(self.get_mut().position()))
Expand Down
17 changes: 10 additions & 7 deletions tokio/src/io/seek.rs
Expand Up @@ -40,14 +40,17 @@ where
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let me = self.project();
match me.pos {
Some(pos) => match Pin::new(&mut *me.seek).start_seek(cx, *pos) {
Poll::Ready(Ok(())) => {
*me.pos = None;
Pin::new(&mut *me.seek).poll_complete(cx)
Some(pos) => {
// ensure no seek in progress
ready!(Pin::new(&mut *me.seek).poll_complete(cx))?;
match Pin::new(&mut *me.seek).start_seek(*pos) {
Ok(()) => {
*me.pos = None;
Pin::new(&mut *me.seek).poll_complete(cx)
}
Err(e) => Poll::Ready(Err(e)),
}
Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
Poll::Pending => Poll::Pending,
},
}
None => Pin::new(&mut *me.seek).poll_complete(cx),
}
}
Expand Down

0 comments on commit 43bd11b

Please sign in to comment.