Skip to content

Commit

Permalink
Use TryStream in Sink::send_all
Browse files Browse the repository at this point in the history
  • Loading branch information
cramertj committed Nov 5, 2019
1 parent 0c165a3 commit 812ce0a
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 25 deletions.
4 changes: 2 additions & 2 deletions futures-util/src/sink/mod.rs
Expand Up @@ -7,7 +7,7 @@
//! library is activated, and it is activated by default.

use futures_core::future::Future;
use futures_core::stream::Stream;
use futures_core::stream::{Stream, TryStream};
use crate::future::Either;

#[cfg(feature = "compat")]
Expand Down Expand Up @@ -223,7 +223,7 @@ pub trait SinkExt<Item>: Sink<Item> {
&'a mut self,
stream: &'a mut St
) -> SendAll<'a, Self, St>
where St: Stream<Item = Item> + Unpin,
where &'a mut St: TryStream<Ok = Item, Error = Self::Error> + Unpin,
Self: Unpin,
{
SendAll::new(self, stream)
Expand Down
56 changes: 38 additions & 18 deletions futures-util/src/sink/send_all.rs
@@ -1,51 +1,70 @@
use crate::stream::{StreamExt, Fuse};
use crate::stream::{StreamExt, TryStreamExt, Fuse, IntoStream};
use core::fmt;
use core::pin::Pin;
use futures_core::future::Future;
use futures_core::stream::Stream;
use futures_core::stream::TryStream;
use futures_core::task::{Context, Poll};
use futures_sink::Sink;

/// Future for the [`send_all`](super::SinkExt::send_all) method.
#[allow(explicit_outlives_requirements)] // https://github.com/rust-lang/rust/issues/60993
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct SendAll<'a, Si, St>
where
Si: ?Sized,
St: Stream + ?Sized,
St: ?Sized,
&'a mut St: TryStream,
{
sink: &'a mut Si,
stream: Fuse<&'a mut St>,
buffered: Option<St::Item>,
stream: Fuse<IntoStream<&'a mut St>>,
buffered: Option<<&'a mut St as TryStream>::Ok>,
}

impl<'a, Si, St> fmt::Debug for SendAll<'a, Si, St>
where
Si: fmt::Debug + ?Sized,
St: fmt::Debug + ?Sized,
&'a mut St: TryStream,
<&'a mut St as TryStream>::Ok: fmt::Debug,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("SendAll")
.field("sink", &self.sink)
.field("stream", &self.stream)
.field("buffered", &self.buffered)
.finish()
}
}

// Pinning is never projected to any fields
impl<Si, St> Unpin for SendAll<'_, Si, St>
impl<'a, Si, St> Unpin for SendAll<'a, Si, St>
where
Si: Unpin + ?Sized,
St: Stream + Unpin + ?Sized,
St: ?Sized,
&'a mut St: TryStream + Unpin,
{}

impl<'a, Si, St> SendAll<'a, Si, St>
impl<'a, Si, St, Ok, Error> SendAll<'a, Si, St>
where
Si: Sink<St::Item> + Unpin + ?Sized,
St: Stream + Unpin + ?Sized,
Si: Sink<Ok, Error = Error> + Unpin + ?Sized,
St: ?Sized,
&'a mut St: TryStream<Ok = Ok, Error = Error> + Unpin,
{
pub(super) fn new(
sink: &'a mut Si,
stream: &'a mut St,
) -> SendAll<'a, Si, St> {
SendAll {
sink,
stream: stream.fuse(),
stream: stream.into_stream().fuse(),
buffered: None,
}
}

fn try_start_send(
&mut self,
cx: &mut Context<'_>,
item: St::Item,
item: <&'a mut St as TryStream>::Ok,
) -> Poll<Result<(), Si::Error>> {
debug_assert!(self.buffered.is_none());
match Pin::new(&mut self.sink).poll_ready(cx)? {
Expand All @@ -60,12 +79,13 @@ where
}
}

impl<Si, St> Future for SendAll<'_, Si, St>
impl<'a, Si, St, Ok, Error> Future for SendAll<'a, Si, St>
where
Si: Sink<St::Item> + Unpin + ?Sized,
St: Stream + Unpin + ?Sized,
Si: Sink<Ok, Error = Error> + Unpin + ?Sized,
St: ?Sized,
&'a mut St: TryStream<Ok = Ok, Error = Error> + Unpin,
{
type Output = Result<(), Si::Error>;
type Output = Result<(), Error>;

fn poll(
mut self: Pin<&mut Self>,
Expand All @@ -79,7 +99,7 @@ where
}

loop {
match this.stream.poll_next_unpin(cx) {
match this.stream.try_poll_next_unpin(cx)? {
Poll::Ready(Some(item)) => {
ready!(this.try_start_send(cx, item))?
}
Expand Down
8 changes: 4 additions & 4 deletions futures/tests/sink.rs
Expand Up @@ -86,13 +86,13 @@ fn send() {
fn send_all() {
let mut v = Vec::new();

block_on(v.send_all(&mut stream::iter(vec![0, 1]))).unwrap();
block_on(v.send_all(&mut stream::iter(vec![0, 1]).map(Ok))).unwrap();
assert_eq!(v, vec![0, 1]);

block_on(v.send_all(&mut stream::iter(vec![2, 3]))).unwrap();
block_on(v.send_all(&mut stream::iter(vec![2, 3]).map(Ok))).unwrap();
assert_eq!(v, vec![0, 1, 2, 3]);

block_on(v.send_all(&mut stream::iter(vec![4, 5]))).unwrap();
block_on(v.send_all(&mut stream::iter(vec![4, 5]).map(Ok))).unwrap();
assert_eq!(v, vec![0, 1, 2, 3, 4, 5]);
}

Expand Down Expand Up @@ -434,7 +434,7 @@ fn fanout_smoke() {
let sink1 = Vec::new();
let sink2 = Vec::new();
let mut sink = sink1.fanout(sink2);
block_on(sink.send_all(&mut stream::iter(vec![1, 2, 3]))).unwrap();
block_on(sink.send_all(&mut stream::iter(vec![1, 2, 3]).map(Ok))).unwrap();
let (sink1, sink2) = sink.into_inner();
assert_eq!(sink1, vec![1, 2, 3]);
assert_eq!(sink2, vec![1, 2, 3]);
Expand Down
3 changes: 2 additions & 1 deletion futures/tests/split.rs
Expand Up @@ -69,7 +69,8 @@ fn test_split() {

let (sink, stream) = join.split();
let join = sink.reunite(stream).expect("test_split: reunite error");
let (mut sink, mut stream) = join.split();
let (mut sink, stream) = join.split();
let mut stream = stream.map(Ok);
block_on(sink.send_all(&mut stream)).unwrap();
}
assert_eq!(dest, vec![10, 20, 30]);
Expand Down

0 comments on commit 812ce0a

Please sign in to comment.