Skip to content

Commit

Permalink
Change stream bounds of send_all
Browse files Browse the repository at this point in the history
  • Loading branch information
taiki-e authored and cramertj committed Nov 6, 2019
1 parent 9490dc7 commit d774379
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 21 deletions.
2 changes: 1 addition & 1 deletion futures-util/src/sink/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ pub trait SinkExt<Item>: Sink<Item> {
/// `stream` and send them to `self`.
fn send_all<'a, St>(&'a mut self, stream: &'a mut St) -> SendAll<'a, Self, St>
where
&'a mut St: TryStream<Ok = Item, Error = Self::Error> + Unpin,
St: TryStream<Ok = Item, Error = Self::Error> + Stream + Unpin + ?Sized,
Self: Unpin,
{
SendAll::new(self, stream)
Expand Down
35 changes: 15 additions & 20 deletions futures-util/src/sink/send_all.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use crate::stream::{Fuse, IntoStream, StreamExt, TryStreamExt};
use crate::stream::{Fuse, StreamExt, TryStreamExt};
use core::fmt;
use core::pin::Pin;
use futures_core::future::Future;
use futures_core::stream::TryStream;
use futures_core::stream::{Stream, TryStream};
use futures_core::task::{Context, Poll};
use futures_sink::Sink;

Expand All @@ -12,20 +12,18 @@ use futures_sink::Sink;
pub struct SendAll<'a, Si, St>
where
Si: ?Sized,
St: ?Sized,
&'a mut St: TryStream,
St: ?Sized + TryStream,
{
sink: &'a mut Si,
stream: Fuse<IntoStream<&'a mut St>>,
buffered: Option<<&'a mut St as TryStream>::Ok>,
stream: Fuse<&'a mut St>,
buffered: Option<St::Ok>,
}

impl<'a, Si, St> fmt::Debug for SendAll<'a, Si, St>
impl<Si, St> fmt::Debug for SendAll<'_, Si, St>
where
Si: fmt::Debug + ?Sized,
St: fmt::Debug + ?Sized,
&'a mut St: TryStream,
<&'a mut St as TryStream>::Ok: fmt::Debug,
St: fmt::Debug + ?Sized + TryStream,
St::Ok: fmt::Debug,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("SendAll")
Expand All @@ -37,28 +35,26 @@ where
}

// Pinning is never projected to any fields
impl<'a, Si, St> Unpin for SendAll<'a, Si, St>
impl<Si, St> Unpin for SendAll<'_, Si, St>
where
Si: Unpin + ?Sized,
St: ?Sized,
&'a mut St: TryStream + Unpin,
St: TryStream + Unpin + ?Sized,
{
}

impl<'a, Si, St, Ok, Error> SendAll<'a, Si, St>
where
Si: Sink<Ok, Error = Error> + Unpin + ?Sized,
St: ?Sized,
&'a mut St: TryStream<Ok = Ok, Error = Error> + Unpin,
St: TryStream<Ok = Ok, Error = Error> + Stream + Unpin + ?Sized,
{
pub(super) fn new(sink: &'a mut Si, stream: &'a mut St) -> SendAll<'a, Si, St> {
SendAll { sink, stream: stream.into_stream().fuse(), buffered: None }
SendAll { sink, stream: stream.fuse(), buffered: None }
}

fn try_start_send(
&mut self,
cx: &mut Context<'_>,
item: <&'a mut St as TryStream>::Ok,
item: St::Ok,
) -> Poll<Result<(), Si::Error>> {
debug_assert!(self.buffered.is_none());
match Pin::new(&mut self.sink).poll_ready(cx)? {
Expand All @@ -71,11 +67,10 @@ where
}
}

impl<'a, Si, St, Ok, Error> Future for SendAll<'a, Si, St>
impl<Si, St, Ok, Error> Future for SendAll<'_, Si, St>
where
Si: Sink<Ok, Error = Error> + Unpin + ?Sized,
St: ?Sized,
&'a mut St: TryStream<Ok = Ok, Error = Error> + Unpin,
St: Stream<Item = Result<Ok, Error>> + Unpin + ?Sized,
{
type Output = Result<(), Error>;

Expand Down

0 comments on commit d774379

Please sign in to comment.