Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[EXPERIMENTAL] Add Future/Stream bounds to TryFuture/TryStream #1824

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion futures-core/src/future/mod.rs
Expand Up @@ -57,7 +57,7 @@ mod private_try_future {

/// A convenience for futures that return `Result` values that includes
/// a variety of adapters tailored to such futures.
pub trait TryFuture: private_try_future::Sealed {
pub trait TryFuture: Future + private_try_future::Sealed {
/// The type of successful values yielded by this future
type Ok;

Expand Down
2 changes: 1 addition & 1 deletion futures-core/src/stream.rs
Expand Up @@ -126,7 +126,7 @@ mod private_try_stream {

/// A convenience for streams that return `Result` values that includes
/// a variety of adapters tailored to such futures.
pub trait TryStream: private_try_stream::Sealed {
pub trait TryStream: Stream + private_try_stream::Sealed {
/// The type of successful values yielded by this future
type Ok;

Expand Down
4 changes: 2 additions & 2 deletions futures-util/src/compat/compat03as01.rs
Expand Up @@ -119,7 +119,7 @@ where
type Error = Fut::Error;

fn poll(&mut self) -> Poll01<Self::Item, Self::Error> {
with_context(self, |inner, cx| poll_03_to_01(inner.try_poll(cx)))
with_context(self, |inner, cx| poll_03_to_01(inner.poll(cx)))
}
}

Expand All @@ -131,7 +131,7 @@ where
type Error = St::Error;

fn poll(&mut self) -> Poll01<Option<Self::Item>, Self::Error> {
with_context(self, |inner, cx| match inner.try_poll_next(cx)? {
with_context(self, |inner, cx| match inner.poll_next(cx)? {
task03::Poll::Ready(None) => Ok(Async01::Ready(None)),
task03::Poll::Ready(Some(t)) => Ok(Async01::Ready(Some(t))),
task03::Poll::Pending => Ok(Async01::NotReady),
Expand Down
14 changes: 7 additions & 7 deletions futures-util/src/stream/forward.rs
Expand Up @@ -22,7 +22,7 @@ impl<St: TryStream + Unpin, Si: Sink<St::Ok> + Unpin> Unpin for Forward<St, Si>
impl<St, Si, E> Forward<St, Si>
where
Si: Sink<St::Ok, Error = E>,
St: TryStream<Error = E> + Stream,
St: TryStream<Error = E>,
{
unsafe_pinned!(sink: Option<Si>);
unsafe_pinned!(stream: Fuse<St>);
Expand Down Expand Up @@ -53,20 +53,20 @@ where
}
}

impl<St, Si, Item, E> FusedFuture for Forward<St, Si>
impl<St, Si, E> FusedFuture for Forward<St, Si>
where
Si: Sink<Item, Error = E>,
St: Stream<Item = Result<Item, E>>,
Si: Sink<St::Ok, Error = E>,
St: TryStream<Error = E>,
{
fn is_terminated(&self) -> bool {
self.sink.is_none()
}
}

impl<St, Si, Item, E> Future for Forward<St, Si>
impl<St, Si, E> Future for Forward<St, Si>
where
Si: Sink<Item, Error = E>,
St: Stream<Item = Result<Item, E>>,
Si: Sink<St::Ok, Error = E>,
St: TryStream<Error = E>,
{
type Output = Result<(), E>;

Expand Down
2 changes: 1 addition & 1 deletion futures-util/src/try_future/err_into.rs
Expand Up @@ -42,7 +42,7 @@ impl<Fut, E> Future for ErrInto<Fut, E>
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Self::Output> {
self.future().try_poll(cx)
self.future().poll(cx)
.map(|res| res.map_err(Into::into))
}
}
4 changes: 2 additions & 2 deletions futures-util/src/try_future/flatten_stream_sink.rs
Expand Up @@ -78,7 +78,7 @@ where
{
fn poll_future(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Fut::Error>> {
if let State::Future(f) = self.as_mut().get_pin_mut() {
match ready!(f.try_poll(cx)) {
match ready!(f.poll(cx)) {
Ok(s) => {
// Future resolved to stream.
// We do not return, but poll that
Expand Down Expand Up @@ -121,7 +121,7 @@ where
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
ready!(self.as_mut().state().poll_future(cx)?);
match self.as_mut().state().get_pin_mut() {
State::StreamOrSink(s) => s.try_poll_next(cx),
State::StreamOrSink(s) => s.poll_next(cx),
State::Done => Poll::Ready(None),
State::Future(_) => unreachable!(),
}
Expand Down
2 changes: 1 addition & 1 deletion futures-util/src/try_future/inspect_err.rs
Expand Up @@ -44,7 +44,7 @@ where
type Output = Result<Fut::Ok, Fut::Error>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let e = ready!(self.as_mut().future().try_poll(cx));
let e = ready!(self.as_mut().future().poll(cx));
if let Err(e) = &e {
self.as_mut().f().take().expect("cannot poll InspectErr twice")(e);
}
Expand Down
2 changes: 1 addition & 1 deletion futures-util/src/try_future/inspect_ok.rs
Expand Up @@ -44,7 +44,7 @@ where
type Output = Result<Fut::Ok, Fut::Error>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let e = ready!(self.as_mut().future().try_poll(cx));
let e = ready!(self.as_mut().future().poll(cx));
if let Ok(e) = &e {
self.as_mut().f().take().expect("cannot poll InspectOk twice")(e);
}
Expand Down
2 changes: 1 addition & 1 deletion futures-util/src/try_future/into_future.rs
Expand Up @@ -31,6 +31,6 @@ impl<Fut: TryFuture> Future for IntoFuture<Fut> {
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Self::Output> {
self.future().try_poll(cx)
self.future().poll(cx)
}
}
2 changes: 1 addition & 1 deletion futures-util/src/try_future/map_err.rs
Expand Up @@ -42,7 +42,7 @@ impl<Fut, F, E> Future for MapErr<Fut, F>
) -> Poll<Self::Output> {
self.as_mut()
.future()
.try_poll(cx)
.poll(cx)
.map(|result| {
let f = self.as_mut().f().take()
.expect("MapErr must not be polled after it returned `Poll::Ready`");
Expand Down
2 changes: 1 addition & 1 deletion futures-util/src/try_future/map_ok.rs
Expand Up @@ -44,7 +44,7 @@ impl<Fut, F, T> Future for MapOk<Fut, F>
) -> Poll<Self::Output> {
self.as_mut()
.future()
.try_poll(cx)
.poll(cx)
.map(|result| {
let op = self.as_mut().f().take()
.expect("MapOk must not be polled after it returned `Poll::Ready`");
Expand Down
4 changes: 2 additions & 2 deletions futures-util/src/try_future/select_ok.rs
@@ -1,4 +1,4 @@
use crate::try_future::TryFutureExt;
use crate::future::FutureExt;
use core::iter::FromIterator;
use core::mem;
use core::pin::Pin;
Expand Down Expand Up @@ -46,7 +46,7 @@ impl<Fut: TryFuture + Unpin> Future for SelectOk<Fut> {
// loop until we've either exhausted all errors, a success was hit, or nothing is ready
loop {
let item = self.inner.iter_mut().enumerate().find_map(|(i, f)| {
match f.try_poll_unpin(cx) {
match f.poll_unpin(cx) {
Poll::Pending => None,
Poll::Ready(e) => Some((i, e)),
}
Expand Down
4 changes: 2 additions & 2 deletions futures-util/src/try_future/try_chain.rs
Expand Up @@ -50,13 +50,13 @@ impl<Fut1, Fut2, Data> TryChain<Fut1, Fut2, Data>
let (output, data) = match this {
TryChain::First(fut1, data) => {
// Poll the first future
let output = ready!(unsafe { Pin::new_unchecked(fut1) }.try_poll(cx));
let output = ready!(unsafe { Pin::new_unchecked(fut1) }.poll(cx));
(output, data.take().unwrap())
}
TryChain::Second(fut2) => {
// Poll the second future
return unsafe { Pin::new_unchecked(fut2) }
.try_poll(cx)
.poll(cx)
.map(|res| {
*this = TryChain::Empty; // Drop fut2.
res
Expand Down
2 changes: 1 addition & 1 deletion futures-util/src/try_future/try_join_all.rs
Expand Up @@ -142,7 +142,7 @@ where

for mut elem in iter_pin_mut(self.elems.as_mut()) {
if let Some(pending) = elem.as_mut().pending_pin_mut() {
match pending.try_poll(cx) {
match pending.poll(cx) {
Poll::Pending => state = FinalState::Pending,
Poll::Ready(output) => match output {
Ok(item) => elem.set(ElemState::Done(Some(item))),
Expand Down
6 changes: 3 additions & 3 deletions futures-util/src/try_future/try_select.rs
Expand Up @@ -2,7 +2,7 @@ use core::pin::Pin;
use futures_core::future::{Future, TryFuture};
use futures_core::task::{Context, Poll};
use crate::future::Either;
use crate::try_future::TryFutureExt;
use crate::future::FutureExt;

/// Future for the [`try_select()`] function.
#[must_use = "futures do nothing unless you `.await` or poll them"]
Expand Down Expand Up @@ -65,10 +65,10 @@ impl<A: Unpin, B: Unpin> Future for TrySelect<A, B>

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let (mut a, mut b) = self.inner.take().expect("cannot poll Select twice");
match a.try_poll_unpin(cx) {
match a.poll_unpin(cx) {
Poll::Ready(Err(x)) => Poll::Ready(Err(Either::Left((x, b)))),
Poll::Ready(Ok(x)) => Poll::Ready(Ok(Either::Left((x, b)))),
Poll::Pending => match b.try_poll_unpin(cx) {
Poll::Pending => match b.poll_unpin(cx) {
Poll::Ready(Err(x)) => Poll::Ready(Err(Either::Right((x, a)))),
Poll::Ready(Ok(x)) => Poll::Ready(Ok(Either::Right((x, a)))),
Poll::Pending => {
Expand Down
2 changes: 1 addition & 1 deletion futures-util/src/try_future/unwrap_or_else.rs
Expand Up @@ -45,7 +45,7 @@ impl<Fut, F> Future for UnwrapOrElse<Fut, F>
) -> Poll<Self::Output> {
self.as_mut()
.future()
.try_poll(cx)
.poll(cx)
.map(|result| {
let op = self.as_mut().f().take()
.expect("UnwrapOrElse already returned `Poll::Ready` before");
Expand Down
4 changes: 2 additions & 2 deletions futures-util/src/try_stream/and_then.rs
Expand Up @@ -90,15 +90,15 @@ impl<St, Fut, F> Stream for AndThen<St, Fut, F>
cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
if self.future.is_none() {
let item = match ready!(self.as_mut().stream().try_poll_next(cx)?) {
let item = match ready!(self.as_mut().stream().poll_next(cx)?) {
None => return Poll::Ready(None),
Some(e) => e,
};
let fut = (self.as_mut().f())(item);
self.as_mut().future().set(Some(fut));
}

let e = ready!(self.as_mut().future().as_pin_mut().unwrap().try_poll(cx));
let e = ready!(self.as_mut().future().as_pin_mut().unwrap().poll(cx));
self.as_mut().future().set(None);
Poll::Ready(Some(e))
}
Expand Down
2 changes: 1 addition & 1 deletion futures-util/src/try_stream/err_into.rs
Expand Up @@ -77,7 +77,7 @@ where
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
self.stream().try_poll_next(cx)
self.stream().poll_next(cx)
.map(|res| res.map(|some| some.map_err(Into::into)))
}
}
Expand Down
2 changes: 1 addition & 1 deletion futures-util/src/try_stream/inspect_err.rs
Expand Up @@ -97,7 +97,7 @@ where
) -> Poll<Option<Self::Item>> {
self.as_mut()
.stream()
.try_poll_next(cx)
.poll_next(cx)
.map(|opt| opt.map(|res| res.map_err(|e| inspect(e, self.as_mut().f()))))
}
}
Expand Down
2 changes: 1 addition & 1 deletion futures-util/src/try_stream/inspect_ok.rs
Expand Up @@ -97,7 +97,7 @@ where
) -> Poll<Option<Self::Item>> {
self.as_mut()
.stream()
.try_poll_next(cx)
.poll_next(cx)
.map(|opt| opt.map(|res| res.map(|e| inspect(e, self.as_mut().f()))))
}
}
Expand Down
6 changes: 3 additions & 3 deletions futures-util/src/try_stream/into_async_read.rs
@@ -1,4 +1,4 @@
use crate::try_stream::TryStreamExt;
use crate::stream::StreamExt;
use core::pin::Pin;
use futures_core::stream::TryStream;
use futures_core::task::{Context, Poll};
Expand Down Expand Up @@ -73,7 +73,7 @@ where
return Poll::Ready(Ok(len));
}
ReadState::PendingChunk => {
match ready!(self.stream.try_poll_next_unpin(cx)) {
match ready!(self.stream.poll_next_unpin(cx)) {
Some(Ok(chunk)) => {
if !chunk.as_ref().is_empty() {
self.state = ReadState::Ready {
Expand Down Expand Up @@ -138,7 +138,7 @@ where
cx: &mut Context<'_>,
) -> Poll<Result<&[u8]>> {
while let ReadState::PendingChunk = self.state {
match ready!(self.stream.try_poll_next_unpin(cx)) {
match ready!(self.stream.poll_next_unpin(cx)) {
Some(Ok(chunk)) => {
if !chunk.as_ref().is_empty() {
self.state = ReadState::Ready {
Expand Down
4 changes: 2 additions & 2 deletions futures-util/src/try_stream/into_stream.rs
Expand Up @@ -60,14 +60,14 @@ impl<St: TryStream + FusedStream> FusedStream for IntoStream<St> {
}

impl<St: TryStream> Stream for IntoStream<St> {
type Item = Result<St::Ok, St::Error>;
type Item = St::Item;

#[inline]
fn poll_next(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
self.stream().try_poll_next(cx)
self.stream().poll_next(cx)
}
}

Expand Down
2 changes: 1 addition & 1 deletion futures-util/src/try_stream/map_err.rs
Expand Up @@ -91,7 +91,7 @@ where
) -> Poll<Option<Self::Item>> {
self.as_mut()
.stream()
.try_poll_next(cx)
.poll_next(cx)
.map(|opt| opt.map(|res| res.map_err(|e| self.as_mut().f()(e))))
}
}
Expand Down
2 changes: 1 addition & 1 deletion futures-util/src/try_stream/map_ok.rs
Expand Up @@ -91,7 +91,7 @@ where
) -> Poll<Option<Self::Item>> {
self.as_mut()
.stream()
.try_poll_next(cx)
.poll_next(cx)
.map(|opt| opt.map(|res| res.map(|x| self.as_mut().f()(x))))
}
}
Expand Down
4 changes: 2 additions & 2 deletions futures-util/src/try_stream/or_else.rs
Expand Up @@ -90,7 +90,7 @@ impl<St, Fut, F> Stream for OrElse<St, Fut, F>
cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
if self.future.is_none() {
let item = match ready!(self.as_mut().stream().try_poll_next(cx)) {
let item = match ready!(self.as_mut().stream().poll_next(cx)) {
None => return Poll::Ready(None),
Some(Ok(e)) => return Poll::Ready(Some(Ok(e))),
Some(Err(e)) => e,
Expand All @@ -99,7 +99,7 @@ impl<St, Fut, F> Stream for OrElse<St, Fut, F>
self.as_mut().future().set(Some(fut));
}

let e = ready!(self.as_mut().future().as_pin_mut().unwrap().try_poll(cx));
let e = ready!(self.as_mut().future().as_pin_mut().unwrap().poll(cx));
self.as_mut().future().set(None);
Poll::Ready(Some(e))
}
Expand Down
2 changes: 1 addition & 1 deletion futures-util/src/try_stream/try_collect.rs
Expand Up @@ -53,7 +53,7 @@ where
cx: &mut Context<'_>,
) -> Poll<Self::Output> {
loop {
match ready!(self.as_mut().stream().try_poll_next(cx)?) {
match ready!(self.as_mut().stream().poll_next(cx)?) {
Some(x) => self.as_mut().items().extend(Some(x)),
None => return Poll::Ready(Ok(self.as_mut().finish())),
}
Expand Down
2 changes: 1 addition & 1 deletion futures-util/src/try_stream/try_concat.rs
Expand Up @@ -39,7 +39,7 @@ where

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
loop {
match ready!(self.as_mut().stream().try_poll_next(cx)?) {
match ready!(self.as_mut().stream().poll_next(cx)?) {
Some(x) => {
let accum = self.as_mut().accum();
if let Some(a) = accum {
Expand Down
2 changes: 1 addition & 1 deletion futures-util/src/try_stream/try_filter.rs
Expand Up @@ -111,7 +111,7 @@ impl<St, Fut, F> Stream for TryFilter<St, Fut, F>
) -> Poll<Option<Result<St::Ok, St::Error>>> {
loop {
if self.pending_fut.is_none() {
let item = match ready!(self.as_mut().stream().try_poll_next(cx)?) {
let item = match ready!(self.as_mut().stream().poll_next(cx)?) {
Some(x) => x,
None => return Poll::Ready(None),
};
Expand Down
4 changes: 2 additions & 2 deletions futures-util/src/try_stream/try_filter_map.rs
Expand Up @@ -98,15 +98,15 @@ impl<St, Fut, F, T> Stream for TryFilterMap<St, Fut, F>
) -> Poll<Option<Result<T, St::Error>>> {
loop {
if self.pending.is_none() {
let item = match ready!(self.as_mut().stream().try_poll_next(cx)?) {
let item = match ready!(self.as_mut().stream().poll_next(cx)?) {
Some(x) => x,
None => return Poll::Ready(None),
};
let fut = (self.as_mut().f())(item);
self.as_mut().pending().set(Some(fut));
}

let result = ready!(self.as_mut().pending().as_pin_mut().unwrap().try_poll(cx));
let result = ready!(self.as_mut().pending().as_pin_mut().unwrap().poll(cx));
self.as_mut().pending().set(None);
if let Some(x) = result? {
return Poll::Ready(Some(Ok(x)));
Expand Down
4 changes: 2 additions & 2 deletions futures-util/src/try_stream/try_flatten.rs
Expand Up @@ -96,7 +96,7 @@ where
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
loop {
if self.next.is_none() {
match ready!(self.as_mut().stream().try_poll_next(cx)?) {
match ready!(self.as_mut().stream().poll_next(cx)?) {
Some(e) => self.as_mut().next().set(Some(e)),
None => return Poll::Ready(None),
}
Expand All @@ -107,7 +107,7 @@ where
.next()
.as_pin_mut()
.unwrap()
.try_poll_next(cx)?)
.poll_next(cx)?)
{
return Poll::Ready(Some(Ok(item)));
} else {
Expand Down