Skip to content

Commit

Permalink
Update to tokio 1.0, bytes 1.0 (seanmonstar#1076)
Browse files Browse the repository at this point in the history
Co-authored-by: Wim Looman <git@nemo157.com>
Co-authored-by: Paolo Barbolini <paolo@paolo565.org>
  • Loading branch information
3 people authored and pfernie committed Jan 5, 2021
1 parent 4141c85 commit 9167080
Show file tree
Hide file tree
Showing 16 changed files with 173 additions and 219 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Expand Up @@ -191,7 +191,7 @@ jobs:

strategy:
matrix:
rust: [1.39.0]
rust: [1.45.2]

steps:
- name: Checkout
Expand Down
44 changes: 22 additions & 22 deletions Cargo.toml
Expand Up @@ -28,7 +28,7 @@ default = ["default-tls"]

# Note: this doesn't enable the 'native-tls' feature, which adds specific
# functionality for it.
default-tls = ["hyper-tls", "native-tls-crate", "__tls", "tokio-tls"]
default-tls = ["hyper-tls", "native-tls-crate", "__tls", "tokio-native-tls"]

# Enables native-tls specific functionality not available by default.
native-tls = ["default-tls"]
Expand All @@ -39,13 +39,13 @@ rustls-tls-manual-roots = ["__rustls"]
rustls-tls-webpki-roots = ["webpki-roots", "__rustls"]
rustls-tls-native-roots = ["rustls-native-certs", "__rustls"]

blocking = ["futures-util/io", "tokio/rt-threaded", "tokio/rt-core", "tokio/sync"]
blocking = ["futures-util/io", "tokio/rt-multi-thread", "tokio/sync"]

cookies = ["cookie_crate", "cookie_store", "time"]

gzip = ["async-compression", "async-compression/gzip"]
gzip = ["async-compression", "async-compression/gzip", "tokio-util"]

brotli = ["async-compression", "async-compression/brotli"]
brotli = ["async-compression", "async-compression/brotli", "tokio-util"]

json = ["serde_json"]

Expand All @@ -71,7 +71,7 @@ __internal_proxy_sys_no_cache = []
[dependencies]
http = "0.2"
url = "2.2"
bytes = "0.5"
bytes = "1.0"
serde = "1.0"
serde_urlencoded = "0.7"
mime_guess = "2.0"
Expand All @@ -83,53 +83,53 @@ base64 = "0.13"
encoding_rs = "0.8"
futures-core = { version = "0.3.0", default-features = false }
futures-util = { version = "0.3.0", default-features = false }
http-body = "0.3.0"
hyper = { version = "0.13.4", default-features = false, features = ["tcp"] }
http-body = "0.4.0"
hyper = { version = "0.14", default-features = false, features = ["tcp", "http1", "http2", "client"] }
lazy_static = "1.4"
log = "0.4"
mime = "0.3.7"
percent-encoding = "2.1"
tokio = { version = "0.2.5", default-features = false, features = ["tcp", "time"] }
tokio = { version = "1.0", default-features = false, features = ["net", "time"] }
pin-project-lite = "0.2.0"
ipnet = "2.3"

# Optional deps...

## default-tls
hyper-tls = { version = "0.4", optional = true }
hyper-tls = { version = "0.5", optional = true }
native-tls-crate = { version = "0.2", optional = true, package = "native-tls" }
tokio-tls = { version = "0.3.0", optional = true }
tokio-native-tls = { version = "0.3.0", optional = true }

# rustls-tls
hyper-rustls = { version = "0.21", default-features = false, optional = true }
rustls = { version = "0.18", features = ["dangerous_configuration"], optional = true }
tokio-rustls = { version = "0.14", optional = true }
webpki-roots = { version = "0.20", optional = true }
rustls-native-certs = { version = "0.4", optional = true }
hyper-rustls = { version = "0.22.1", default-features = false, optional = true }
rustls = { version = "0.19", features = ["dangerous_configuration"], optional = true }
tokio-rustls = { version = "0.22", optional = true }
webpki-roots = { version = "0.21", optional = true }
rustls-native-certs = { version = "0.5", optional = true }

## cookies
cookie_crate = { version = "0.14", package = "cookie", optional = true }
cookie_store = { version = "0.12", optional = true }
time = { version = "0.2.11", optional = true }

## compression
async-compression = { version = "0.3.0", default-features = false, features = ["stream"], optional = true }

async-compression = { version = "0.3.7", default-features = false, features = ["tokio"], optional = true }
tokio-util = { version = "0.6.0", default-features = false, features = ["codec", "io"], optional = true }

## socks
tokio-socks = { version = "0.3", optional = true }
tokio-socks = { version = "0.5", optional = true }

## trust-dns
trust-dns-resolver = { version = "0.19", optional = true }
trust-dns-resolver = { version = "0.20", optional = true }

[target.'cfg(not(target_arch = "wasm32"))'.dev-dependencies]
env_logger = "0.7"
hyper = { version = "0.13", default-features = false, features = ["tcp", "stream"] }
env_logger = "0.8"
hyper = { version = "0.14", default-features = false, features = ["tcp", "stream", "http1", "http2", "client", "server"] }
serde = { version = "1.0", features = ["derive"] }
libflate = "1.0"
brotli_crate = { package = "brotli", version = "3.3.0" }
doc-comment = "0.3"
tokio = { version = "0.2.0", default-features = false, features = ["macros"] }
tokio = { version = "1.0", default-features = false, features = ["macros", "rt-multi-thread"] }

[target.'cfg(windows)'.dependencies]
winreg = "0.7"
Expand Down
8 changes: 4 additions & 4 deletions src/async_impl/body.rs
Expand Up @@ -7,7 +7,7 @@ use bytes::Bytes;
use futures_core::Stream;
use http_body::Body as HttpBody;
use pin_project_lite::pin_project;
use tokio::time::Delay;
use tokio::time::Sleep;

/// An asynchronous request body.
pub struct Body {
Expand All @@ -27,7 +27,7 @@ enum Inner {
+ Sync,
>,
>,
timeout: Option<Delay>,
timeout: Option<Pin<Box<Sleep>>>,
},
}

Expand Down Expand Up @@ -103,7 +103,7 @@ impl Body {
}
}

