Skip to content

Commit

Permalink
Merge pull request #2 from paolobarbolini/tokio-stream
Browse files Browse the repository at this point in the history
Use tokio-stream wrapper types
  • Loading branch information
aknuds1 committed Jan 5, 2021
2 parents 2f2206c + a9f2885 commit 7ff37e3
Show file tree
Hide file tree
Showing 7 changed files with 24 additions and 42 deletions.
3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ all-features = true

[dependencies]
async-compression = { version = "0.3.7", features = ["brotli", "deflate", "gzip", "tokio"], optional = true }
async-stream = "0.3"
bytes = "1.0"
futures = { version = "0.3", default-features = false, features = ["alloc"] }
headers = "0.3"
Expand All @@ -33,6 +32,7 @@ serde = "1.0"
serde_json = "1.0"
serde_urlencoded = "0.7"
tokio = { version = "1.0", features = ["fs", "sync", "time"] }
tokio-stream = "0.1.1"
tokio-util = { version = "0.6", features = ["io"] }
tracing = { version = "0.1", default-features = false, features = ["log", "std"] }
tracing-futures = { version = "0.2", default-features = false, features = ["std-future"] }
Expand All @@ -50,6 +50,7 @@ tracing-log = "0.1"
serde_derive = "1.0"
handlebars = "3.0.0"
tokio = { version = "1.0", features = ["macros", "rt-multi-thread"] }
tokio-stream = { version = "0.1.1", features = ["net"] }
listenfd = "0.3"

[features]
Expand Down
10 changes: 3 additions & 7 deletions examples/sse.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use async_stream::stream;
use futures::StreamExt;
use std::convert::Infallible;
use std::time::Duration;
use tokio::time::interval;
use tokio_stream::wrappers::IntervalStream;
use warp::{sse::ServerSentEvent, Filter};

// create server-sent event
Expand All @@ -17,12 +17,8 @@ async fn main() {
let routes = warp::path("ticks").and(warp::get()).map(|| {
let mut counter: u64 = 0;
// create server event source
let mut interval = interval(Duration::from_secs(1));
let stream = stream! {
while let item = interval.tick().await {
yield item;
}
};
let interval = interval(Duration::from_secs(1));
let stream = IntervalStream::new(interval);
let event_stream = stream.map(move |_| {
counter += 1;
sse_counter(counter)
Expand Down
5 changes: 3 additions & 2 deletions examples/sse_chat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use std::sync::{
Arc, Mutex,
};
use tokio::sync::mpsc;
use warp::test;
use tokio_stream::wrappers::UnboundedReceiverStream;
use warp::{sse::ServerSentEvent, Filter};

#[tokio::main]
Expand Down Expand Up @@ -86,7 +86,8 @@ fn user_connected(

// Use an unbounded channel to handle buffering and flushing of messages
// to the event source...
let (tx, rx) = test::unbounded_channel_stream();
let (tx, rx) = mpsc::unbounded_channel();
let rx = UnboundedReceiverStream::new(rx);

tx.send(Message::UserId(my_id))
// rx is right above, so this cannot fail
Expand Down
8 changes: 2 additions & 6 deletions examples/unix_socket.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,14 @@
#![deny(warnings)]

use async_stream::stream;
use tokio::net::UnixListener;
use tokio_stream::wrappers::UnixListenerStream;

#[tokio::main]
async fn main() {
pretty_env_logger::init();

let listener = UnixListener::bind("/tmp/warp.sock").unwrap();
let incoming = stream! {
while let item = listener.accept().await {
yield item.map(|item| item.0);
}
};
let incoming = UnixListenerStream::new(listener);
warp::serve(warp::fs::dir("examples/dir"))
.run_incoming(incoming)
.await;
Expand Down
5 changes: 3 additions & 2 deletions examples/websockets_chat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use std::sync::{

use futures::{FutureExt, StreamExt};
use tokio::sync::{mpsc, RwLock};
use warp::test;
use tokio_stream::wrappers::UnboundedReceiverStream;
use warp::ws::{Message, WebSocket};
use warp::Filter;

Expand Down Expand Up @@ -59,7 +59,8 @@ async fn user_connected(ws: WebSocket, users: Users) {

// Use an unbounded channel to handle buffering and flushing of messages
// to the websocket...
let (tx, rx) = test::unbounded_channel_stream();
let (tx, rx) = mpsc::unbounded_channel();
let rx = UnboundedReceiverStream::new(rx);
tokio::task::spawn(rx.forward(user_ws_tx).map(|result| {
if let Err(e) = result {
eprintln!("websocket send error: {}", e);
Expand Down
10 changes: 3 additions & 7 deletions src/filters/sse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -530,8 +530,8 @@ where
/// use std::convert::Infallible;
/// use futures::StreamExt;
/// use tokio::time::interval;
/// use tokio_stream::wrappers::IntervalStream;
/// use warp::{Filter, Stream, sse::ServerSentEvent};
/// use async_stream::stream;
///
/// // create server-sent event
/// fn sse_counter(counter: u64) -> Result<impl ServerSentEvent, Infallible> {
Expand All @@ -543,12 +543,8 @@ where
/// .and(warp::get())
/// .map(|| {
/// let mut counter: u64 = 0;
/// let mut interval = interval(Duration::from_secs(15));
/// let stream = stream! {
/// while let item = interval.tick().await {
/// yield item;
/// }
/// };
/// let interval = interval(Duration::from_secs(15));
/// let stream = IntervalStream::new(interval);
/// let event_stream = stream.map(move |_| {
/// counter += 1;
/// sse_counter(counter)
Expand Down
25 changes: 8 additions & 17 deletions src/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,18 +90,20 @@ use std::pin::Pin;
#[cfg(feature = "websocket")]
use std::task::{self, Poll};

use async_stream::stream;
use bytes::Bytes;
use futures::{future, FutureExt, StreamExt, TryFutureExt};
#[cfg(feature = "websocket")]
use futures::StreamExt;
use futures::{future, FutureExt, TryFutureExt};
use http::{
header::{HeaderName, HeaderValue},
Response,
};
use serde::Serialize;
use serde_json;
use tokio::sync::mpsc::{self, UnboundedSender};
#[cfg(feature = "websocket")]
use tokio::sync::oneshot;
use tokio::sync::{mpsc, oneshot};
#[cfg(feature = "websocket")]
use tokio_stream::wrappers::UnboundedReceiverStream;

use crate::filter::Filter;
use crate::reject::IsReject;
Expand Down Expand Up @@ -405,18 +407,6 @@ impl RequestBuilder {
}
}

/// Get an unbounded channel with a stream for the reader.
pub fn unbounded_channel_stream<T: Unpin>() -> (UnboundedSender<T>, impl StreamExt<Item = T>) {
let (tx, mut rx) = mpsc::unbounded_channel();
let stream = stream! {
while let Some(item) = rx.recv().await {
yield item;
}
};

(tx, stream)
}

#[cfg(feature = "websocket")]
impl WsBuilder {
/// Sets the request path of this builder.
Expand Down Expand Up @@ -494,7 +484,8 @@ impl WsBuilder {
F::Error: IsReject + Send,
{
let (upgraded_tx, upgraded_rx) = oneshot::channel();
let (wr_tx, wr_rx) = unbounded_channel_stream();
let (wr_tx, wr_rx) = mpsc::unbounded_channel();
let wr_rx = UnboundedReceiverStream::new(wr_rx);
let (rd_tx, rd_rx) = mpsc::unbounded_channel();

tokio::spawn(async move {
Expand Down

0 comments on commit 7ff37e3

Please sign in to comment.