diff --git a/futures-util/src/future/poll_immediate.rs b/futures-util/src/future/poll_immediate.rs index 5f3c7a104b..29c832c7df 100644 --- a/futures-util/src/future/poll_immediate.rs +++ b/futures-util/src/future/poll_immediate.rs @@ -3,11 +3,17 @@ use crate::FutureExt; use core::pin::Pin; use futures_core::task::{Context, Poll}; use futures_core::{FusedFuture, Future, Stream}; +use pin_project_lite::pin_project; -/// Future for the [`poll_immediate`](poll_immediate()) function. -#[derive(Debug, Clone)] -#[must_use = "futures do nothing unless you `.await` or poll them"] -pub struct PollImmediate(Option); +pin_project! { + /// Future for the [`poll_immediate`](poll_immediate()) function. + #[derive(Debug, Clone)] + #[must_use = "futures do nothing unless you `.await` or poll them"] + pub struct PollImmediate { + #[pin] + future: Option + } +} impl Future for PollImmediate where @@ -17,13 +23,14 @@ where #[inline] fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - // # Safety - // This is the only time that this future will ever be polled. + let mut this = self.project(); let inner = - unsafe { self.get_unchecked_mut().0.take().expect("PollOnce polled after completion") }; - crate::pin_mut!(inner); + this.future.as_mut().as_pin_mut().expect("PollImmediate polled after completion"); match inner.poll(cx) { - Poll::Ready(t) => Poll::Ready(Some(t)), + Poll::Ready(t) => { + this.future.set(None); + Poll::Ready(Some(t)) + } Poll::Pending => Poll::Ready(None), } } @@ -31,7 +38,7 @@ where impl FusedFuture for PollImmediate { fn is_terminated(&self) -> bool { - self.0.is_none() + self.future.is_none() } } @@ -64,25 +71,14 @@ where type Item = Poll; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - unsafe { - // # Safety - // We never move the inner value until it is done. We only get a reference to it. - let inner = &mut self.get_unchecked_mut().0; - let fut = match inner.as_mut() { - // inner is gone, so we can signal that the stream is closed. - None => return Poll::Ready(None), - Some(inner) => inner, - }; - let fut = Pin::new_unchecked(fut); - Poll::Ready(Some(fut.poll(cx).map(|t| { - // # Safety - // The inner option value is done, so we need to drop it. We do it without moving it - // by using drop in place. We then write over the value without trying to drop it first - // This should uphold all the safety requirements of `Pin` - std::ptr::drop_in_place(inner); - std::ptr::write(inner, None); + let mut this = self.project(); + match this.future.as_mut().as_pin_mut() { + // inner is gone, so we can signal that the stream is closed. + None => return Poll::Ready(None), + Some(fut) => Poll::Ready(Some(fut.poll(cx).map(|t| { + this.future.set(None); t - }))) + }))), } } } @@ -103,7 +99,7 @@ where /// # }); /// ``` pub fn poll_immediate(f: F) -> PollImmediate { - assert_future::, PollImmediate>(PollImmediate(Some(f))) + assert_future::, PollImmediate>(PollImmediate { future: Some(f) }) } /// Future for the [`poll_immediate_reuse`](poll_immediate_reuse()) function. @@ -119,7 +115,8 @@ where #[inline] fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let mut inner = self.get_mut().0.take().expect("PollOnceReuse polled after completion"); + let mut inner = + self.get_mut().0.take().expect("PollImmediateReuse polled after completion"); match inner.poll_unpin(cx) { Poll::Ready(t) => Poll::Ready(Ok(t)), Poll::Pending => Poll::Ready(Err(inner)), diff --git a/futures-util/src/stream/poll_immediate.rs b/futures-util/src/stream/poll_immediate.rs index d945ea493b..5786e97de4 100644 --- a/futures-util/src/stream/poll_immediate.rs +++ b/futures-util/src/stream/poll_immediate.rs @@ -1,53 +1,50 @@ use futures_core::task::{Context, Poll}; use futures_core::Stream; +use pin_project_lite::pin_project; use std::pin::Pin; -/// Stream for the [`poll_immediate`](poll_immediate()) function. -#[derive(Debug, Clone)] -#[must_use = "futures do nothing unless you `.await` or poll them"] -pub struct PollImmediate(Option); +pin_project! { + /// Stream for the [`poll_immediate`](poll_immediate()) function. + #[derive(Debug, Clone)] + #[must_use = "futures do nothing unless you `.await` or poll them"] + pub struct PollImmediate { + #[pin] + stream: Option + } +} impl Stream for PollImmediate where - S: Stream, + S: Stream, { type Item = Poll; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - unsafe { - // # Safety - // We never move the inner value until it is done. We only get a reference to it. - let inner = &mut self.get_unchecked_mut().0; - let fut = match inner.as_mut() { - // inner is gone, so we can continue to signal that the stream is closed. - None => return Poll::Ready(None), - Some(inner) => inner, - }; - let stream = Pin::new_unchecked(fut); - match stream.poll_next(cx) { - Poll::Ready(Some(t)) => Poll::Ready(Some(Poll::Ready(t))), - Poll::Ready(None) => { - // # Safety - // The inner stream is done, so we need to drop it. We do it without moving it - // by using drop in place. We then write over the value without trying to drop it first - // This should uphold all the safety requirements of `Pin` - std::ptr::drop_in_place(inner); - std::ptr::write(inner, None); - Poll::Ready(None) - } - Poll::Pending => Poll::Ready(Some(Poll::Pending)), + let mut this = self.project(); + let stream = match this.stream.as_mut().as_pin_mut() { + // inner is gone, so we can continue to signal that the stream is closed. + None => return Poll::Ready(None), + Some(inner) => inner, + }; + + match stream.poll_next(cx) { + Poll::Ready(Some(t)) => Poll::Ready(Some(Poll::Ready(t))), + Poll::Ready(None) => { + this.stream.set(None); + Poll::Ready(None) } + Poll::Pending => Poll::Ready(Some(Poll::Pending)), } } fn size_hint(&self) -> (usize, Option) { - self.0.as_ref().map_or((0, Some(0)), Stream::size_hint) + self.stream.as_ref().map_or((0, Some(0)), Stream::size_hint) } } impl super::FusedStream for PollImmediate { fn is_terminated(&self) -> bool { - self.0.is_none() + self.stream.is_none() } } @@ -76,5 +73,5 @@ impl super::FusedStream for PollImmediate { /// # }); /// ``` pub fn poll_immediate(s: S) -> PollImmediate { - super::assert_stream::, PollImmediate>(PollImmediate(Some(s))) + super::assert_stream::, PollImmediate>(PollImmediate { stream: Some(s) }) }