pub(crate) fn response(body: hyper::Body, timeout: Option<Delay>) -> Body {
pub(crate) fn response(body: hyper::Body, timeout: Option<Pin<Box<Sleep>>>) -> Body {
Body {
inner: Inner::Streaming {
body: Box::pin(WrapHyper(body)),
Expand Down Expand Up @@ -217,7 +217,7 @@ impl HttpBody for ImplStream {
ref mut timeout,
} => {
if let Some(ref mut timeout) = timeout {
if let Poll::Ready(()) = Pin::new(timeout).poll(cx) {
if let Poll::Ready(()) = timeout.as_mut().poll(cx) {
return Poll::Ready(Some(Err(crate::error::body(crate::error::TimedOut))));
}
}
Expand Down
23 changes: 5 additions & 18 deletions src/async_impl/client.rs
Expand Up @@ -24,7 +24,7 @@ use rustls::RootCertStore;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::time::Delay;
use tokio::time::Sleep;
use pin_project_lite::pin_project;

use log::debug;
Expand Down Expand Up @@ -94,7 +94,6 @@ struct Config {
#[cfg(feature = "__tls")]
tls: TlsBackend,
http2_only: bool,
http1_writev: Option<bool>,
http1_title_case_headers: bool,
http2_initial_stream_window_size: Option<u32>,
http2_initial_connection_window_size: Option<u32>,
Expand Down Expand Up @@ -149,7 +148,6 @@ impl ClientBuilder {
#[cfg(feature = "__tls")]
tls: TlsBackend::default(),
http2_only: false,
http1_writev: None,
http1_title_case_headers: false,
http2_initial_stream_window_size: None,
http2_initial_connection_window_size: None,
Expand Down Expand Up @@ -314,10 +312,6 @@ impl ClientBuilder {
builder.http2_only(true);
}

if let Some(http1_writev) = config.http1_writev {
builder.http1_writev(http1_writev);
}

if let Some(http2_initial_stream_window_size) = config.http2_initial_stream_window_size {
builder.http2_initial_stream_window_size(http2_initial_stream_window_size);
}
Expand Down Expand Up @@ -652,14 +646,6 @@ impl ClientBuilder {
self
}

/// Force hyper to use either queued(if true), or flattened(if false) write strategy
/// This may eliminate unnecessary cloning of buffers for some TLS backends
/// By default hyper will try to guess which strategy to use
pub fn http1_writev(mut self, writev: bool) -> ClientBuilder {
self.config.http1_writev = Some(writev);
self
}

/// Only use HTTP/2.
pub fn http2_prior_knowledge(mut self) -> ClientBuilder {
self.config.http2_only = true;
Expand Down Expand Up @@ -1099,7 +1085,8 @@ impl Client {

let timeout = timeout
.or(self.inner.request_timeout)
.map(tokio::time::delay_for);
.map(tokio::time::sleep)
.map(Box::pin);

*req.headers_mut() = headers.clone();

Expand Down Expand Up @@ -1313,7 +1300,7 @@ pin_project! {
#[pin]
in_flight: ResponseFuture,
#[pin]
timeout: Option<Delay>,
timeout: Option<Pin<Box<Sleep>>>,
}
}

Expand All @@ -1322,7 +1309,7 @@ impl PendingRequest {
self.project().in_flight
}

fn timeout(self: Pin<&mut Self>) -> Pin<&mut Option<Delay>> {
fn timeout(self: Pin<&mut Self>) -> Pin<&mut Option<Pin<Box<Sleep>>>> {
self.project().timeout
}

Expand Down
21 changes: 13 additions & 8 deletions src/async_impl/decoder.rs
Expand Up @@ -4,17 +4,22 @@ use std::pin::Pin;
use std::task::{Context, Poll};

#[cfg(feature = "gzip")]
use async_compression::stream::GzipDecoder;
use async_compression::tokio::bufread::GzipDecoder;

#[cfg(feature = "brotli")]
use async_compression::stream::BrotliDecoder;
use async_compression::tokio::bufread::BrotliDecoder;

use bytes::Bytes;
use futures_core::Stream;
use futures_util::stream::Peekable;
use http::HeaderMap;
use hyper::body::HttpBody;

#[cfg(any(feature = "gzip", feature = "brotli"))]
use tokio_util::io::StreamReader;
#[cfg(any(feature = "gzip", feature = "brotli"))]
use tokio_util::codec::{BytesCodec, FramedRead};

use super::super::Body;
use crate::error;

Expand All @@ -39,11 +44,11 @@ enum Inner {

/// A `Gzip` decoder will uncompress the gzipped response content before returning it.
#[cfg(feature = "gzip")]
Gzip(GzipDecoder<Peekable<IoStream>>),
Gzip(FramedRead<GzipDecoder<StreamReader<Peekable<IoStream>, Bytes>>, BytesCodec>),

/// A `Brotli` decoder will uncompress the brotlied response content before returning it.
#[cfg(feature = "brotli")]
Brotli(BrotliDecoder<Peekable<IoStream>>),
Brotli(FramedRead<BrotliDecoder<StreamReader<Peekable<IoStream>, Bytes>>, BytesCodec>),

/// A decoder that doesn't have a value yet.
#[cfg(any(feature = "brotli", feature = "gzip"))]
Expand Down Expand Up @@ -229,15 +234,15 @@ impl Stream for Decoder {
#[cfg(feature = "gzip")]
Inner::Gzip(ref mut decoder) => {
return match futures_core::ready!(Pin::new(decoder).poll_next(cx)) {
Some(Ok(bytes)) => Poll::Ready(Some(Ok(bytes))),
Some(Ok(bytes)) => Poll::Ready(Some(Ok(bytes.freeze()))),
Some(Err(err)) => Poll::Ready(Some(Err(crate::error::decode_io(err)))),
None => Poll::Ready(None),
};
}
#[cfg(feature = "brotli")]
Inner::Brotli(ref mut decoder) => {
return match futures_core::ready!(Pin::new(decoder).poll_next(cx)) {
Some(Ok(bytes)) => Poll::Ready(Some(Ok(bytes))),
Some(Ok(bytes)) => Poll::Ready(Some(Ok(bytes.freeze()))),
Some(Err(err)) => Poll::Ready(Some(Err(crate::error::decode_io(err)))),
None => Poll::Ready(None),
};
Expand Down Expand Up @@ -302,9 +307,9 @@ impl Future for Pending {

match self.1 {
#[cfg(feature = "brotli")]
DecoderType::Brotli => Poll::Ready(Ok(Inner::Brotli(BrotliDecoder::new(_body)))),
DecoderType::Brotli => Poll::Ready(Ok(Inner::Brotli(FramedRead::new(BrotliDecoder::new(StreamReader::new(_body)), BytesCodec::new())))),
#[cfg(feature = "gzip")]
DecoderType::Gzip => Poll::Ready(Ok(Inner::Gzip(GzipDecoder::new(_body)))),
DecoderType::Gzip => Poll::Ready(Ok(Inner::Gzip(FramedRead::new(GzipDecoder::new(StreamReader::new(_body)), BytesCodec::new())))),
}
}
}
Expand Down
18 changes: 3 additions & 15 deletions src/async_impl/multipart.rs
Expand Up @@ -521,11 +521,7 @@ mod tests {
fn form_empty() {
let form = Form::new();

let mut rt = runtime::Builder::new()
.basic_scheduler()
.enable_all()
.build()
.expect("new rt");
let rt = runtime::Builder::new_current_thread().enable_all().build().expect("new rt");
let body = form.stream().into_stream();
let s = body.map_ok(|try_c| try_c.to_vec()).try_concat();

Expand Down Expand Up @@ -572,11 +568,7 @@ mod tests {
--boundary\r\n\
Content-Disposition: form-data; name=\"key3\"; filename=\"filename\"\r\n\r\n\
value3\r\n--boundary--\r\n";
let mut rt = runtime::Builder::new()
.basic_scheduler()
.enable_all()
.build()
.expect("new rt");
let rt = runtime::Builder::new_current_thread().enable_all().build().expect("new rt");
let body = form.stream().into_stream();
let s = body.map(|try_c| try_c.map(|r| r.to_vec())).try_concat();

Expand All @@ -603,11 +595,7 @@ mod tests {
\r\n\
value2\r\n\
--boundary--\r\n";
let mut rt = runtime::Builder::new()
.basic_scheduler()
.enable_all()
.build()
.expect("new rt");
let rt = runtime::Builder::new_current_thread().enable_all().build().expect("new rt");
let body = form.stream().into_stream();
let s = body.map(|try_c| try_c.map(|r| r.to_vec())).try_concat();

Expand Down
5 changes: 3 additions & 2 deletions src/async_impl/response.rs
@@ -1,6 +1,7 @@
use std::borrow::Cow;
use std::fmt;
use std::net::SocketAddr;
use std::pin::Pin;

use bytes::Bytes;
use encoding_rs::{Encoding, UTF_8};
Expand All @@ -12,7 +13,7 @@ use mime::Mime;
use serde::de::DeserializeOwned;
#[cfg(feature = "json")]
use serde_json;
use tokio::time::Delay;
use tokio::time::Sleep;
use url::Url;

use super::body::Body;
Expand All @@ -37,7 +38,7 @@ impl Response {
res: hyper::Response<hyper::Body>,
url: Url,
accepts: Accepts,
timeout: Option<Delay>,
timeout: Option<Pin<Box<Sleep>>>,
) -> Response {
let (parts, body) = res.into_parts();
let status = parts.status;
Expand Down
7 changes: 4 additions & 3 deletions src/blocking/body.rs
Expand Up @@ -2,10 +2,11 @@ use std::fmt;
use std::fs::File;
use std::future::Future;
use std::io::{self, Cursor, Read};
use std::mem::{self, MaybeUninit};
use std::mem;
use std::ptr;

use bytes::Bytes;
use bytes::buf::UninitSlice;

use crate::async_impl;

Expand Down Expand Up @@ -289,14 +290,14 @@ async fn send_future(sender: Sender) -> Result<(), crate::Error> {
if buf.remaining_mut() == 0 {
buf.reserve(8192);
// zero out the reserved memory
let uninit = buf.chunk_mut();
unsafe {
let uninit = mem::transmute::<&mut [MaybeUninit<u8>], &mut [u8]>(buf.bytes_mut());
ptr::write_bytes(uninit.as_mut_ptr(), 0, uninit.len());
}
}

let bytes = unsafe {
mem::transmute::<&mut [MaybeUninit<u8>], &mut [u8]>(buf.bytes_mut())
mem::transmute::<&mut UninitSlice, &mut [u8]>(buf.chunk_mut())
};
match body.read(bytes) {
Ok(0) => {
Expand Down

0 comments on commit 9167080

Please sign in to comment.