Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Upgrade to Tokio 1.0 #753

Merged
merged 36 commits into from Jan 15, 2021
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
e3c4c70
Upgrade to Tokio 0.3
janpetschexain Nov 20, 2020
fc71e96
Fix uds test
aknuds1 Nov 29, 2020
610f3dc
Merge remote-tracking branch 'seanmonstar/master' into chore/upgrade-…
aknuds1 Dec 6, 2020
d4c9344
Merge branch 'master' into chore/upgrade-tokio
jxs Dec 9, 2020
6308e90
Switch to tokio-tungstenite v0.12
aknuds1 Dec 10, 2020
2de7e08
Upgrade to tokio 1.0
aknuds1 Dec 24, 2020
305c888
Depend on forked tokio-tungstenite
aknuds1 Dec 24, 2020
b476d81
Upgrade tokio-tungstenite
aknuds1 Dec 24, 2020
fa8a9e8
Fix build failures
aknuds1 Dec 24, 2020
5d068ae
Update src/filters/body.rs
aknuds1 Dec 24, 2020
f242193
Fix build failures
aknuds1 Dec 24, 2020
3f038da
Update src/test.rs
aknuds1 Dec 24, 2020
2dd9bcf
Switch to async-compression 0.3.7
aknuds1 Dec 24, 2020
7bfc760
Fix build failures
aknuds1 Dec 24, 2020
9722b25
Fix build failures
aknuds1 Dec 24, 2020
7f97567
Upgrade to tokio 1.0
aknuds1 Dec 25, 2020
127fff2
Fix Unix socket example
aknuds1 Dec 25, 2020
79df1c5
Fix test
aknuds1 Dec 25, 2020
2df48ca
Fix websocket upgrading
aknuds1 Dec 25, 2020
f6508fa
Use poll_read_buf from tokio-util
aknuds1 Dec 26, 2020
3c1d0ad
Move dependency in Cargo.toml
aknuds1 Dec 28, 2020
2f2206c
Fix tokio-tungstenite dependency
aknuds1 Dec 31, 2020
a9f2885
Use tokio-stream wrapper types
paolobarbolini Jan 5, 2021
7ff37e3
Merge pull request #2 from paolobarbolini/tokio-stream
aknuds1 Jan 5, 2021
f98e363
Add route at `/` in routing example (#771)
hamza1311 Jan 2, 2021
6326d05
Fix typo in routing example (#772)
teenjuna Jan 3, 2021
b9ef973
Merge remote-tracking branch 'seanmonstar/master' into chore/upgrade-…
aknuds1 Jan 7, 2021
33ede73
Upgrade to latest tokio-tungstenite
aknuds1 Jan 9, 2021
e50c375
Use ReaderStream instead of FramedRead
aknuds1 Jan 10, 2021
d8bfb02
Fix style
aknuds1 Jan 10, 2021
e989869
Merge remote-tracking branch 'seanmonstar/master' into chore/upgrade-…
aknuds1 Jan 12, 2021
4d92fa8
CI: Remove net job
aknuds1 Jan 12, 2021
657f1c6
Remove client feature for non-dev hyper dependency
aknuds1 Jan 12, 2021
bb2f511
Simplify websocket filter
aknuds1 Jan 12, 2021
5ac7a62
Reintroduce dependency on hyper's client feature
aknuds1 Jan 12, 2021
b095a6d
Merge remote-tracking branch 'seanmonstar/master' into chore/upgrade-…
aknuds1 Jan 15, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/ci.yml
Expand Up @@ -45,8 +45,8 @@ jobs:
benches: true
- build: tls
features: "--features tls"
- build: uds
features: "--features tokio/uds"
- build: net
seanmonstar marked this conversation as resolved.
Show resolved Hide resolved
features: "--features tokio/net"
- build: no-default-features
features: "--no-default-features"
- build: compression
Expand Down
15 changes: 7 additions & 8 deletions Cargo.toml
Expand Up @@ -17,12 +17,12 @@ edition = "2018"
all-features = true

[dependencies]
async-compression = { version = "0.3.1", features = ["brotli", "deflate", "gzip", "stream"], optional = true }
bytes = "0.5"
async-compression = { git = "https://github.com/aknuds1/async-compression", rev = "e4d903b8ff9972f056f714c61bd9d0f3321a4463", features = ["brotli", "deflate", "gzip", "stream"], optional = true }
aknuds1 marked this conversation as resolved.
Show resolved Hide resolved
bytes = "0.6"
futures = { version = "0.3", default-features = false, features = ["alloc"] }
headers = "0.3"
http = "0.2"
hyper = { version = "0.13", features = ["stream"] }
hyper = { git = "https://github.com/hyperium/hyper.git", rev = "1ba2a141a6f8736446ff4a0111df347c0dc66f6c", features = ["stream", "server", "http1", "http2", "tcp", "client"] }
log = "0.4"
mime = "0.3"
mime_guess = "2.0.0"
Expand All @@ -31,23 +31,23 @@ scoped-tls = "1.0"
serde = "1.0"
serde_json = "1.0"
serde_urlencoded = "0.7"
tokio = { version = "0.2", features = ["fs", "stream", "sync", "time"] }
tokio = { version = "0.3", features = ["fs", "stream", "sync", "time"] }
tracing = { version = "0.1", default-features = false, features = ["log", "std"] }
tracing-futures = { version = "0.2", default-features = false, features = ["std-future"] }
tower-service = "0.3"
# tls is enabled by default, we don't want that yet
tokio-tungstenite = { version = "0.11", default-features = false, optional = true }
tokio-tungstenite = { git = "https://github.com/snapview/tokio-tungstenite.git", rev = "71a5b72123db32b318d48964948fc76c943f1548", default-features = false, optional = true }
aknuds1 marked this conversation as resolved.
Show resolved Hide resolved
percent-encoding = "2.1"
pin-project = "1.0"
tokio-rustls = { version = "0.14", optional = true }
tokio-rustls = { version = "0.21", optional = true }

[dev-dependencies]
pretty_env_logger = "0.4"
tracing-subscriber = "0.2.7"
tracing-log = "0.1"
serde_derive = "1.0"
handlebars = "3.0.0"
tokio = { version = "0.2", features = ["macros"] }
tokio = { version = "0.3", features = ["macros", "rt-multi-thread"] }
listenfd = "0.3"

[features]
Expand Down Expand Up @@ -78,7 +78,6 @@ required-features = ["compression"]

[[example]]
name = "unix_socket"
required-features = ["tokio/uds"]

[[example]]
name = "websockets"
Expand Down
2 changes: 1 addition & 1 deletion examples/futures.rs
Expand Up @@ -16,7 +16,7 @@ async fn main() {
}

async fn sleepy(Seconds(seconds): Seconds) -> Result<impl warp::Reply, Infallible> {
tokio::time::delay_for(Duration::from_secs(seconds)).await;
tokio::time::sleep(Duration::from_secs(seconds)).await;
Ok(format!("I waited {} seconds!", seconds))
}

Expand Down
5 changes: 2 additions & 3 deletions examples/unix_socket.rs
Expand Up @@ -6,9 +6,8 @@ use tokio::net::UnixListener;
async fn main() {
pretty_env_logger::init();

let mut listener = UnixListener::bind("/tmp/warp.sock").unwrap();
let incoming = listener.incoming();
let listener = UnixListener::bind("/tmp/warp.sock").unwrap();
warp::serve(warp::fs::dir("examples/dir"))
.run_incoming(incoming)
.run_incoming(listener)
.await;
}
2 changes: 1 addition & 1 deletion src/filters/body.rs
Expand Up @@ -7,7 +7,7 @@ use std::fmt;
use std::pin::Pin;
use std::task::{Context, Poll};

use bytes::{buf::BufExt, Buf, Bytes};
use bytes::{Buf, Bytes};
use futures::{future, ready, Stream, TryFutureExt};
use headers::ContentLength;
use http::header::CONTENT_TYPE;
Expand Down
39 changes: 35 additions & 4 deletions src/filters/fs.rs
Expand Up @@ -5,12 +5,13 @@ use std::convert::Infallible;
use std::fs::Metadata;
use std::future::Future;
use std::io;
use std::mem::MaybeUninit;
use std::path::{Path, PathBuf};
use std::pin::Pin;
use std::sync::Arc;
use std::task::Poll;
use std::task::{Context, Poll};

use bytes::{Bytes, BytesMut};
use bytes::{BufMut, Bytes, BytesMut};
use futures::future::Either;
use futures::{future, ready, stream, FutureExt, Stream, StreamExt, TryFutureExt};
use headers::{
Expand All @@ -22,7 +23,7 @@ use hyper::Body;
use mime_guess;
use percent_encoding::percent_decode_str;
use tokio::fs::File as TkFile;
use tokio::io::AsyncRead;
use tokio::io::{AsyncRead, AsyncSeekExt, ReadBuf};

use crate::filter::{Filter, FilterClone, One};
use crate::reject::{self, Rejection};
Expand Down Expand Up @@ -419,7 +420,7 @@ fn file_stream(
}
reserve_at_least(&mut buf, buf_size);

let n = match ready!(Pin::new(&mut f).poll_read_buf(cx, &mut buf)) {
let n = match ready!(poll_read_buf(Pin::new(&mut f), cx, &mut buf)) {
Ok(n) => n as u64,
Err(err) => {
tracing::debug!("file read error: {}", err);
Expand Down Expand Up @@ -524,3 +525,33 @@ mod tests {
assert_eq!(buf.capacity(), cap);
}
}

fn poll_read_buf<T: AsyncRead, B: BufMut>(
io: Pin<&mut T>,
cx: &mut Context<'_>,
buf: &mut B,
) -> Poll<io::Result<usize>> {
if !buf.has_remaining_mut() {
return Poll::Ready(Ok(0));
}

let n = {
let dst = buf.bytes_mut();
let dst = unsafe { &mut *(dst as *mut _ as *mut [MaybeUninit<u8>]) };
jxs marked this conversation as resolved.
Show resolved Hide resolved
let mut buf = ReadBuf::uninit(dst);
let ptr = buf.filled().as_ptr();
ready!(io.poll_read(cx, &mut buf)?);

// Ensure the pointer does not change from under us
assert_eq!(ptr, buf.filled().as_ptr());
buf.filled().len()
};

// Safety: This is guaranteed to be the number of initialized (and read)
// bytes due to the invariants provided by `ReadBuf::filled`.
unsafe {
buf.advance_mut(n);
}

Poll::Ready(Ok(n))
}
8 changes: 4 additions & 4 deletions src/filters/sse.rs
Expand Up @@ -54,7 +54,7 @@ use hyper::Body;
use pin_project::pin_project;
use serde::Serialize;
use serde_json;
use tokio::time::{self, Delay};
use tokio::time::{self, Sleep};

use self::sealed::{
BoxedServerSentEvent, EitherServerSentEvent, SseError, SseField, SseFormat, SseWrapper,
Expand Down Expand Up @@ -467,7 +467,7 @@ impl KeepAlive {
S::Ok: ServerSentEvent + Send,
S::Error: StdError + Send + Sync + 'static,
{
let alive_timer = time::delay_for(self.max_interval);
let alive_timer = time::sleep(self.max_interval);
SseKeepAlive {
event_stream,
comment_text: self.comment_text,
Expand All @@ -484,7 +484,7 @@ struct SseKeepAlive<S> {
event_stream: S,
comment_text: Cow<'static, str>,
max_interval: Duration,
alive_timer: Delay,
alive_timer: Sleep,
}

#[doc(hidden)]
Expand All @@ -505,7 +505,7 @@ where
let max_interval = keep_interval
.into()
.unwrap_or_else(|| Duration::from_secs(15));
let alive_timer = time::delay_for(max_interval);
let alive_timer = time::sleep(max_interval);
SseKeepAlive {
event_stream,
comment_text: Cow::Borrowed(""),
Expand Down
5 changes: 1 addition & 4 deletions src/filters/ws.rs
Expand Up @@ -134,10 +134,7 @@ where
fn into_response(self) -> Response {
let on_upgrade = self.on_upgrade;
let config = self.ws.config;
let fut = self
.ws
.body
.on_upgrade()
let fut = hyper::upgrade::on(Response::new(self.ws.body))
aknuds1 marked this conversation as resolved.
Show resolved Hide resolved
.and_then(move |upgraded| {
tracing::trace!("websocket upgrade complete");
WebSocket::from_raw_socket(upgraded, protocol::Role::Server, config).map(Ok)
Expand Down
2 changes: 1 addition & 1 deletion src/test.rs
Expand Up @@ -515,7 +515,7 @@ impl WsBuilder {
let upgrade = ::hyper::Client::builder()
.build(AddrConnect(addr))
.request(req)
.and_then(|res| res.into_body().on_upgrade());
.and_then(|res| hyper::upgrade::on(res));

let upgraded = match upgrade.await {
Ok(up) => {
Expand Down
6 changes: 3 additions & 3 deletions src/tls.rs
Expand Up @@ -6,7 +6,7 @@ use std::path::{Path, PathBuf};
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use tokio::io::{AsyncRead, AsyncWrite};
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};

use futures::ready;
use hyper::server::accept::Accept;
Expand Down Expand Up @@ -295,8 +295,8 @@ impl AsyncRead for TlsStream {
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context,
buf: &mut [u8],
) -> Poll<io::Result<usize>> {
buf: &mut ReadBuf,
) -> Poll<io::Result<()>> {
let pin = self.get_mut();
match pin.state {
State::Handshaking(ref mut accept) => match ready!(Pin::new(accept).poll(cx)) {
Expand Down
6 changes: 3 additions & 3 deletions src/transport.rs
Expand Up @@ -4,7 +4,7 @@ use std::pin::Pin;
use std::task::{Context, Poll};

use hyper::server::conn::AddrStream;
use tokio::io::{AsyncRead, AsyncWrite};
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};

pub trait Transport: AsyncRead + AsyncWrite {
fn remote_addr(&self) -> Option<SocketAddr>;
Expand All @@ -22,8 +22,8 @@ impl<T: AsyncRead + Unpin> AsyncRead for LiftIo<T> {
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut [u8],
) -> Poll<io::Result<usize>> {
buf: &mut ReadBuf<'_>,
) -> Poll<io::Result<()>> {
Pin::new(&mut self.get_mut().0).poll_read(cx, buf)
}
}
Expand Down