From a4c53c0f9247b1dd9be9d4498376765becd82498 Mon Sep 17 00:00:00 2001 From: Taiki Endo Date: Sat, 5 Sep 2020 18:58:59 +0900 Subject: [PATCH 1/3] Implement more traits for AssertUnmoved Implements the following traits for AssertUnmoved: * FusedFuture * Stream * FusedStream * Sink * AsyncRead * AsyncWrite * AsyncSeek * AsyncBufRead And adds the following methods: * StreamTestExt::assert_unmoved * SinkTestExt::assert_unmoved_sink * AsyncReadTestExt::assert_unmoved * AsyncWriteTestExt::assert_unmoved_write --- futures-test/src/assert_unmoved.rs | 235 ++++++++++++++++++++++ futures-test/src/future/assert_unmoved.rs | 103 ---------- futures-test/src/future/mod.rs | 4 +- futures-test/src/io/read/mod.rs | 16 ++ futures-test/src/io/write/mod.rs | 16 ++ futures-test/src/lib.rs | 1 + futures-test/src/sink/mod.rs | 18 +- futures-test/src/stream/mod.rs | 16 ++ 8 files changed, 302 insertions(+), 107 deletions(-) create mode 100644 futures-test/src/assert_unmoved.rs delete mode 100644 futures-test/src/future/assert_unmoved.rs diff --git a/futures-test/src/assert_unmoved.rs b/futures-test/src/assert_unmoved.rs new file mode 100644 index 0000000000..112c690f87 --- /dev/null +++ b/futures-test/src/assert_unmoved.rs @@ -0,0 +1,235 @@ +use futures_core::future::{FusedFuture, Future}; +use futures_core::stream::{FusedStream, Stream}; +use futures_core::task::{Context, Poll}; +use futures_io::{ + self as io, AsyncBufRead, AsyncRead, AsyncSeek, AsyncWrite, IoSlice, IoSliceMut, SeekFrom, +}; +use futures_sink::Sink; +use pin_project::{pin_project, pinned_drop}; +use std::pin::Pin; +use std::ptr; +use std::thread::panicking; + +/// Combinator that asserts that the underlying type is not moved after being polled. +/// +/// See the `assert_unmoved` methods on: +/// * [`FutureTestExt`](crate::future::FutureTestExt::assert_unmoved) +/// * [`StreamTestExt`](crate::stream::StreamTestExt::assert_unmoved) +/// * [`SinkTestExt`](crate::sink::SinkTestExt::assert_unmoved_sink) +/// * [`AsyncReadTestExt`](crate::io::AsyncReadTestExt::assert_unmoved) +/// * [`AsyncWriteTestExt`](crate::io::AsyncWriteTestExt::assert_unmoved_write) +#[pin_project(PinnedDrop, !Unpin)] +#[derive(Debug, Clone)] +#[must_use = "futures do nothing unless you `.await` or poll them"] +pub struct AssertUnmoved { + #[pin] + inner: T, + this_ptr: *const AssertUnmoved, +} + +// Safety: having a raw pointer in a struct makes it `!Send`, however the +// pointer is never dereferenced so this is safe. +unsafe impl Send for AssertUnmoved {} +unsafe impl Sync for AssertUnmoved {} + +impl AssertUnmoved { + pub(crate) fn new(inner: T) -> Self { + Self { + inner, + this_ptr: ptr::null(), + } + } + + fn poll_with(mut self: Pin<&mut Self>, f: impl FnOnce(Pin<&mut T>) -> U) -> U { + let cur_this = &*self as *const Self; + if self.this_ptr.is_null() { + // First time being polled + *self.as_mut().project().this_ptr = cur_this; + } else { + assert_eq!( + self.this_ptr, cur_this, + "AssertUnmoved moved between poll calls" + ); + } + f(self.project().inner) + } +} + +impl Future for AssertUnmoved { + type Output = Fut::Output; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + self.poll_with(|f| f.poll(cx)) + } +} + +impl FusedFuture for AssertUnmoved { + fn is_terminated(&self) -> bool { + self.inner.is_terminated() + } +} + +impl Stream for AssertUnmoved { + type Item = St::Item; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.poll_with(|s| s.poll_next(cx)) + } +} + +impl FusedStream for AssertUnmoved { + fn is_terminated(&self) -> bool { + self.inner.is_terminated() + } +} + +impl, Item> Sink for AssertUnmoved { + type Error = Si::Error; + + fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.poll_with(|s| s.poll_ready(cx)) + } + + fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> { + self.poll_with(|s| s.start_send(item)) + } + + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.poll_with(|s| s.poll_flush(cx)) + } + + fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.poll_with(|s| s.poll_close(cx)) + } +} + +impl AsyncRead for AssertUnmoved { + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut [u8], + ) -> Poll> { + self.poll_with(|r| r.poll_read(cx, buf)) + } + + fn poll_read_vectored( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + bufs: &mut [IoSliceMut<'_>], + ) -> Poll> { + self.poll_with(|r| r.poll_read_vectored(cx, bufs)) + } +} + +impl AsyncWrite for AssertUnmoved { + fn poll_write( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + self.poll_with(|w| w.poll_write(cx, buf)) + } + + fn poll_write_vectored( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + bufs: &[IoSlice<'_>], + ) -> Poll> { + self.poll_with(|w| w.poll_write_vectored(cx, bufs)) + } + + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.poll_with(|w| w.poll_flush(cx)) + } + + fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.poll_with(|w| w.poll_close(cx)) + } +} + +impl AsyncSeek for AssertUnmoved { + fn poll_seek( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + pos: SeekFrom, + ) -> Poll> { + self.poll_with(|s| s.poll_seek(cx, pos)) + } +} + +impl AsyncBufRead for AssertUnmoved { + fn poll_fill_buf(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + // FIXME: We cannot use `poll_with` here because it causes a lifetime error. + let cur_this = &*self as *const Self; + if self.this_ptr.is_null() { + // First time being polled + *self.as_mut().project().this_ptr = cur_this; + } else { + assert_eq!( + self.this_ptr, cur_this, + "AssertUnmoved moved between poll calls" + ); + } + self.project().inner.poll_fill_buf(cx) + } + + fn consume(self: Pin<&mut Self>, amt: usize) { + self.poll_with(|r| r.consume(amt)) + } +} + +#[pinned_drop] +impl PinnedDrop for AssertUnmoved { + fn drop(self: Pin<&mut Self>) { + // If the thread is panicking then we can't panic again as that will + // cause the process to be aborted. + if !panicking() && !self.this_ptr.is_null() { + let cur_this = &*self as *const Self; + assert_eq!(self.this_ptr, cur_this, "AssertUnmoved moved before drop"); + } + } +} + +#[cfg(test)] +mod tests { + use futures_core::future::Future; + use futures_core::task::{Context, Poll}; + use futures_util::future::pending; + use futures_util::task::noop_waker; + use std::pin::Pin; + + use super::AssertUnmoved; + + #[test] + fn assert_send_sync() { + fn assert() {} + assert::>(); + } + + #[test] + fn dont_panic_when_not_polled() { + // This shouldn't panic. + let future = AssertUnmoved::new(pending::<()>()); + drop(future); + } + + #[test] + #[should_panic(expected = "AssertUnmoved moved between poll calls")] + fn dont_double_panic() { + // This test should only panic, not abort the process. + let waker = noop_waker(); + let mut cx = Context::from_waker(&waker); + + // First we allocate the future on the stack and poll it. + let mut future = AssertUnmoved::new(pending::<()>()); + let pinned_future = unsafe { Pin::new_unchecked(&mut future) }; + assert_eq!(pinned_future.poll(&mut cx), Poll::Pending); + + // Next we move it back to the heap and poll it again. This second call + // should panic (as the future is moved), but we shouldn't panic again + // whilst dropping `AssertUnmoved`. + let mut future = Box::new(future); + let pinned_boxed_future = unsafe { Pin::new_unchecked(&mut *future) }; + assert_eq!(pinned_boxed_future.poll(&mut cx), Poll::Pending); + } +} diff --git a/futures-test/src/future/assert_unmoved.rs b/futures-test/src/future/assert_unmoved.rs deleted file mode 100644 index 531fd4d818..0000000000 --- a/futures-test/src/future/assert_unmoved.rs +++ /dev/null @@ -1,103 +0,0 @@ -use futures_core::future::Future; -use futures_core::task::{Context, Poll}; -use pin_project::{pin_project, pinned_drop}; -use std::pin::Pin; -use std::ptr; -use std::thread::panicking; - -/// Combinator for the -/// [`FutureTestExt::assert_unmoved`](super::FutureTestExt::assert_unmoved) -/// method. -#[pin_project(PinnedDrop, !Unpin)] -#[derive(Debug, Clone)] -#[must_use = "futures do nothing unless you `.await` or poll them"] -pub struct AssertUnmoved { - #[pin] - future: Fut, - this_ptr: *const AssertUnmoved, -} - -// Safety: having a raw pointer in a struct makes it `!Send`, however the -// pointer is never dereferenced so this is safe. -unsafe impl Send for AssertUnmoved {} -unsafe impl Sync for AssertUnmoved {} - -impl AssertUnmoved { - pub(super) fn new(future: Fut) -> Self { - Self { - future, - this_ptr: ptr::null(), - } - } -} - -impl Future for AssertUnmoved { - type Output = Fut::Output; - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let cur_this = &*self as *const Self; - if self.this_ptr.is_null() { - // First time being polled - *self.as_mut().project().this_ptr = cur_this; - } else { - assert_eq!(self.this_ptr, cur_this, "Future moved between poll calls"); - } - self.project().future.poll(cx) - } -} - -#[pinned_drop] -impl PinnedDrop for AssertUnmoved { - fn drop(self: Pin<&mut Self>) { - // If the thread is panicking then we can't panic again as that will - // cause the process to be aborted. - if !panicking() && !self.this_ptr.is_null() { - let cur_this = &*self as *const Self; - assert_eq!(self.this_ptr, cur_this, "Future moved before drop"); - } - } -} - -#[cfg(test)] -mod tests { - use futures_core::future::Future; - use futures_core::task::{Context, Poll}; - use futures_util::future::pending; - use futures_util::task::noop_waker; - use std::pin::Pin; - - use super::AssertUnmoved; - - #[test] - fn assert_send_sync() { - fn assert() {} - assert::>(); - } - - #[test] - fn dont_panic_when_not_polled() { - // This shouldn't panic. - let future = AssertUnmoved::new(pending::<()>()); - drop(future); - } - - #[test] - #[should_panic(expected = "Future moved between poll calls")] - fn dont_double_panic() { - // This test should only panic, not abort the process. - let waker = noop_waker(); - let mut cx = Context::from_waker(&waker); - - // First we allocate the future on the stack and poll it. - let mut future = AssertUnmoved::new(pending::<()>()); - let pinned_future = unsafe { Pin::new_unchecked(&mut future) }; - assert_eq!(pinned_future.poll(&mut cx), Poll::Pending); - - // Next we move it back to the heap and poll it again. This second call - // should panic (as the future is moved), but we shouldn't panic again - // whilst dropping `AssertUnmoved`. - let mut future = Box::new(future); - let pinned_boxed_future = unsafe { Pin::new_unchecked(&mut *future) }; - assert_eq!(pinned_boxed_future.poll(&mut cx), Poll::Pending); - } -} diff --git a/futures-test/src/future/mod.rs b/futures-test/src/future/mod.rs index caf814d5a8..ee5c6ddd5d 100644 --- a/futures-test/src/future/mod.rs +++ b/futures-test/src/future/mod.rs @@ -1,14 +1,12 @@ //! Additional combinators for testing futures. -mod assert_unmoved; -pub use self::assert_unmoved::AssertUnmoved; - mod pending_once; pub use self::pending_once::PendingOnce; use futures_core::future::Future; use std::thread; +pub use crate::assert_unmoved::AssertUnmoved; pub use crate::interleave_pending::InterleavePending; /// Additional combinators for testing futures. diff --git a/futures-test/src/io/read/mod.rs b/futures-test/src/io/read/mod.rs index f6c51fd799..cb5f1d3f86 100644 --- a/futures-test/src/io/read/mod.rs +++ b/futures-test/src/io/read/mod.rs @@ -3,10 +3,26 @@ use futures_io::AsyncRead; pub use super::limited::Limited; +pub use crate::assert_unmoved::AssertUnmoved; pub use crate::interleave_pending::InterleavePending; /// Additional combinators for testing async readers. pub trait AsyncReadTestExt: AsyncRead { + /// Asserts that the given is not moved after being polled. + /// + /// A check for movement is performed each time the reader is polled + /// and when `Drop` is called. + /// + /// Aside from keeping track of the location at which the reader was first + /// polled and providing assertions, this reader adds no runtime behavior + /// and simply delegates to the child reader. + fn assert_unmoved(self) -> AssertUnmoved + where + Self: Sized, + { + AssertUnmoved::new(self) + } + /// Introduces an extra [`Poll::Pending`](futures_core::task::Poll::Pending) /// in between each read of the reader. /// diff --git a/futures-test/src/io/write/mod.rs b/futures-test/src/io/write/mod.rs index 9d34ee00a8..01ca4b2fef 100644 --- a/futures-test/src/io/write/mod.rs +++ b/futures-test/src/io/write/mod.rs @@ -3,11 +3,27 @@ use futures_io::AsyncWrite; pub use super::limited::Limited; +pub use crate::assert_unmoved::AssertUnmoved; pub use crate::interleave_pending::InterleavePending; pub use crate::track_closed::TrackClosed; /// Additional combinators for testing async writers. pub trait AsyncWriteTestExt: AsyncWrite { + /// Asserts that the given is not moved after being polled. + /// + /// A check for movement is performed each time the writer is polled + /// and when `Drop` is called. + /// + /// Aside from keeping track of the location at which the writer was first + /// polled and providing assertions, this writer adds no runtime behavior + /// and simply delegates to the child writer. + fn assert_unmoved_write(self) -> AssertUnmoved + where + Self: Sized, + { + AssertUnmoved::new(self) + } + /// Introduces an extra [`Poll::Pending`](futures_core::task::Poll::Pending) /// in between each operation on the writer. /// diff --git a/futures-test/src/lib.rs b/futures-test/src/lib.rs index 06d6c61105..3c9723b662 100644 --- a/futures-test/src/lib.rs +++ b/futures-test/src/lib.rs @@ -45,5 +45,6 @@ pub mod sink; #[cfg(feature = "std")] pub mod io; +mod assert_unmoved; mod interleave_pending; mod track_closed; diff --git a/futures-test/src/sink/mod.rs b/futures-test/src/sink/mod.rs index c3ebcfd65e..b677edaf03 100644 --- a/futures-test/src/sink/mod.rs +++ b/futures-test/src/sink/mod.rs @@ -2,10 +2,26 @@ use futures_sink::Sink; +pub use crate::assert_unmoved::AssertUnmoved; pub use crate::track_closed::TrackClosed; /// Additional combinators for testing sinks. pub trait SinkTestExt: Sink { + /// Asserts that the given is not moved after being polled. + /// + /// A check for movement is performed each time the sink is polled + /// and when `Drop` is called. + /// + /// Aside from keeping track of the location at which the sink was first + /// polled and providing assertions, this sink adds no runtime behavior + /// and simply delegates to the child sink. + fn assert_unmoved_sink(self) -> AssertUnmoved + where + Self: Sized, + { + AssertUnmoved::new(self) + } + /// Track whether this sink has been closed and panics if it is used after closing. /// /// # Examples @@ -29,7 +45,7 @@ pub trait SinkTestExt: Sink { /// Note: Unlike [`AsyncWriteTestExt::track_closed`] when /// used as a sink the adaptor will panic if closed too early as there's no easy way to /// integrate as an error. - /// + /// /// [`AsyncWriteTestExt::track_closed`]: crate::io::AsyncWriteTestExt::track_closed /// /// ``` diff --git a/futures-test/src/stream/mod.rs b/futures-test/src/stream/mod.rs index 5c194e71ad..9151a21c89 100644 --- a/futures-test/src/stream/mod.rs +++ b/futures-test/src/stream/mod.rs @@ -2,10 +2,26 @@ use futures_core::stream::Stream; +pub use crate::assert_unmoved::AssertUnmoved; pub use crate::interleave_pending::InterleavePending; /// Additional combinators for testing streams. pub trait StreamTestExt: Stream { + /// Asserts that the given is not moved after being polled. + /// + /// A check for movement is performed each time the stream is polled + /// and when `Drop` is called. + /// + /// Aside from keeping track of the location at which the stream was first + /// polled and providing assertions, this stream adds no runtime behavior + /// and simply delegates to the child stream. + fn assert_unmoved(self) -> AssertUnmoved + where + Self: Sized, + { + AssertUnmoved::new(self) + } + /// Introduces an extra [`Poll::Pending`](futures_core::task::Poll::Pending) /// in between each item of the stream. /// From 38db3c14b93d1115680157a2da089f4a6b2e0e25 Mon Sep 17 00:00:00 2001 From: Taiki Endo Date: Sat, 5 Sep 2020 19:18:38 +0900 Subject: [PATCH 2/3] Implement more traits for InterleavePending Implements the following traits for InterleavePending: * Sink * AsyncSeek And adds the following methods: * SinkTestExt::interleave_pending_sink --- futures-test/src/interleave_pending.rs | 167 ++++++++++++------------- futures-test/src/sink/mod.rs | 10 ++ 2 files changed, 88 insertions(+), 89 deletions(-) diff --git a/futures-test/src/interleave_pending.rs b/futures-test/src/interleave_pending.rs index 59de2879a9..b065116f42 100644 --- a/futures-test/src/interleave_pending.rs +++ b/futures-test/src/interleave_pending.rs @@ -1,6 +1,9 @@ -use futures_core::future::{Future, FusedFuture}; -use futures_core::stream::{Stream, FusedStream}; -use futures_io::{self as io, AsyncBufRead, AsyncRead, AsyncWrite}; +use futures_core::future::{FusedFuture, Future}; +use futures_core::stream::{FusedStream, Stream}; +use futures_io::{ + self as io, AsyncBufRead, AsyncRead, AsyncSeek, AsyncWrite, IoSlice, IoSliceMut, SeekFrom, +}; +use futures_sink::Sink; use pin_project::pin_project; use std::{ pin::Pin, @@ -12,6 +15,7 @@ use std::{ /// See the `interleave_pending` methods on: /// * [`FutureTestExt`](crate::future::FutureTestExt::interleave_pending) /// * [`StreamTestExt`](crate::stream::StreamTestExt::interleave_pending) +/// * [`SinkTestExt`](crate::sink::SinkTestExt::interleave_pending_sink) /// * [`AsyncReadTestExt`](crate::io::AsyncReadTestExt::interleave_pending) /// * [`AsyncWriteTestExt`](crate::io::AsyncWriteTestExt::interleave_pending_write) #[pin_project] @@ -52,18 +56,15 @@ impl InterleavePending { pub fn into_inner(self) -> T { self.inner } -} -impl Future for InterleavePending { - type Output = Fut::Output; - - fn poll( + fn poll_with( self: Pin<&mut Self>, cx: &mut Context<'_>, - ) -> Poll { + f: impl FnOnce(Pin<&mut T>, &mut Context<'_>) -> Poll, + ) -> Poll { let this = self.project(); if *this.pended { - let next = this.inner.poll(cx); + let next = f(this.inner, cx); if next.is_ready() { *this.pended = false; } @@ -76,6 +77,14 @@ impl Future for InterleavePending { } } +impl Future for InterleavePending { + type Output = Fut::Output; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + self.poll_with(cx, Fut::poll) + } +} + impl FusedFuture for InterleavePending { fn is_terminated(&self) -> bool { self.inner.is_terminated() @@ -85,22 +94,8 @@ impl FusedFuture for InterleavePending { impl Stream for InterleavePending { type Item = St::Item; - fn poll_next( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll> { - let this = self.project(); - if *this.pended { - let next = this.inner.poll_next(cx); - if next.is_ready() { - *this.pended = false; - } - next - } else { - cx.waker().wake_by_ref(); - *this.pended = true; - Poll::Pending - } + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.poll_with(cx, St::poll_next) } fn size_hint(&self) -> (usize, Option) { @@ -108,95 +103,89 @@ impl Stream for InterleavePending { } } -impl FusedStream for InterleavePending { +impl FusedStream for InterleavePending { fn is_terminated(&self) -> bool { self.inner.is_terminated() } } -impl AsyncWrite for InterleavePending { - fn poll_write( +impl, Item> Sink for InterleavePending { + type Error = Si::Error; + + fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.poll_with(cx, Si::poll_ready) + } + + fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> { + self.project().inner.start_send(item) + } + + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.poll_with(cx, Si::poll_flush) + } + + fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.poll_with(cx, Si::poll_close) + } +} + +impl AsyncRead for InterleavePending { + fn poll_read( self: Pin<&mut Self>, cx: &mut Context<'_>, - buf: &[u8], + buf: &mut [u8], ) -> Poll> { - let this = self.project(); - if *this.pended { - let next = this.inner.poll_write(cx, buf); - if next.is_ready() { - *this.pended = false; - } - next - } else { - cx.waker().wake_by_ref(); - *this.pended = true; - Poll::Pending - } + self.poll_with(cx, |r, cx| r.poll_read(cx, buf)) } - fn poll_flush( + fn poll_read_vectored( self: Pin<&mut Self>, cx: &mut Context<'_>, - ) -> Poll> { - let this = self.project(); - if *this.pended { - let next = this.inner.poll_flush(cx); - if next.is_ready() { - *this.pended = false; - } - next - } else { - cx.waker().wake_by_ref(); - *this.pended = true; - Poll::Pending - } + bufs: &mut [IoSliceMut<'_>], + ) -> Poll> { + self.poll_with(cx, |r, cx| r.poll_read_vectored(cx, bufs)) } +} - fn poll_close( +impl AsyncWrite for InterleavePending { + fn poll_write( self: Pin<&mut Self>, cx: &mut Context<'_>, - ) -> Poll> { - let this = self.project(); - if *this.pended { - let next = this.inner.poll_close(cx); - if next.is_ready() { - *this.pended = false; - } - next - } else { - cx.waker().wake_by_ref(); - *this.pended = true; - Poll::Pending - } + buf: &[u8], + ) -> Poll> { + self.poll_with(cx, |w, cx| w.poll_write(cx, buf)) } -} -impl AsyncRead for InterleavePending { - fn poll_read( + fn poll_write_vectored( self: Pin<&mut Self>, cx: &mut Context<'_>, - buf: &mut [u8], + bufs: &[IoSlice<'_>], ) -> Poll> { - let this = self.project(); - if *this.pended { - let next = this.inner.poll_read(cx, buf); - if next.is_ready() { - *this.pended = false; - } - next - } else { - cx.waker().wake_by_ref(); - *this.pended = true; - Poll::Pending - } + self.poll_with(cx, |w, cx| w.poll_write_vectored(cx, bufs)) + } + + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.poll_with(cx, W::poll_flush) + } + + fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.poll_with(cx, W::poll_close) } } -impl AsyncBufRead for InterleavePending { - fn poll_fill_buf( +impl AsyncSeek for InterleavePending { + fn poll_seek( self: Pin<&mut Self>, cx: &mut Context<'_>, - ) -> Poll> { + pos: SeekFrom, + ) -> Poll> { + self.poll_with(cx, |s, cx| s.poll_seek(cx, pos)) + } +} + +impl AsyncBufRead for InterleavePending { + fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + // FIXME: We cannot use `poll_with` here because it causes a lifetime error. let this = self.project(); if *this.pended { let next = this.inner.poll_fill_buf(cx); diff --git a/futures-test/src/sink/mod.rs b/futures-test/src/sink/mod.rs index b677edaf03..eb5a6efa84 100644 --- a/futures-test/src/sink/mod.rs +++ b/futures-test/src/sink/mod.rs @@ -3,6 +3,7 @@ use futures_sink::Sink; pub use crate::assert_unmoved::AssertUnmoved; +pub use crate::interleave_pending::InterleavePending; pub use crate::track_closed::TrackClosed; /// Additional combinators for testing sinks. @@ -22,6 +23,15 @@ pub trait SinkTestExt: Sink { AssertUnmoved::new(self) } + /// Introduces an extra [`Poll::Pending`](futures_core::task::Poll::Pending) + /// in between each operation on the sink. + fn interleave_pending_sink(self) -> InterleavePending + where + Self: Sized, + { + InterleavePending::new(self) + } + /// Track whether this sink has been closed and panics if it is used after closing. /// /// # Examples From c447e5ee294eebf56d46791abc146e9eba45ffb9 Mon Sep 17 00:00:00 2001 From: Taiki Endo Date: Sat, 5 Sep 2020 23:26:29 +0900 Subject: [PATCH 3/3] Avoid lifetime inference errors --- futures-test/src/assert_unmoved.rs | 17 +++-------------- futures-test/src/interleave_pending.rs | 20 ++++---------------- 2 files changed, 7 insertions(+), 30 deletions(-) diff --git a/futures-test/src/assert_unmoved.rs b/futures-test/src/assert_unmoved.rs index 112c690f87..ec81b662a5 100644 --- a/futures-test/src/assert_unmoved.rs +++ b/futures-test/src/assert_unmoved.rs @@ -40,7 +40,7 @@ impl AssertUnmoved { } } - fn poll_with(mut self: Pin<&mut Self>, f: impl FnOnce(Pin<&mut T>) -> U) -> U { + fn poll_with<'a, U>(mut self: Pin<&'a mut Self>, f: impl FnOnce(Pin<&'a mut T>) -> U) -> U { let cur_this = &*self as *const Self; if self.this_ptr.is_null() { // First time being polled @@ -158,19 +158,8 @@ impl AsyncSeek for AssertUnmoved { } impl AsyncBufRead for AssertUnmoved { - fn poll_fill_buf(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - // FIXME: We cannot use `poll_with` here because it causes a lifetime error. - let cur_this = &*self as *const Self; - if self.this_ptr.is_null() { - // First time being polled - *self.as_mut().project().this_ptr = cur_this; - } else { - assert_eq!( - self.this_ptr, cur_this, - "AssertUnmoved moved between poll calls" - ); - } - self.project().inner.poll_fill_buf(cx) + fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.poll_with(|r| r.poll_fill_buf(cx)) } fn consume(self: Pin<&mut Self>, amt: usize) { diff --git a/futures-test/src/interleave_pending.rs b/futures-test/src/interleave_pending.rs index b065116f42..7bc8706388 100644 --- a/futures-test/src/interleave_pending.rs +++ b/futures-test/src/interleave_pending.rs @@ -57,10 +57,10 @@ impl InterleavePending { self.inner } - fn poll_with( - self: Pin<&mut Self>, + fn poll_with<'a, U>( + self: Pin<&'a mut Self>, cx: &mut Context<'_>, - f: impl FnOnce(Pin<&mut T>, &mut Context<'_>) -> Poll, + f: impl FnOnce(Pin<&'a mut T>, &mut Context<'_>) -> Poll, ) -> Poll { let this = self.project(); if *this.pended { @@ -185,19 +185,7 @@ impl AsyncSeek for InterleavePending { impl AsyncBufRead for InterleavePending { fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - // FIXME: We cannot use `poll_with` here because it causes a lifetime error. - let this = self.project(); - if *this.pended { - let next = this.inner.poll_fill_buf(cx); - if next.is_ready() { - *this.pended = false; - } - next - } else { - cx.waker().wake_by_ref(); - *this.pended = true; - Poll::Pending - } + self.poll_with(cx, R::poll_fill_buf) } fn consume(self: Pin<&mut Self>, amount: usize) {