Skip to content

Commit

Permalink
[EXPERIMENTAL] try to remove usage of try_poll(_next)
Browse files Browse the repository at this point in the history
  • Loading branch information
taiki-e committed Aug 23, 2019
1 parent 6f3717d commit bfe1730
Show file tree
Hide file tree
Showing 34 changed files with 47 additions and 71 deletions.
4 changes: 2 additions & 2 deletions futures-util/src/compat/compat03as01.rs
Expand Up @@ -119,7 +119,7 @@ where
type Error = Fut::Error;

fn poll(&mut self) -> Poll01<Self::Item, Self::Error> {
with_context(self, |inner, cx| poll_03_to_01(inner.try_poll(cx)))
with_context(self, |inner, cx| poll_03_to_01(inner.poll(cx)))
}
}

Expand All @@ -131,7 +131,7 @@ where
type Error = St::Error;

fn poll(&mut self) -> Poll01<Option<Self::Item>, Self::Error> {
with_context(self, |inner, cx| match inner.try_poll_next(cx)? {
with_context(self, |inner, cx| match inner.poll_next(cx)? {
task03::Poll::Ready(None) => Ok(Async01::Ready(None)),
task03::Poll::Ready(Some(t)) => Ok(Async01::Ready(Some(t))),
task03::Poll::Pending => Ok(Async01::NotReady),
Expand Down
2 changes: 1 addition & 1 deletion futures-util/src/try_future/err_into.rs
Expand Up @@ -42,7 +42,7 @@ impl<Fut, E> Future for ErrInto<Fut, E>
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Self::Output> {
self.future().try_poll(cx)
self.future().poll(cx)
.map(|res| res.map_err(Into::into))
}
}
4 changes: 2 additions & 2 deletions futures-util/src/try_future/flatten_stream_sink.rs
Expand Up @@ -78,7 +78,7 @@ where
{
fn poll_future(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Fut::Error>> {
if let State::Future(f) = self.as_mut().get_pin_mut() {
match ready!(f.try_poll(cx)) {
match ready!(f.poll(cx)) {
Ok(s) => {
// Future resolved to stream.
// We do not return, but poll that
Expand Down Expand Up @@ -121,7 +121,7 @@ where
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
ready!(self.as_mut().state().poll_future(cx)?);
match self.as_mut().state().get_pin_mut() {
State::StreamOrSink(s) => s.try_poll_next(cx),
State::StreamOrSink(s) => s.poll_next(cx),
State::Done => Poll::Ready(None),
State::Future(_) => unreachable!(),
}
Expand Down
2 changes: 1 addition & 1 deletion futures-util/src/try_future/inspect_err.rs
Expand Up @@ -44,7 +44,7 @@ where
type Output = Result<Fut::Ok, Fut::Error>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let e = ready!(self.as_mut().future().try_poll(cx));
let e = ready!(self.as_mut().future().poll(cx));
if let Err(e) = &e {
self.as_mut().f().take().expect("cannot poll InspectErr twice")(e);
}
Expand Down
2 changes: 1 addition & 1 deletion futures-util/src/try_future/inspect_ok.rs
Expand Up @@ -44,7 +44,7 @@ where
type Output = Result<Fut::Ok, Fut::Error>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let e = ready!(self.as_mut().future().try_poll(cx));
let e = ready!(self.as_mut().future().poll(cx));
if let Ok(e) = &e {
self.as_mut().f().take().expect("cannot poll InspectOk twice")(e);
}
Expand Down
2 changes: 1 addition & 1 deletion futures-util/src/try_future/into_future.rs
Expand Up @@ -31,6 +31,6 @@ impl<Fut: TryFuture> Future for IntoFuture<Fut> {
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Self::Output> {
self.future().try_poll(cx)
self.future().poll(cx)
}
}
2 changes: 1 addition & 1 deletion futures-util/src/try_future/map_err.rs
Expand Up @@ -42,7 +42,7 @@ impl<Fut, F, E> Future for MapErr<Fut, F>
) -> Poll<Self::Output> {
self.as_mut()
.future()
.try_poll(cx)
.poll(cx)
.map(|result| {
let f = self.as_mut().f().take()
.expect("MapErr must not be polled after it returned `Poll::Ready`");
Expand Down
2 changes: 1 addition & 1 deletion futures-util/src/try_future/map_ok.rs
Expand Up @@ -44,7 +44,7 @@ impl<Fut, F, T> Future for MapOk<Fut, F>
) -> Poll<Self::Output> {
self.as_mut()
.future()
.try_poll(cx)
.poll(cx)
.map(|result| {
let op = self.as_mut().f().take()
.expect("MapOk must not be polled after it returned `Poll::Ready`");
Expand Down
11 changes: 0 additions & 11 deletions futures-util/src/try_future/mod.rs
Expand Up @@ -473,15 +473,4 @@ pub trait TryFutureExt: TryFuture {
{
IntoFuture::new(self)
}

/// A convenience method for calling [`TryFuture::try_poll`] on [`Unpin`]
/// future types.
fn try_poll_unpin(
&mut self,
cx: &mut Context<'_>,
) -> Poll<Result<Self::Ok, Self::Error>>
where Self: Unpin,
{
Pin::new(self).try_poll(cx)
}
}
2 changes: 1 addition & 1 deletion futures-util/src/try_future/select_ok.rs
Expand Up @@ -46,7 +46,7 @@ impl<Fut: TryFuture + Unpin> Future for SelectOk<Fut> {
// loop until we've either exhausted all errors, a success was hit, or nothing is ready
loop {
let item = self.inner.iter_mut().enumerate().find_map(|(i, f)| {
match f.try_poll_unpin(cx) {
match f.poll_unpin(cx) {
Poll::Pending => None,
Poll::Ready(e) => Some((i, e)),
}
Expand Down
4 changes: 2 additions & 2 deletions futures-util/src/try_future/try_chain.rs
Expand Up @@ -50,13 +50,13 @@ impl<Fut1, Fut2, Data> TryChain<Fut1, Fut2, Data>
let (output, data) = match this {
TryChain::First(fut1, data) => {
// Poll the first future
let output = ready!(unsafe { Pin::new_unchecked(fut1) }.try_poll(cx));
let output = ready!(unsafe { Pin::new_unchecked(fut1) }.poll(cx));
(output, data.take().unwrap())
}
TryChain::Second(fut2) => {
// Poll the second future
return unsafe { Pin::new_unchecked(fut2) }
.try_poll(cx)
.poll(cx)
.map(|res| {
*this = TryChain::Empty; // Drop fut2.
res
Expand Down
2 changes: 1 addition & 1 deletion futures-util/src/try_future/try_join_all.rs
Expand Up @@ -142,7 +142,7 @@ where

for mut elem in iter_pin_mut(self.elems.as_mut()) {
if let Some(pending) = elem.as_mut().pending_pin_mut() {
match pending.try_poll(cx) {
match pending.poll(cx) {
Poll::Pending => state = FinalState::Pending,
Poll::Ready(output) => match output {
Ok(item) => elem.set(ElemState::Done(Some(item))),
Expand Down
4 changes: 2 additions & 2 deletions futures-util/src/try_future/try_select.rs
Expand Up @@ -65,10 +65,10 @@ impl<A: Unpin, B: Unpin> Future for TrySelect<A, B>

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let (mut a, mut b) = self.inner.take().expect("cannot poll Select twice");
match a.try_poll_unpin(cx) {
match a.poll_unpin(cx) {
Poll::Ready(Err(x)) => Poll::Ready(Err(Either::Left((x, b)))),
Poll::Ready(Ok(x)) => Poll::Ready(Ok(Either::Left((x, b)))),
Poll::Pending => match b.try_poll_unpin(cx) {
Poll::Pending => match b.poll_unpin(cx) {
Poll::Ready(Err(x)) => Poll::Ready(Err(Either::Right((x, a)))),
Poll::Ready(Ok(x)) => Poll::Ready(Ok(Either::Right((x, a)))),
Poll::Pending => {
Expand Down
2 changes: 1 addition & 1 deletion futures-util/src/try_future/unwrap_or_else.rs
Expand Up @@ -45,7 +45,7 @@ impl<Fut, F> Future for UnwrapOrElse<Fut, F>
) -> Poll<Self::Output> {
self.as_mut()
.future()
.try_poll(cx)
.poll(cx)
.map(|result| {
let op = self.as_mut().f().take()
.expect("UnwrapOrElse already returned `Poll::Ready` before");
Expand Down
4 changes: 2 additions & 2 deletions futures-util/src/try_stream/and_then.rs
Expand Up @@ -90,15 +90,15 @@ impl<St, Fut, F> Stream for AndThen<St, Fut, F>
cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
if self.future.is_none() {
let item = match ready!(self.as_mut().stream().try_poll_next(cx)?) {
let item = match ready!(self.as_mut().stream().poll_next(cx)?) {
None => return Poll::Ready(None),
Some(e) => e,
};
let fut = (self.as_mut().f())(item);
self.as_mut().future().set(Some(fut));
}

let e = ready!(self.as_mut().future().as_pin_mut().unwrap().try_poll(cx));
let e = ready!(self.as_mut().future().as_pin_mut().unwrap().poll(cx));
self.as_mut().future().set(None);
Poll::Ready(Some(e))
}
Expand Down
2 changes: 1 addition & 1 deletion futures-util/src/try_stream/err_into.rs
Expand Up @@ -77,7 +77,7 @@ where
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
self.stream().try_poll_next(cx)
self.stream().poll_next(cx)
.map(|res| res.map(|some| some.map_err(Into::into)))
}
}
Expand Down
2 changes: 1 addition & 1 deletion futures-util/src/try_stream/inspect_err.rs
Expand Up @@ -97,7 +97,7 @@ where
) -> Poll<Option<Self::Item>> {
self.as_mut()
.stream()
.try_poll_next(cx)
.poll_next(cx)
.map(|opt| opt.map(|res| res.map_err(|e| inspect(e, self.as_mut().f()))))
}
}
Expand Down
2 changes: 1 addition & 1 deletion futures-util/src/try_stream/inspect_ok.rs
Expand Up @@ -97,7 +97,7 @@ where
) -> Poll<Option<Self::Item>> {
self.as_mut()
.stream()
.try_poll_next(cx)
.poll_next(cx)
.map(|opt| opt.map(|res| res.map(|e| inspect(e, self.as_mut().f()))))
}
}
Expand Down
6 changes: 3 additions & 3 deletions futures-util/src/try_stream/into_async_read.rs
@@ -1,4 +1,4 @@
use crate::try_stream::TryStreamExt;
use crate::stream::StreamExt;
use core::pin::Pin;
use futures_core::stream::TryStream;
use futures_core::task::{Context, Poll};
Expand Down Expand Up @@ -73,7 +73,7 @@ where
return Poll::Ready(Ok(len));
}
ReadState::PendingChunk => {
match ready!(self.stream.try_poll_next_unpin(cx)) {
match ready!(self.stream.poll_next_unpin(cx)) {
Some(Ok(chunk)) => {
if !chunk.as_ref().is_empty() {
self.state = ReadState::Ready {
Expand Down Expand Up @@ -138,7 +138,7 @@ where
cx: &mut Context<'_>,
) -> Poll<Result<&[u8]>> {
while let ReadState::PendingChunk = self.state {
match ready!(self.stream.try_poll_next_unpin(cx)) {
match ready!(self.stream.poll_next_unpin(cx)) {
Some(Ok(chunk)) => {
if !chunk.as_ref().is_empty() {
self.state = ReadState::Ready {
Expand Down
4 changes: 2 additions & 2 deletions futures-util/src/try_stream/into_stream.rs
Expand Up @@ -60,14 +60,14 @@ impl<St: TryStream + FusedStream> FusedStream for IntoStream<St> {
}

impl<St: TryStream> Stream for IntoStream<St> {
type Item = Result<St::Ok, St::Error>;
type Item = St::Item;

#[inline]
fn poll_next(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
self.stream().try_poll_next(cx)
self.stream().poll_next(cx)
}
}

Expand Down
2 changes: 1 addition & 1 deletion futures-util/src/try_stream/map_err.rs
Expand Up @@ -91,7 +91,7 @@ where
) -> Poll<Option<Self::Item>> {
self.as_mut()
.stream()
.try_poll_next(cx)
.poll_next(cx)
.map(|opt| opt.map(|res| res.map_err(|e| self.as_mut().f()(e))))
}
}
Expand Down
2 changes: 1 addition & 1 deletion futures-util/src/try_stream/map_ok.rs
Expand Up @@ -91,7 +91,7 @@ where
) -> Poll<Option<Self::Item>> {
self.as_mut()
.stream()
.try_poll_next(cx)
.poll_next(cx)
.map(|opt| opt.map(|res| res.map(|x| self.as_mut().f()(x))))
}
}
Expand Down
13 changes: 0 additions & 13 deletions futures-util/src/try_stream/mod.rs
Expand Up @@ -745,19 +745,6 @@ pub trait TryStreamExt: TryStream {
TryBufferUnordered::new(self, n)
}

// TODO: false positive warning from rustdoc. Verify once #43466 settles
//
/// A convenience method for calling [`TryStream::try_poll_next`] on [`Unpin`]
/// stream types.
fn try_poll_next_unpin(
&mut self,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Self::Ok, Self::Error>>>
where Self: Unpin,
{
Pin::new(self).try_poll_next(cx)
}

/// Wraps a [`TryStream`] into a stream compatible with libraries using
/// futures 0.1 `Stream`. Requires the `compat` feature to be enabled.
/// ```
Expand Down
4 changes: 2 additions & 2 deletions futures-util/src/try_stream/or_else.rs
Expand Up @@ -90,7 +90,7 @@ impl<St, Fut, F> Stream for OrElse<St, Fut, F>
cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
if self.future.is_none() {
let item = match ready!(self.as_mut().stream().try_poll_next(cx)) {
let item = match ready!(self.as_mut().stream().poll_next(cx)) {
None => return Poll::Ready(None),
Some(Ok(e)) => return Poll::Ready(Some(Ok(e))),
Some(Err(e)) => e,
Expand All @@ -99,7 +99,7 @@ impl<St, Fut, F> Stream for OrElse<St, Fut, F>
self.as_mut().future().set(Some(fut));
}

let e = ready!(self.as_mut().future().as_pin_mut().unwrap().try_poll(cx));
let e = ready!(self.as_mut().future().as_pin_mut().unwrap().poll(cx));
self.as_mut().future().set(None);
Poll::Ready(Some(e))
}
Expand Down
2 changes: 1 addition & 1 deletion futures-util/src/try_stream/try_collect.rs
Expand Up @@ -53,7 +53,7 @@ where
cx: &mut Context<'_>,
) -> Poll<Self::Output> {
loop {
match ready!(self.as_mut().stream().try_poll_next(cx)?) {
match ready!(self.as_mut().stream().poll_next(cx)?) {
Some(x) => self.as_mut().items().extend(Some(x)),
None => return Poll::Ready(Ok(self.as_mut().finish())),
}
Expand Down
2 changes: 1 addition & 1 deletion futures-util/src/try_stream/try_concat.rs
Expand Up @@ -39,7 +39,7 @@ where

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
loop {
match ready!(self.as_mut().stream().try_poll_next(cx)?) {
match ready!(self.as_mut().stream().poll_next(cx)?) {
Some(x) => {
let accum = self.as_mut().accum();
if let Some(a) = accum {
Expand Down
2 changes: 1 addition & 1 deletion futures-util/src/try_stream/try_filter.rs
Expand Up @@ -111,7 +111,7 @@ impl<St, Fut, F> Stream for TryFilter<St, Fut, F>
) -> Poll<Option<Result<St::Ok, St::Error>>> {
loop {
if self.pending_fut.is_none() {
let item = match ready!(self.as_mut().stream().try_poll_next(cx)?) {
let item = match ready!(self.as_mut().stream().poll_next(cx)?) {
Some(x) => x,
None => return Poll::Ready(None),
};
Expand Down
4 changes: 2 additions & 2 deletions futures-util/src/try_stream/try_filter_map.rs
Expand Up @@ -98,15 +98,15 @@ impl<St, Fut, F, T> Stream for TryFilterMap<St, Fut, F>
) -> Poll<Option<Result<T, St::Error>>> {
loop {
if self.pending.is_none() {
let item = match ready!(self.as_mut().stream().try_poll_next(cx)?) {
let item = match ready!(self.as_mut().stream().poll_next(cx)?) {
Some(x) => x,
None => return Poll::Ready(None),
};
let fut = (self.as_mut().f())(item);
self.as_mut().pending().set(Some(fut));
}

let result = ready!(self.as_mut().pending().as_pin_mut().unwrap().try_poll(cx));
let result = ready!(self.as_mut().pending().as_pin_mut().unwrap().poll(cx));
self.as_mut().pending().set(None);
if let Some(x) = result? {
return Poll::Ready(Some(Ok(x)));
Expand Down
4 changes: 2 additions & 2 deletions futures-util/src/try_stream/try_flatten.rs
Expand Up @@ -96,7 +96,7 @@ where
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
loop {
if self.next.is_none() {
match ready!(self.as_mut().stream().try_poll_next(cx)?) {
match ready!(self.as_mut().stream().poll_next(cx)?) {
Some(e) => self.as_mut().next().set(Some(e)),
None => return Poll::Ready(None),
}
Expand All @@ -107,7 +107,7 @@ where
.next()
.as_pin_mut()
.unwrap()
.try_poll_next(cx)?)
.poll_next(cx)?)
{
return Poll::Ready(Some(Ok(item)));
} else {
Expand Down
4 changes: 2 additions & 2 deletions futures-util/src/try_stream/try_fold.rs
Expand Up @@ -75,7 +75,7 @@ impl<St, Fut, T, F> Future for TryFold<St, Fut, T, F>
let accum = match ready!(
self.as_mut().future().as_pin_mut()
.expect("TryFold polled after completion")
.try_poll(cx)
.poll(cx)
) {
Ok(accum) => accum,
Err(e) => {
Expand All @@ -88,7 +88,7 @@ impl<St, Fut, T, F> Future for TryFold<St, Fut, T, F>
self.as_mut().future().set(None);
}

let item = match ready!(self.as_mut().stream().try_poll_next(cx)) {
let item = match ready!(self.as_mut().stream().poll_next(cx)) {
Some(Ok(item)) => Some(item),
Some(Err(e)) => {
// Indicate that the future can no longer be polled.
Expand Down
4 changes: 2 additions & 2 deletions futures-util/src/try_stream/try_for_each.rs
Expand Up @@ -56,11 +56,11 @@ impl<St, Fut, F> Future for TryForEach<St, Fut, F>
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
loop {
if let Some(future) = self.as_mut().future().as_pin_mut() {
ready!(future.try_poll(cx))?;
ready!(future.poll(cx))?;
}
self.as_mut().future().set(None);

match ready!(self.as_mut().stream().try_poll_next(cx)?) {
match ready!(self.as_mut().stream().poll_next(cx)?) {
Some(e) => {
let future = (self.as_mut().f())(e);
self.as_mut().future().set(Some(future));
Expand Down

0 comments on commit bfe1730

Please sign in to comment.