From 7b372fec205f8c48d302395e08361c8d36430ce6 Mon Sep 17 00:00:00 2001 From: kazuki0824 Date: Mon, 11 Oct 2021 16:45:59 +0900 Subject: [PATCH 1/9] Create copy_buf_abortable, which allows stopping copying in the middle. --- futures-util/src/abortable.rs | 8 +- futures-util/src/io/copy_buf_abortable.rs | 108 ++++++++++++++++++++++ 2 files changed, 112 insertions(+), 4 deletions(-) create mode 100644 futures-util/src/io/copy_buf_abortable.rs diff --git a/futures-util/src/abortable.rs b/futures-util/src/abortable.rs index bb82dd0db8..e0afd47218 100644 --- a/futures-util/src/abortable.rs +++ b/futures-util/src/abortable.rs @@ -75,7 +75,7 @@ impl Abortable { /// in calls to `Abortable::new`. #[derive(Debug)] pub struct AbortRegistration { - inner: Arc, + pub(crate) inner: Arc, } /// A handle to an `Abortable` task. @@ -100,9 +100,9 @@ impl AbortHandle { // Inner type storing the waker to awaken and a bool indicating that it // should be aborted. #[derive(Debug)] -struct AbortInner { - waker: AtomicWaker, - aborted: AtomicBool, +pub(crate) struct AbortInner { + pub(crate) waker: AtomicWaker, + pub(crate) aborted: AtomicBool, } /// Indicator that the `Abortable` task was aborted. diff --git a/futures-util/src/io/copy_buf_abortable.rs b/futures-util/src/io/copy_buf_abortable.rs new file mode 100644 index 0000000000..4b6105e93e --- /dev/null +++ b/futures-util/src/io/copy_buf_abortable.rs @@ -0,0 +1,108 @@ +use crate::abortable::{AbortHandle, AbortInner, Aborted}; +use futures_core::future::Future; +use futures_core::ready; +use futures_core::task::{Context, Poll}; +use futures_io::{AsyncBufRead, AsyncWrite}; +use pin_project_lite::pin_project; +use std::io; +use std::pin::Pin; +use std::sync::atomic::Ordering; +use std::sync::Arc; + +/// Creates a future which copies all the bytes from one object to another, with its `AbortHandle`. +/// +/// The returned future will copy all the bytes read from this `AsyncBufRead` into the +/// `writer` specified. This future will only complete once abort has been requested or the `reader` has hit +/// EOF and all bytes have been written to and flushed from the `writer` +/// provided. +/// +/// On success the number of bytes is returned. If aborted, `Aborted` is returned. Otherwise, the underlying error is returned. +/// +/// # Examples +/// +/// ``` +/// # futures::executor::block_on(async { +/// use futures::io::{self, AsyncWriteExt, Cursor}; +/// use futures::future::Aborted; +/// +/// let reader = Cursor::new([1, 2, 3, 4]); +/// let mut writer = Cursor::new(vec![0u8; 5]); +/// +/// let (fut, abort_handle) = io::copy_buf_abortable(reader, &mut writer); +/// let bytes = fut.await?; +/// abort_handle.abort(); +/// writer.close().await?; +/// match bytes { +/// Ok(n) => { +/// assert_eq!(bytes.unwrap(), 4); +/// assert_eq!(writer.into_inner(), [1, 2, 3, 4, 0]); +/// Ok(n) +/// }, +/// Err(a) => { +/// Err::(a) +/// } +/// } +/// +/// ``` +pub fn copy_buf_abortable( + reader: R, + writer: &mut W, +) -> (CopyBufAbortable<'_, R, W>, AbortHandle) +where + R: AsyncBufRead, + W: AsyncWrite + Unpin + ?Sized, +{ + let (handle, reg) = AbortHandle::new_pair(); + (CopyBufAbortable { reader, writer, amt: 0, inner: reg.inner }, handle) +} + +pin_project! { + /// Future for the [`copy_buf()`] function. + #[derive(Debug)] + #[must_use = "futures do nothing unless you `.await` or poll them"] + pub struct CopyBufAbortable<'a, R, W: ?Sized> { + #[pin] + reader: R, + writer: &'a mut W, + amt: u64, + inner: Arc + } +} + +impl Future for CopyBufAbortable<'_, R, W> +where + R: AsyncBufRead, + W: AsyncWrite + Unpin + ?Sized, +{ + type Output = Result, io::Error>; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let mut this = self.project(); + loop { + // Check if the task has been aborted + if this.inner.aborted.load(Ordering::Relaxed) { + return Poll::Ready(Ok(Err(Aborted))); + } + + let buffer = ready!(this.reader.as_mut().poll_fill_buf(cx))?; + if buffer.is_empty() { + ready!(Pin::new(&mut this.writer).poll_flush(cx))?; + return Poll::Ready(Ok(Ok(*this.amt))); + } + + let i = ready!(Pin::new(&mut this.writer).poll_write(cx, buffer))?; + if i == 0 { + return Poll::Ready(Err(io::ErrorKind::WriteZero.into())); + } + *this.amt += i as u64; + this.reader.as_mut().consume(i); + + // Register to receive a wakeup if the task is aborted in the future + this.inner.waker.register(cx.waker()); + + if this.inner.aborted.load(Ordering::Relaxed) { + return Poll::Ready(Ok(Err(Aborted))); + } + } + } +} From 9e9ae28d274273ad71e9f8e5294cdfec546aebc4 Mon Sep 17 00:00:00 2001 From: kazuki0824 Date: Mon, 11 Oct 2021 17:05:43 +0900 Subject: [PATCH 2/9] Add mod copy_buf_abortable --- futures-util/src/io/mod.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/futures-util/src/io/mod.rs b/futures-util/src/io/mod.rs index bb576c0245..54c87c104f 100644 --- a/futures-util/src/io/mod.rs +++ b/futures-util/src/io/mod.rs @@ -76,6 +76,9 @@ pub use self::copy::{copy, Copy}; mod copy_buf; pub use self::copy_buf::{copy_buf, CopyBuf}; +mod copy_buf_abortable; +pub use self::copy_buf_abortable::{copy_buf_abortable, CopyBufAbortable}; + mod cursor; pub use self::cursor::Cursor; From 4b4b91315d56a87e57f6b21c4fce507a6c78e1b1 Mon Sep 17 00:00:00 2001 From: kazuki0824 Date: Mon, 11 Oct 2021 17:24:17 +0900 Subject: [PATCH 3/9] fix docs --- futures-util/src/io/copy_buf_abortable.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/futures-util/src/io/copy_buf_abortable.rs b/futures-util/src/io/copy_buf_abortable.rs index 4b6105e93e..27ee514f2a 100644 --- a/futures-util/src/io/copy_buf_abortable.rs +++ b/futures-util/src/io/copy_buf_abortable.rs @@ -42,7 +42,7 @@ use std::sync::Arc; /// Err::(a) /// } /// } -/// +/// # }).unwrap(); /// ``` pub fn copy_buf_abortable( reader: R, From 4208c26fc10bbbe73fca04c227f231d1a1a1604e Mon Sep 17 00:00:00 2001 From: kazuki0824 Date: Mon, 11 Oct 2021 17:48:59 +0900 Subject: [PATCH 4/9] fix docs --- futures-util/src/io/copy_buf_abortable.rs | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/futures-util/src/io/copy_buf_abortable.rs b/futures-util/src/io/copy_buf_abortable.rs index 27ee514f2a..1ea61b6414 100644 --- a/futures-util/src/io/copy_buf_abortable.rs +++ b/futures-util/src/io/copy_buf_abortable.rs @@ -29,18 +29,19 @@ use std::sync::Arc; /// let mut writer = Cursor::new(vec![0u8; 5]); /// /// let (fut, abort_handle) = io::copy_buf_abortable(reader, &mut writer); -/// let bytes = fut.await?; +/// let bytes = fut.await; /// abort_handle.abort(); -/// writer.close().await?; +/// writer.close().await.unwrap(); /// match bytes { -/// Ok(n) => { -/// assert_eq!(bytes.unwrap(), 4); +/// Ok(Ok(n)) => { +/// assert_eq!(n, 4); /// assert_eq!(writer.into_inner(), [1, 2, 3, 4, 0]); /// Ok(n) /// }, -/// Err(a) => { -/// Err::(a) +/// Ok(Err(a)) => { +/// Err::(a) /// } +/// Err(e) => panic!("{}", e) /// } /// # }).unwrap(); /// ``` From 362ed8c94894ca44493dad26ace4bef0585cd6df Mon Sep 17 00:00:00 2001 From: kazuki0824 Date: Tue, 23 Nov 2021 21:59:36 +0900 Subject: [PATCH 5/9] Remove unnecessary wakeups and flag checking --- futures-util/src/io/copy_buf_abortable.rs | 29 ++++++++++++++--------- 1 file changed, 18 insertions(+), 11 deletions(-) diff --git a/futures-util/src/io/copy_buf_abortable.rs b/futures-util/src/io/copy_buf_abortable.rs index 1ea61b6414..14eb6b6790 100644 --- a/futures-util/src/io/copy_buf_abortable.rs +++ b/futures-util/src/io/copy_buf_abortable.rs @@ -1,6 +1,5 @@ use crate::abortable::{AbortHandle, AbortInner, Aborted}; use futures_core::future::Future; -use futures_core::ready; use futures_core::task::{Context, Poll}; use futures_io::{AsyncBufRead, AsyncWrite}; use pin_project_lite::pin_project; @@ -70,6 +69,15 @@ pin_project! { } } +macro_rules! ready_or_break { + ($e:expr $(,)?) => { + match $e { + $crate::task::Poll::Ready(t) => t, + $crate::task::Poll::Pending => break, + } + }; +} + impl Future for CopyBufAbortable<'_, R, W> where R: AsyncBufRead, @@ -85,25 +93,24 @@ where return Poll::Ready(Ok(Err(Aborted))); } - let buffer = ready!(this.reader.as_mut().poll_fill_buf(cx))?; + // Read some bytes from the reader, and if we have reached EOF, return total bytes read + let buffer = ready_or_break!(this.reader.as_mut().poll_fill_buf(cx))?; if buffer.is_empty() { - ready!(Pin::new(&mut this.writer).poll_flush(cx))?; + ready_or_break!(Pin::new(&mut this.writer).poll_flush(cx))?; return Poll::Ready(Ok(Ok(*this.amt))); } - let i = ready!(Pin::new(&mut this.writer).poll_write(cx, buffer))?; + // Pass the buffer to the writer, and update the amount written + let i = ready_or_break!(Pin::new(&mut this.writer).poll_write(cx, buffer))?; if i == 0 { return Poll::Ready(Err(io::ErrorKind::WriteZero.into())); } *this.amt += i as u64; this.reader.as_mut().consume(i); - - // Register to receive a wakeup if the task is aborted in the future - this.inner.waker.register(cx.waker()); - - if this.inner.aborted.load(Ordering::Relaxed) { - return Poll::Ready(Ok(Err(Aborted))); - } } + // Schedule the task to be woken up again. + // Never called unless Poll::Pending is returned from io objects. + cx.waker().wake_by_ref(); + Poll::Pending } } From 87168829d2e970da169f6c5f277963076409711b Mon Sep 17 00:00:00 2001 From: kazuki0824 Date: Sat, 9 Apr 2022 19:25:33 +0900 Subject: [PATCH 6/9] Register the waker --- futures-util/src/io/copy_buf_abortable.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/futures-util/src/io/copy_buf_abortable.rs b/futures-util/src/io/copy_buf_abortable.rs index 14eb6b6790..4d32643c9d 100644 --- a/futures-util/src/io/copy_buf_abortable.rs +++ b/futures-util/src/io/copy_buf_abortable.rs @@ -110,7 +110,7 @@ where } // Schedule the task to be woken up again. // Never called unless Poll::Pending is returned from io objects. - cx.waker().wake_by_ref(); + self.inner.waker.register(cx.waker()); Poll::Pending } } From f9a0139b4ce9f1f75969744cabeb6bd2b49a71fa Mon Sep 17 00:00:00 2001 From: kazuki0824 Date: Tue, 12 Apr 2022 01:43:49 +0900 Subject: [PATCH 7/9] Use this for pinned structs instead of self --- futures-util/src/io/copy_buf_abortable.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/futures-util/src/io/copy_buf_abortable.rs b/futures-util/src/io/copy_buf_abortable.rs index 4d32643c9d..cba9787d0b 100644 --- a/futures-util/src/io/copy_buf_abortable.rs +++ b/futures-util/src/io/copy_buf_abortable.rs @@ -110,7 +110,7 @@ where } // Schedule the task to be woken up again. // Never called unless Poll::Pending is returned from io objects. - self.inner.waker.register(cx.waker()); + this.inner.waker.register(cx.waker()); Poll::Pending } } From e49bf7d987d2ec95fa9b9439688e209f8ee859f2 Mon Sep 17 00:00:00 2001 From: kazuki0824 Date: Mon, 18 Apr 2022 01:52:53 +0900 Subject: [PATCH 8/9] Add checks for the aborted flag --- futures-util/src/io/copy_buf_abortable.rs | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/futures-util/src/io/copy_buf_abortable.rs b/futures-util/src/io/copy_buf_abortable.rs index cba9787d0b..08b29e898d 100644 --- a/futures-util/src/io/copy_buf_abortable.rs +++ b/futures-util/src/io/copy_buf_abortable.rs @@ -111,6 +111,26 @@ where // Schedule the task to be woken up again. // Never called unless Poll::Pending is returned from io objects. this.inner.waker.register(cx.waker()); + + // Check to see if the task was aborted between the first check and + // registration. + // Checking with `is_aborted` which uses `Relaxed` is sufficient because + // `register` introduces an `AcqRel` barrier. + if self.is_aborted() { + return Poll::Ready(Err(Aborted)); + } Poll::Pending } } + +impl CopyBufAbortable<'_, &'_ [u8], &'_ mut [u8]> { + /// Checks whether the task has been aborted. Note that all this + /// method indicates is whether [`AbortHandle::abort`] was *called*. + /// This means that it will return `true` even if: + /// * `abort` was called after the task had completed. + /// * `abort` was called while the task was being polled - the task may still be running and + /// will not be stopped until `poll` returns. + pub fn is_aborted(&self) -> bool { + self.inner.aborted.load(Ordering::Relaxed) + } +} \ No newline at end of file From 9f965997a234ebcd18cbd4d9971aa8e3c195642d Mon Sep 17 00:00:00 2001 From: kazuki0824 Date: Mon, 18 Apr 2022 02:10:42 +0900 Subject: [PATCH 9/9] Remove is_aborted and insert in line L119 --- futures-util/src/io/copy_buf_abortable.rs | 20 ++++---------------- 1 file changed, 4 insertions(+), 16 deletions(-) diff --git a/futures-util/src/io/copy_buf_abortable.rs b/futures-util/src/io/copy_buf_abortable.rs index 08b29e898d..fdbc4a5f00 100644 --- a/futures-util/src/io/copy_buf_abortable.rs +++ b/futures-util/src/io/copy_buf_abortable.rs @@ -81,7 +81,7 @@ macro_rules! ready_or_break { impl Future for CopyBufAbortable<'_, R, W> where R: AsyncBufRead, - W: AsyncWrite + Unpin + ?Sized, + W: AsyncWrite + Unpin + Sized, { type Output = Result, io::Error>; @@ -114,23 +114,11 @@ where // Check to see if the task was aborted between the first check and // registration. - // Checking with `is_aborted` which uses `Relaxed` is sufficient because + // Checking with `Relaxed` is sufficient because // `register` introduces an `AcqRel` barrier. - if self.is_aborted() { - return Poll::Ready(Err(Aborted)); + if this.inner.aborted.load(Ordering::Relaxed) { + return Poll::Ready(Ok(Err(Aborted))); } Poll::Pending } } - -impl CopyBufAbortable<'_, &'_ [u8], &'_ mut [u8]> { - /// Checks whether the task has been aborted. Note that all this - /// method indicates is whether [`AbortHandle::abort`] was *called*. - /// This means that it will return `true` even if: - /// * `abort` was called after the task had completed. - /// * `abort` was called while the task was being polled - the task may still be running and - /// will not be stopped until `poll` returns. - pub fn is_aborted(&self) -> bool { - self.inner.aborted.load(Ordering::Relaxed) - } -} \ No newline at end of file