From 3c9a4edec5ddb597ac6b1210f9551e45d2bef2b4 Mon Sep 17 00:00:00 2001 From: Taiki Endo Date: Wed, 21 Aug 2019 09:36:05 +0900 Subject: [PATCH] Remove Unpin requirements from get_pin_mut and AsyncRead::take --- futures-util/src/io/chain.rs | 22 +++++++-- futures-util/src/io/mod.rs | 2 +- futures-util/src/io/take.rs | 64 ++++++++++++++++---------- futures-util/src/sink/fanout.rs | 10 ++-- futures-util/src/stream/into_future.rs | 5 +- futures-util/src/stream/select.rs | 10 ++-- futures-util/src/stream/zip.rs | 10 ++-- 7 files changed, 76 insertions(+), 47 deletions(-) diff --git a/futures-util/src/io/chain.rs b/futures-util/src/io/chain.rs index 9c0b5bf1cd..15758e3a07 100644 --- a/futures-util/src/io/chain.rs +++ b/futures-util/src/io/chain.rs @@ -37,11 +37,6 @@ where } } - /// Consumes the `Chain`, returning the wrapped readers. - pub fn into_inner(self) -> (T, U) { - (self.first, self.second) - } - /// Gets references to the underlying readers in this `Chain`. pub fn get_ref(&self) -> (&T, &U) { (&self.first, &self.second) @@ -55,6 +50,23 @@ where pub fn get_mut(&mut self) -> (&mut T, &mut U) { (&mut self.first, &mut self.second) } + + /// Gets pinned mutable references to the underlying readers in this `Chain`. + /// + /// Care should be taken to avoid modifying the internal I/O state of the + /// underlying readers as doing so may corrupt the internal state of this + /// `Chain`. + pub fn get_pin_mut(self: Pin<&mut Self>) -> (Pin<&mut T>, Pin<&mut U>) { + unsafe { + let Self { first, second, .. } = self.get_unchecked_mut(); + (Pin::new_unchecked(first), Pin::new_unchecked(second)) + } + } + + /// Consumes the `Chain`, returning the wrapped readers. + pub fn into_inner(self) -> (T, U) { + (self.first, self.second) + } } impl fmt::Debug for Chain diff --git a/futures-util/src/io/mod.rs b/futures-util/src/io/mod.rs index 7affcd2f16..b8ad7c96d1 100644 --- a/futures-util/src/io/mod.rs +++ b/futures-util/src/io/mod.rs @@ -376,7 +376,7 @@ pub trait AsyncReadExt: AsyncRead { /// # Ok::<(), Box>(()) }).unwrap(); /// ``` fn take(self, limit: u64) -> Take - where Self: Sized + Unpin + where Self: Sized { Take::new(self, limit) } diff --git a/futures-util/src/io/take.rs b/futures-util/src/io/take.rs index 0bf963ac74..b304897b59 100644 --- a/futures-util/src/io/take.rs +++ b/futures-util/src/io/take.rs @@ -1,21 +1,26 @@ use futures_core::task::{Context, Poll}; use futures_io::AsyncRead; +use pin_utils::{unsafe_pinned, unsafe_unpinned}; use std::io; use std::pin::Pin; /// Future for the [`take`](super::AsyncReadExt::take) method. #[derive(Debug)] #[must_use = "futures do nothing unless you `.await` or poll them"] -pub struct Take { +pub struct Take { inner: R, - limit: u64, + // Add '_' to avoid conflicts with `limit` method. + limit_: u64, } impl Unpin for Take { } -impl Take { +impl Take { + unsafe_pinned!(inner: R); + unsafe_unpinned!(limit_: u64); + pub(super) fn new(inner: R, limit: u64) -> Self { - Take { inner, limit } + Self { inner, limit_: limit } } /// Returns the remaining number of bytes that can be @@ -44,7 +49,7 @@ impl Take { /// # Ok::<(), Box>(()) }).unwrap(); /// ``` pub fn limit(&self) -> u64 { - self.limit + self.limit_ } /// Sets the number of bytes that can be read before this instance will @@ -76,10 +81,10 @@ impl Take { /// # Ok::<(), Box>(()) }).unwrap(); /// ``` pub fn set_limit(&mut self, limit: u64) { - self.limit = limit + self.limit_ = limit } - /// Consumes the `Take`, returning the wrapped reader. + /// Gets a reference to the underlying reader. /// /// # Examples /// @@ -95,16 +100,20 @@ impl Take { /// let mut take = reader.take(4); /// let n = take.read(&mut buffer).await?; /// - /// let cursor = take.into_inner(); - /// assert_eq!(cursor.position(), 4); + /// let cursor_ref = take.get_ref(); + /// assert_eq!(cursor_ref.position(), 4); /// /// # Ok::<(), Box>(()) }).unwrap(); /// ``` - pub fn into_inner(self) -> R { - self.inner + pub fn get_ref(&self) -> &R { + &self.inner } - /// Gets a reference to the underlying reader. + /// Gets a mutable reference to the underlying reader. + /// + /// Care should be taken to avoid modifying the internal I/O state of the + /// underlying reader as doing so may corrupt the internal limit of this + /// `Take`. /// /// # Examples /// @@ -120,20 +129,24 @@ impl Take { /// let mut take = reader.take(4); /// let n = take.read(&mut buffer).await?; /// - /// let cursor_ref = take.get_ref(); - /// assert_eq!(cursor_ref.position(), 4); + /// let cursor_mut = take.get_mut(); /// /// # Ok::<(), Box>(()) }).unwrap(); /// ``` - pub fn get_ref(&self) -> &R { - &self.inner + pub fn get_mut(&mut self) -> &mut R { + &mut self.inner } - /// Gets a mutable reference to the underlying reader. + /// Gets a pinned mutable reference to the underlying reader. /// /// Care should be taken to avoid modifying the internal I/O state of the /// underlying reader as doing so may corrupt the internal limit of this /// `Take`. + pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut R> { + self.inner() + } + + /// Consumes the `Take`, returning the wrapped reader. /// /// # Examples /// @@ -149,28 +162,29 @@ impl Take { /// let mut take = reader.take(4); /// let n = take.read(&mut buffer).await?; /// - /// let cursor_mut = take.get_mut(); + /// let cursor = take.into_inner(); + /// assert_eq!(cursor.position(), 4); /// /// # Ok::<(), Box>(()) }).unwrap(); /// ``` - pub fn get_mut(&mut self) -> &mut R { - &mut self.inner + pub fn into_inner(self) -> R { + self.inner } } -impl AsyncRead for Take { +impl AsyncRead for Take { fn poll_read( mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8], ) -> Poll> { - if self.limit == 0 { + if self.limit_ == 0 { return Poll::Ready(Ok(0)); } - let max = std::cmp::min(buf.len() as u64, self.limit) as usize; - let n = ready!(Pin::new(&mut self.inner).poll_read(cx, &mut buf[..max]))?; - self.limit -= n as u64; + let max = std::cmp::min(buf.len() as u64, self.limit_) as usize; + let n = ready!(self.as_mut().inner().poll_read(cx, &mut buf[..max]))?; + *self.as_mut().limit_() -= n as u64; Poll::Ready(Ok(n)) } } diff --git a/futures-util/src/sink/fanout.rs b/futures-util/src/sink/fanout.rs index 00d306241f..24e4de95eb 100644 --- a/futures-util/src/sink/fanout.rs +++ b/futures-util/src/sink/fanout.rs @@ -33,11 +33,11 @@ impl Fanout { } /// Get a pinned mutable reference to the inner sinks. - pub fn get_pin_mut(self: Pin<&mut Self>) -> (Pin<&mut Si1>, Pin<&mut Si2>) - where Si1: Unpin, Si2: Unpin, - { - let Self { sink1, sink2 } = self.get_mut(); - (Pin::new(sink1), Pin::new(sink2)) + pub fn get_pin_mut(self: Pin<&mut Self>) -> (Pin<&mut Si1>, Pin<&mut Si2>) { + unsafe { + let Self { sink1, sink2 } = self.get_unchecked_mut(); + (Pin::new_unchecked(sink1), Pin::new_unchecked(sink2)) + } } /// Consumes this combinator, returning the underlying sinks. diff --git a/futures-util/src/stream/into_future.rs b/futures-util/src/stream/into_future.rs index 556e388f83..dce4b40209 100644 --- a/futures-util/src/stream/into_future.rs +++ b/futures-util/src/stream/into_future.rs @@ -3,6 +3,7 @@ use core::pin::Pin; use futures_core::future::{FusedFuture, Future}; use futures_core::stream::Stream; use futures_core::task::{Context, Poll}; +use pin_utils::unsafe_pinned; /// Future for the [`into_future`](super::StreamExt::into_future) method. #[derive(Debug)] @@ -14,6 +15,8 @@ pub struct StreamFuture { impl Unpin for StreamFuture {} impl StreamFuture { + unsafe_pinned!(stream: Option); + pub(super) fn new(stream: St) -> StreamFuture { StreamFuture { stream: Some(stream) } } @@ -54,7 +57,7 @@ impl StreamFuture { /// in order to return it to the caller of `Future::poll` if the stream yielded /// an element. pub fn get_pin_mut(self: Pin<&mut Self>) -> Option> { - Pin::new(&mut self.get_mut().stream).as_pin_mut() + self.stream().as_pin_mut() } /// Consumes this combinator, returning the underlying stream. diff --git a/futures-util/src/stream/select.rs b/futures-util/src/stream/select.rs index e40d3f826e..b5fb8133b2 100644 --- a/futures-util/src/stream/select.rs +++ b/futures-util/src/stream/select.rs @@ -56,11 +56,11 @@ impl Select { /// /// Note that care must be taken to avoid tampering with the state of the /// stream which may otherwise confuse this combinator. - pub fn get_pin_mut(self: Pin<&mut Self>) -> (Pin<&mut St1>, Pin<&mut St2>) - where St1: Unpin, St2: Unpin, - { - let Self { stream1, stream2, .. } = self.get_mut(); - (Pin::new(stream1.get_mut()), Pin::new(stream2.get_mut())) + pub fn get_pin_mut(self: Pin<&mut Self>) -> (Pin<&mut St1>, Pin<&mut St2>) { + unsafe { + let Self { stream1, stream2, .. } = self.get_unchecked_mut(); + (Pin::new_unchecked(stream1).get_pin_mut(), Pin::new_unchecked(stream2).get_pin_mut()) + } } /// Consumes this combinator, returning the underlying streams. diff --git a/futures-util/src/stream/zip.rs b/futures-util/src/stream/zip.rs index 83b4a0a78a..c8d7f3df0b 100644 --- a/futures-util/src/stream/zip.rs +++ b/futures-util/src/stream/zip.rs @@ -58,11 +58,11 @@ impl Zip { /// /// Note that care must be taken to avoid tampering with the state of the /// stream which may otherwise confuse this combinator. - pub fn get_pin_mut(self: Pin<&mut Self>) -> (Pin<&mut St1>, Pin<&mut St2>) - where St1: Unpin, St2: Unpin, - { - let Self { stream1, stream2, .. } = self.get_mut(); - (Pin::new(stream1.get_mut()), Pin::new(stream2.get_mut())) + pub fn get_pin_mut(self: Pin<&mut Self>) -> (Pin<&mut St1>, Pin<&mut St2>) { + unsafe { + let Self { stream1, stream2, .. } = self.get_unchecked_mut(); + (Pin::new_unchecked(stream1).get_pin_mut(), Pin::new_unchecked(stream2).get_pin_mut()) + } } /// Consumes this combinator, returning the underlying streams.