Skip to content

Commit

Permalink
Remove Unpin requirements from get_pin_mut and AsyncRead::take
Browse files Browse the repository at this point in the history
  • Loading branch information
taiki-e committed Aug 21, 2019
1 parent 7902054 commit 3c9a4ed
Show file tree
Hide file tree
Showing 7 changed files with 76 additions and 47 deletions.
22 changes: 17 additions & 5 deletions futures-util/src/io/chain.rs
Expand Up @@ -37,11 +37,6 @@ where
}
}

/// Consumes the `Chain`, returning the wrapped readers.
pub fn into_inner(self) -> (T, U) {
(self.first, self.second)
}

/// Gets references to the underlying readers in this `Chain`.
pub fn get_ref(&self) -> (&T, &U) {
(&self.first, &self.second)
Expand All @@ -55,6 +50,23 @@ where
pub fn get_mut(&mut self) -> (&mut T, &mut U) {
(&mut self.first, &mut self.second)
}

/// Gets pinned mutable references to the underlying readers in this `Chain`.
///
/// Care should be taken to avoid modifying the internal I/O state of the
/// underlying readers as doing so may corrupt the internal state of this
/// `Chain`.
pub fn get_pin_mut(self: Pin<&mut Self>) -> (Pin<&mut T>, Pin<&mut U>) {
unsafe {
let Self { first, second, .. } = self.get_unchecked_mut();
(Pin::new_unchecked(first), Pin::new_unchecked(second))
}
}

/// Consumes the `Chain`, returning the wrapped readers.
pub fn into_inner(self) -> (T, U) {
(self.first, self.second)
}
}

