Skip to content

Commit

Permalink
Move stream items into tokio-stream (#3277)
Browse files Browse the repository at this point in the history
This change removes all references to `Stream` from
within the `tokio` crate and moves them into a new
`tokio-stream` crate. Most types have had their
`impl Stream` removed as well in-favor of their
inherent methods.

Closes #2870
  • Loading branch information
LucioFranco committed Dec 16, 2020
1 parent fcce78b commit 8efa620
Show file tree
Hide file tree
Showing 101 changed files with 710 additions and 577 deletions.
1 change: 0 additions & 1 deletion CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -486,7 +486,6 @@ missing a difficulty rating, and you should feel free to add one.
- **M-process** The `tokio::process` module.
- **M-runtime** The `tokio::runtime` module.
- **M-signal** The `tokio::signal` module.
- **M-stream** The `tokio::stream` module.
- **M-sync** The `tokio::sync` module.
- **M-task** The `tokio::task` module.
- **M-time** The `tokio::time` module.
Expand Down
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ members = [
"tokio",
"tokio-macros",
"tokio-test",
"tokio-stream",
"tokio-util",

# Internal
Expand Down
12 changes: 8 additions & 4 deletions benches/sync_mpsc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,22 +30,26 @@ fn create_100_000_medium(b: &mut Bencher) {
}

fn send_medium(b: &mut Bencher) {
let rt = rt();

b.iter(|| {
let (tx, mut rx) = mpsc::channel::<Medium>(1000);

let _ = tx.try_send([0; 64]);
let _ = rt.block_on(tx.send([0; 64]));

rx.try_recv().unwrap();
rt.block_on(rx.recv()).unwrap();
});
}

fn send_large(b: &mut Bencher) {
let rt = rt();

b.iter(|| {
let (tx, mut rx) = mpsc::channel::<Large>(1000);

let _ = tx.try_send([[0; 64]; 64]);
let _ = rt.block_on(tx.send([[0; 64]; 64]));

rx.try_recv().unwrap();
rt.block_on(rx.recv()).unwrap();
});
}

Expand Down
5 changes: 4 additions & 1 deletion examples/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,12 @@ edition = "2018"
# [dependencies] instead.
[dev-dependencies]
tokio = { version = "1.0.0", path = "../tokio", features = ["full", "tracing"] }
tokio-util = { version = "0.6.0", path = "../tokio-util", features = ["full"] }
tokio-stream = { version = "0.1", path = "../tokio-stream" }

async-stream = "0.3"
tracing = "0.1"
tracing-subscriber = { version = "0.2.7", default-features = false, features = ["fmt", "ansi", "env-filter", "chrono", "tracing-log"] }
tokio-util = { version = "0.6.0", path = "../tokio-util", features = ["full"] }
bytes = "0.6"
futures = "0.3.0"
http = "0.2"
Expand Down
15 changes: 9 additions & 6 deletions examples/chat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@
#![warn(rust_2018_idioms)]

use tokio::net::{TcpListener, TcpStream};
use tokio::stream::{Stream, StreamExt};
use tokio::sync::{mpsc, Mutex};
use tokio_stream::{Stream, StreamExt};
use tokio_util::codec::{Framed, LinesCodec, LinesCodecError};

use futures::SinkExt;
Expand Down Expand Up @@ -101,9 +101,6 @@ async fn main() -> Result<(), Box<dyn Error>> {
/// Shorthand for the transmit half of the message channel.
type Tx = mpsc::UnboundedSender<String>;

/// Shorthand for the receive half of the message channel.
type Rx = mpsc::UnboundedReceiver<String>;

/// Data that is shared between all peers in the chat server.
///
/// This is the set of `Tx` handles for all connected clients. Whenever a
Expand All @@ -127,7 +124,7 @@ struct Peer {
///
/// This is used to receive messages from peers. When a message is received
/// off of this `Rx`, it will be written to the socket.
rx: Rx,
rx: Pin<Box<dyn Stream<Item = String> + Send>>,
}

impl Shared {
Expand Down Expand Up @@ -159,11 +156,17 @@ impl Peer {
let addr = lines.get_ref().peer_addr()?;

// Create a channel for this peer
let (tx, rx) = mpsc::unbounded_channel();
let (tx, mut rx) = mpsc::unbounded_channel();

// Add an entry for this `Peer` in the shared state map.
state.lock().await.peers.insert(addr, tx);

let rx = Box::pin(async_stream::stream! {
while let Some(item) = rx.recv().await {
yield item;
}
});

Ok(Peer { lines, rx })
}
}
Expand Down
2 changes: 1 addition & 1 deletion examples/print_each_packet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@
#![warn(rust_2018_idioms)]

use tokio::net::TcpListener;
use tokio::stream::StreamExt;
use tokio_stream::StreamExt;
use tokio_util::codec::{BytesCodec, Decoder};

use std::env;
Expand Down
2 changes: 1 addition & 1 deletion examples/tinydb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
#![warn(rust_2018_idioms)]

use tokio::net::TcpListener;
use tokio::stream::StreamExt;
use tokio_stream::StreamExt;
use tokio_util::codec::{Framed, LinesCodec};

use futures::SinkExt;
Expand Down
2 changes: 1 addition & 1 deletion examples/tinyhttp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use http::{header::HeaderValue, Request, Response, StatusCode};
extern crate serde_derive;
use std::{env, error::Error, fmt, io};
use tokio::net::{TcpListener, TcpStream};
use tokio::stream::StreamExt;
use tokio_stream::StreamExt;
use tokio_util::codec::{Decoder, Encoder, Framed};

#[tokio::main]
Expand Down
2 changes: 1 addition & 1 deletion examples/udp-codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@
#![warn(rust_2018_idioms)]

use tokio::net::UdpSocket;
use tokio::stream::StreamExt;
use tokio::{io, time};
use tokio_stream::StreamExt;
use tokio_util::codec::BytesCodec;
use tokio_util::udp::UdpFramed;

Expand Down
38 changes: 38 additions & 0 deletions tokio-stream/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
[package]
name = "tokio-stream"
# When releasing to crates.io:
# - Remove path dependencies
# - Update html_root_url.
# - Update doc url
# - Cargo.toml
# - Update CHANGELOG.md.
# - Create "tokio-stream-0.1.x" git tag.
version = "0.1.0"
edition = "2018"
authors = ["Tokio Contributors <team@tokio.rs>"]
license = "MIT"
repository = "https://github.com/tokio-rs/tokio"
homepage = "https://tokio.rs"
documentation = "https://docs.rs/tokio-stream/0.1.0/tokio_stream"
description = """
Utilities to work with `Stream` and `tokio`.
"""
categories = ["asynchronous"]
publish = false

[features]
default = ["time"]
time = ["tokio/time"]

[dependencies]
futures-core = { version = "0.3.0" }
pin-project-lite = "0.2.0"
tokio = { version = "1.0", path = "../tokio", features = ["sync"] }
async-stream = "0.3"

[dev-dependencies]
tokio = { version = "1.0", path = "../tokio", features = ["full"] }
tokio-test = { path = "../tokio-test" }
futures = { version = "0.3", default-features = false }

proptest = "0.10.0"
2 changes: 1 addition & 1 deletion tokio/src/stream/all.rs → tokio-stream/src/all.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::stream::Stream;
use crate::Stream;

use core::future::Future;
use core::marker::PhantomPinned;
Expand Down
2 changes: 1 addition & 1 deletion tokio/src/stream/any.rs → tokio-stream/src/any.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::stream::Stream;
use crate::Stream;

use core::future::Future;
use core::marker::PhantomPinned;
Expand Down
2 changes: 1 addition & 1 deletion tokio/src/stream/chain.rs → tokio-stream/src/chain.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::stream::{Fuse, Stream};
use crate::{Fuse, Stream};

use core::pin::Pin;
use core::task::{Context, Poll};
Expand Down
4 changes: 2 additions & 2 deletions tokio/src/stream/collect.rs → tokio-stream/src/collect.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::stream::Stream;
use crate::Stream;

use core::future::Future;
use core::marker::PhantomPinned;
Expand Down Expand Up @@ -26,7 +26,7 @@ pin_project! {
}
}

/// Convert from a [`Stream`](crate::stream::Stream).
/// Convert from a [`Stream`](crate::Stream).
///
/// This trait is not intended to be used directly. Instead, call
/// [`StreamExt::collect()`](super::StreamExt::collect).
Expand Down
4 changes: 2 additions & 2 deletions tokio/src/stream/empty.rs → tokio-stream/src/empty.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::stream::Stream;
use crate::Stream;

use core::marker::PhantomData;
use core::pin::Pin;
Expand All @@ -24,7 +24,7 @@ unsafe impl<T> Sync for Empty<T> {}
/// Basic usage:
///
/// ```
/// use tokio::stream::{self, StreamExt};
/// use tokio_stream::{self as stream, StreamExt};
///
/// #[tokio::main]
/// async fn main() {
Expand Down
2 changes: 1 addition & 1 deletion tokio/src/stream/filter.rs → tokio-stream/src/filter.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::stream::Stream;
use crate::Stream;

use core::fmt;
use core::pin::Pin;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::stream::Stream;
use crate::Stream;

use core::fmt;
use core::pin::Pin;
Expand Down
2 changes: 1 addition & 1 deletion tokio/src/stream/fold.rs → tokio-stream/src/fold.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::stream::Stream;
use crate::Stream;

use core::future::Future;
use core::marker::PhantomPinned;
Expand Down
2 changes: 1 addition & 1 deletion tokio/src/stream/fuse.rs → tokio-stream/src/fuse.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::stream::Stream;
use crate::Stream;

use pin_project_lite::pin_project;
use std::pin::Pin;
Expand Down
21 changes: 16 additions & 5 deletions tokio/src/stream/iter.rs → tokio-stream/src/iter.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::stream::Stream;
use crate::Stream;

use core::pin::Pin;
use core::task::{Context, Poll};
Expand All @@ -8,6 +8,7 @@ use core::task::{Context, Poll};
#[must_use = "streams do nothing unless polled"]
pub struct Iter<I> {
iter: I,
yield_amt: usize,
}

impl<I> Unpin for Iter<I> {}
Expand All @@ -20,7 +21,7 @@ impl<I> Unpin for Iter<I> {}
///
/// ```
/// # async fn dox() {
/// use tokio::stream::{self, StreamExt};
/// use tokio_stream::{self as stream, StreamExt};
///
/// let mut stream = stream::iter(vec![17, 19]);
///
Expand All @@ -35,6 +36,7 @@ where
{
Iter {
iter: i.into_iter(),
yield_amt: 0,
}
}

Expand All @@ -45,9 +47,18 @@ where
type Item = I::Item;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<I::Item>> {
let coop = ready!(crate::coop::poll_proceed(cx));
coop.made_progress();
Poll::Ready(self.iter.next())
// TODO: add coop back
if self.yield_amt >= 32 {
self.yield_amt = 0;

cx.waker().wake_by_ref();

Poll::Pending
} else {
self.yield_amt += 1;

Poll::Ready(self.iter.next())
}
}

fn size_hint(&self) -> (usize, Option<usize>) {
Expand Down

0 comments on commit 8efa620

Please sign in to comment.