impl<T, U> fmt::Debug for Chain<T, U>
Expand Down
2 changes: 1 addition & 1 deletion futures-util/src/io/mod.rs
Expand Up @@ -376,7 +376,7 @@ pub trait AsyncReadExt: AsyncRead {
/// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
/// ```
fn take(self, limit: u64) -> Take<Self>
where Self: Sized + Unpin
where Self: Sized
{
Take::new(self, limit)
}
Expand Down
64 changes: 39 additions & 25 deletions futures-util/src/io/take.rs
@@ -1,21 +1,26 @@
use futures_core::task::{Context, Poll};
use futures_io::AsyncRead;
use pin_utils::{unsafe_pinned, unsafe_unpinned};
use std::io;
use std::pin::Pin;

/// Future for the [`take`](super::AsyncReadExt::take) method.
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct Take<R: Unpin> {
pub struct Take<R> {
inner: R,
limit: u64,
// Add '_' to avoid conflicts with `limit` method.
limit_: u64,
}

impl<R: Unpin> Unpin for Take<R> { }

impl<R: AsyncRead + Unpin> Take<R> {
impl<R: AsyncRead> Take<R> {
unsafe_pinned!(inner: R);
unsafe_unpinned!(limit_: u64);

pub(super) fn new(inner: R, limit: u64) -> Self {
Take { inner, limit }
Self { inner, limit_: limit }
}

/// Returns the remaining number of bytes that can be
Expand Down Expand Up @@ -44,7 +49,7 @@ impl<R: AsyncRead + Unpin> Take<R> {
/// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
/// ```
pub fn limit(&self) -> u64 {
self.limit
self.limit_
}

/// Sets the number of bytes that can be read before this instance will
Expand Down Expand Up @@ -76,10 +81,10 @@ impl<R: AsyncRead + Unpin> Take<R> {
/// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
/// ```
pub fn set_limit(&mut self, limit: u64) {
self.limit = limit
self.limit_ = limit
}

/// Consumes the `Take`, returning the wrapped reader.
/// Gets a reference to the underlying reader.
///
/// # Examples
///
Expand All @@ -95,16 +100,20 @@ impl<R: AsyncRead + Unpin> Take<R> {
/// let mut take = reader.take(4);
/// let n = take.read(&mut buffer).await?;
///
/// let cursor = take.into_inner();
/// assert_eq!(cursor.position(), 4);
/// let cursor_ref = take.get_ref();
/// assert_eq!(cursor_ref.position(), 4);
///
/// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
/// ```
pub fn into_inner(self) -> R {
self.inner
pub fn get_ref(&self) -> &R {
&self.inner
}

/// Gets a reference to the underlying reader.
/// Gets a mutable reference to the underlying reader.
///
/// Care should be taken to avoid modifying the internal I/O state of the
/// underlying reader as doing so may corrupt the internal limit of this
/// `Take`.
///
/// # Examples
///
Expand All @@ -120,20 +129,24 @@ impl<R: AsyncRead + Unpin> Take<R> {
/// let mut take = reader.take(4);
/// let n = take.read(&mut buffer).await?;
///
/// let cursor_ref = take.get_ref();
/// assert_eq!(cursor_ref.position(), 4);
/// let cursor_mut = take.get_mut();
///
/// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
/// ```
pub fn get_ref(&self) -> &R {
&self.inner
pub fn get_mut(&mut self) -> &mut R {
&mut self.inner
}

/// Gets a mutable reference to the underlying reader.
/// Gets a pinned mutable reference to the underlying reader.
///
/// Care should be taken to avoid modifying the internal I/O state of the
/// underlying reader as doing so may corrupt the internal limit of this
/// `Take`.
pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut R> {
self.inner()
}

/// Consumes the `Take`, returning the wrapped reader.
///
/// # Examples
///
Expand All @@ -149,28 +162,29 @@ impl<R: AsyncRead + Unpin> Take<R> {
/// let mut take = reader.take(4);
/// let n = take.read(&mut buffer).await?;
///
/// let cursor_mut = take.get_mut();
/// let cursor = take.into_inner();
/// assert_eq!(cursor.position(), 4);
///
/// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
/// ```
pub fn get_mut(&mut self) -> &mut R {
&mut self.inner
pub fn into_inner(self) -> R {
self.inner
}
}

impl<R: AsyncRead + Unpin> AsyncRead for Take<R> {
impl<R: AsyncRead> AsyncRead for Take<R> {
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut [u8],
) -> Poll<Result<usize, io::Error>> {
if self.limit == 0 {
if self.limit_ == 0 {
return Poll::Ready(Ok(0));
}

let max = std::cmp::min(buf.len() as u64, self.limit) as usize;
let n = ready!(Pin::new(&mut self.inner).poll_read(cx, &mut buf[..max]))?;
self.limit -= n as u64;
let max = std::cmp::min(buf.len() as u64, self.limit_) as usize;
let n = ready!(self.as_mut().inner().poll_read(cx, &mut buf[..max]))?;
*self.as_mut().limit_() -= n as u64;
Poll::Ready(Ok(n))
}
}
10 changes: 5 additions & 5 deletions futures-util/src/sink/fanout.rs
Expand Up @@ -33,11 +33,11 @@ impl<Si1, Si2> Fanout<Si1, Si2> {
}

/// Get a pinned mutable reference to the inner sinks.
pub fn get_pin_mut(self: Pin<&mut Self>) -> (Pin<&mut Si1>, Pin<&mut Si2>)
where Si1: Unpin, Si2: Unpin,
{
let Self { sink1, sink2 } = self.get_mut();
(Pin::new(sink1), Pin::new(sink2))
pub fn get_pin_mut(self: Pin<&mut Self>) -> (Pin<&mut Si1>, Pin<&mut Si2>) {
unsafe {
let Self { sink1, sink2 } = self.get_unchecked_mut();
(Pin::new_unchecked(sink1), Pin::new_unchecked(sink2))
}
}

/// Consumes this combinator, returning the underlying sinks.
Expand Down
5 changes: 4 additions & 1 deletion futures-util/src/stream/into_future.rs
Expand Up @@ -3,6 +3,7 @@ use core::pin::Pin;
use futures_core::future::{FusedFuture, Future};
use futures_core::stream::Stream;
use futures_core::task::{Context, Poll};
use pin_utils::unsafe_pinned;

/// Future for the [`into_future`](super::StreamExt::into_future) method.
#[derive(Debug)]
Expand All @@ -14,6 +15,8 @@ pub struct StreamFuture<St> {
impl<St: Stream + Unpin> Unpin for StreamFuture<St> {}

impl<St: Stream + Unpin> StreamFuture<St> {
unsafe_pinned!(stream: Option<St>);

pub(super) fn new(stream: St) -> StreamFuture<St> {
StreamFuture { stream: Some(stream) }
}
Expand Down Expand Up @@ -54,7 +57,7 @@ impl<St: Stream + Unpin> StreamFuture<St> {
/// in order to return it to the caller of `Future::poll` if the stream yielded
/// an element.
pub fn get_pin_mut(self: Pin<&mut Self>) -> Option<Pin<&mut St>> {
Pin::new(&mut self.get_mut().stream).as_pin_mut()
self.stream().as_pin_mut()
}

/// Consumes this combinator, returning the underlying stream.
Expand Down
10 changes: 5 additions & 5 deletions futures-util/src/stream/select.rs
Expand Up @@ -56,11 +56,11 @@ impl<St1, St2> Select<St1, St2> {
///
/// Note that care must be taken to avoid tampering with the state of the
/// stream which may otherwise confuse this combinator.
pub fn get_pin_mut(self: Pin<&mut Self>) -> (Pin<&mut St1>, Pin<&mut St2>)
where St1: Unpin, St2: Unpin,
{
let Self { stream1, stream2, .. } = self.get_mut();
(Pin::new(stream1.get_mut()), Pin::new(stream2.get_mut()))
pub fn get_pin_mut(self: Pin<&mut Self>) -> (Pin<&mut St1>, Pin<&mut St2>) {
unsafe {
let Self { stream1, stream2, .. } = self.get_unchecked_mut();
(Pin::new_unchecked(stream1).get_pin_mut(), Pin::new_unchecked(stream2).get_pin_mut())
}
}

/// Consumes this combinator, returning the underlying streams.
Expand Down
10 changes: 5 additions & 5 deletions futures-util/src/stream/zip.rs
Expand Up @@ -58,11 +58,11 @@ impl<St1: Stream, St2: Stream> Zip<St1, St2> {
///
/// Note that care must be taken to avoid tampering with the state of the
/// stream which may otherwise confuse this combinator.
pub fn get_pin_mut(self: Pin<&mut Self>) -> (Pin<&mut St1>, Pin<&mut St2>)
where St1: Unpin, St2: Unpin,
{
let Self { stream1, stream2, .. } = self.get_mut();
(Pin::new(stream1.get_mut()), Pin::new(stream2.get_mut()))
pub fn get_pin_mut(self: Pin<&mut Self>) -> (Pin<&mut St1>, Pin<&mut St2>) {
unsafe {
let Self { stream1, stream2, .. } = self.get_unchecked_mut();
(Pin::new_unchecked(stream1).get_pin_mut(), Pin::new_unchecked(stream2).get_pin_mut())
}
}

/// Consumes this combinator, returning the underlying streams.
Expand Down

0 comments on commit 3c9a4ed

Please sign in to comment.