Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update to tokio 1.0, bytes 1.0 #1076

Merged
merged 12 commits into from Dec 30, 2020
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Expand Up @@ -191,7 +191,7 @@ jobs:

strategy:
matrix:
rust: [1.39.0]
rust: [1.45.2]

steps:
- name: Checkout
Expand Down
44 changes: 22 additions & 22 deletions Cargo.toml
Expand Up @@ -28,7 +28,7 @@ default = ["default-tls"]

# Note: this doesn't enable the 'native-tls' feature, which adds specific
# functionality for it.
default-tls = ["hyper-tls", "native-tls-crate", "__tls", "tokio-tls"]
default-tls = ["hyper-tls", "native-tls-crate", "__tls", "tokio-native-tls"]

# Enables native-tls specific functionality not available by default.
native-tls = ["default-tls"]
Expand All @@ -39,13 +39,13 @@ rustls-tls-manual-roots = ["__rustls"]
rustls-tls-webpki-roots = ["webpki-roots", "__rustls"]
rustls-tls-native-roots = ["rustls-native-certs", "__rustls"]

blocking = ["futures-util/io", "tokio/rt-threaded", "tokio/rt-core", "tokio/sync"]
blocking = ["futures-util/io", "tokio/rt-multi-thread", "tokio/sync"]

cookies = ["cookie_crate", "cookie_store", "time"]

gzip = ["async-compression", "async-compression/gzip"]
gzip = ["async-compression", "async-compression/gzip", "tokio-util"]

brotli = ["async-compression", "async-compression/brotli"]
brotli = ["async-compression", "async-compression/brotli", "tokio-util"]

json = ["serde_json"]

Expand All @@ -71,7 +71,7 @@ __internal_proxy_sys_no_cache = []
[dependencies]
http = "0.2"
url = "2.2"
bytes = "0.5"
bytes = "1.0"
serde = "1.0"
serde_urlencoded = "0.7"
mime_guess = "2.0"
Expand All @@ -83,53 +83,53 @@ base64 = "0.13"
encoding_rs = "0.8"
futures-core = { version = "0.3.0", default-features = false }
futures-util = { version = "0.3.0", default-features = false }
http-body = "0.3.0"
hyper = { version = "0.13.4", default-features = false, features = ["tcp"] }
http-body = "0.4.0"
hyper = { version = "0.14", default-features = false, features = ["tcp", "http1", "http2", "client"] }
seanmonstar marked this conversation as resolved.
Show resolved Hide resolved
lazy_static = "1.4"
log = "0.4"
mime = "0.3.7"
percent-encoding = "2.1"
tokio = { version = "0.2.5", default-features = false, features = ["tcp", "time"] }
tokio = { version = "1.0", default-features = false, features = ["net", "time"] }
pin-project-lite = "0.2.0"
ipnet = "2.3"

# Optional deps...

## default-tls
hyper-tls = { version = "0.4", optional = true }
hyper-tls = { version = "0.5", optional = true }
native-tls-crate = { version = "0.2", optional = true, package = "native-tls" }
tokio-tls = { version = "0.3.0", optional = true }
tokio-native-tls = { version = "0.3.0", optional = true }

# rustls-tls
hyper-rustls = { version = "0.21", default-features = false, optional = true }
rustls = { version = "0.18", features = ["dangerous_configuration"], optional = true }
tokio-rustls = { version = "0.14", optional = true }
webpki-roots = { version = "0.20", optional = true }
rustls-native-certs = { version = "0.4", optional = true }
hyper-rustls = { version = "0.22.1", default-features = false, optional = true }
rustls = { version = "0.19", features = ["dangerous_configuration"], optional = true }
tokio-rustls = { version = "0.22", optional = true }
webpki-roots = { version = "0.21", optional = true }
rustls-native-certs = { version = "0.5", optional = true }

## cookies
cookie_crate = { version = "0.14", package = "cookie", optional = true }
cookie_store = { version = "0.12", optional = true }
time = { version = "0.2.11", optional = true }

## compression
async-compression = { version = "0.3.0", default-features = false, features = ["stream"], optional = true }

async-compression = { version = "0.3.7", default-features = false, features = ["tokio"], optional = true }
tokio-util = { version = "0.6.0", default-features = false, features = ["codec", "io"], optional = true }

## socks
tokio-socks = { version = "0.3", optional = true }
tokio-socks = { version = "0.5", optional = true }

## trust-dns
trust-dns-resolver = { version = "0.19", optional = true }
trust-dns-resolver = { version = "0.20", optional = true }

[target.'cfg(not(target_arch = "wasm32"))'.dev-dependencies]
env_logger = "0.7"
hyper = { version = "0.13", default-features = false, features = ["tcp", "stream"] }
env_logger = "0.8"
hyper = { version = "0.14", default-features = false, features = ["tcp", "stream", "http1", "http2", "client", "server"] }
serde = { version = "1.0", features = ["derive"] }
libflate = "1.0"
brotli_crate = { package = "brotli", version = "3.3.0" }
doc-comment = "0.3"
tokio = { version = "0.2.0", default-features = false, features = ["macros"] }
tokio = { version = "1.0", default-features = false, features = ["macros", "rt-multi-thread"] }

[target.'cfg(windows)'.dependencies]
winreg = "0.7"
Expand Down
8 changes: 4 additions & 4 deletions src/async_impl/body.rs
Expand Up @@ -7,7 +7,7 @@ use bytes::Bytes;
use futures_core::Stream;
use http_body::Body as HttpBody;
use pin_project_lite::pin_project;
use tokio::time::Delay;
use tokio::time::Sleep;

/// An asynchronous request body.
pub struct Body {
Expand All @@ -27,7 +27,7 @@ enum Inner {
+ Sync,
>,
>,
timeout: Option<Delay>,
timeout: Option<Pin<Box<Sleep>>>,
},
}

Expand Down Expand Up @@ -103,7 +103,7 @@ impl Body {
}
}

pub(crate) fn response(body: hyper::Body, timeout: Option<Delay>) -> Body {
pub(crate) fn response(body: hyper::Body, timeout: Option<Pin<Box<Sleep>>>) -> Body {
Body {
inner: Inner::Streaming {
body: Box::pin(WrapHyper(body)),
Expand Down Expand Up @@ -217,7 +217,7 @@ impl HttpBody for ImplStream {
ref mut timeout,
} => {
if let Some(ref mut timeout) = timeout {
if let Poll::Ready(()) = Pin::new(timeout).poll(cx) {
if let Poll::Ready(()) = timeout.as_mut().poll(cx) {
return Poll::Ready(Some(Err(crate::error::body(crate::error::TimedOut))));
}
}
Expand Down
23 changes: 5 additions & 18 deletions src/async_impl/client.rs
Expand Up @@ -26,7 +26,7 @@ use rustls::RootCertStore;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::time::Delay;
use tokio::time::Sleep;
use pin_project_lite::pin_project;

use log::debug;
Expand Down Expand Up @@ -96,7 +96,6 @@ struct Config {
#[cfg(feature = "__tls")]
tls: TlsBackend,
http2_only: bool,
http1_writev: Option<bool>,
http1_title_case_headers: bool,
http2_initial_stream_window_size: Option<u32>,
http2_initial_connection_window_size: Option<u32>,
Expand Down Expand Up @@ -151,7 +150,6 @@ impl ClientBuilder {
#[cfg(feature = "__tls")]
tls: TlsBackend::default(),
http2_only: false,
http1_writev: None,
http1_title_case_headers: false,
http2_initial_stream_window_size: None,
http2_initial_connection_window_size: None,
Expand Down Expand Up @@ -316,10 +314,6 @@ impl ClientBuilder {
builder.http2_only(true);
}

if let Some(http1_writev) = config.http1_writev {
builder.http1_writev(http1_writev);
}

if let Some(http2_initial_stream_window_size) = config.http2_initial_stream_window_size {
builder.http2_initial_stream_window_size(http2_initial_stream_window_size);
}
Expand Down Expand Up @@ -655,14 +649,6 @@ impl ClientBuilder {
self
}

/// Force hyper to use either queued(if true), or flattened(if false) write strategy
/// This may eliminate unnecessary cloning of buffers for some TLS backends
/// By default hyper will try to guess which strategy to use
pub fn http1_writev(mut self, writev: bool) -> ClientBuilder {
self.config.http1_writev = Some(writev);
self
}

/// Only use HTTP/2.
pub fn http2_prior_knowledge(mut self) -> ClientBuilder {
self.config.http2_only = true;
Expand Down Expand Up @@ -1103,7 +1089,8 @@ impl Client {

let timeout = timeout
.or(self.inner.request_timeout)
.map(tokio::time::delay_for);
.map(tokio::time::sleep)
.map(Box::pin);

*req.headers_mut() = headers.clone();

Expand Down Expand Up @@ -1317,7 +1304,7 @@ pin_project! {
#[pin]
in_flight: ResponseFuture,
#[pin]
timeout: Option<Delay>,
timeout: Option<Pin<Box<Sleep>>>,
}
}

Expand All @@ -1326,7 +1313,7 @@ impl PendingRequest {
self.project().in_flight
}

fn timeout(self: Pin<&mut Self>) -> Pin<&mut Option<Delay>> {
fn timeout(self: Pin<&mut Self>) -> Pin<&mut Option<Pin<Box<Sleep>>>> {
self.project().timeout
}

Expand Down
21 changes: 13 additions & 8 deletions src/async_impl/decoder.rs
Expand Up @@ -4,17 +4,22 @@ use std::pin::Pin;
use std::task::{Context, Poll};

#[cfg(feature = "gzip")]
use async_compression::stream::GzipDecoder;
use async_compression::tokio::bufread::GzipDecoder;

#[cfg(feature = "brotli")]
use async_compression::stream::BrotliDecoder;
use async_compression::tokio::bufread::BrotliDecoder;

use bytes::Bytes;
use futures_core::Stream;
use futures_util::stream::Peekable;
use http::HeaderMap;
use hyper::body::HttpBody;

#[cfg(any(feature = "gzip", feature = "brotli"))]
use tokio_util::io::StreamReader;
#[cfg(any(feature = "gzip", feature = "brotli"))]
use tokio_util::codec::{BytesCodec, FramedRead};

use super::super::Body;
use crate::error;

Expand All @@ -39,11 +44,11 @@ enum Inner {

/// A `Gzip` decoder will uncompress the gzipped response content before returning it.
#[cfg(feature = "gzip")]
Gzip(GzipDecoder<Peekable<IoStream>>),
Gzip(FramedRead<GzipDecoder<StreamReader<Peekable<IoStream>, Bytes>>, BytesCodec>),

/// A `Brotli` decoder will uncompress the brotlied response content before returning it.
#[cfg(feature = "brotli")]
Brotli(BrotliDecoder<Peekable<IoStream>>),
Brotli(FramedRead<BrotliDecoder<StreamReader<Peekable<IoStream>, Bytes>>, BytesCodec>),
messense marked this conversation as resolved.
Show resolved Hide resolved

/// A decoder that doesn't have a value yet.
#[cfg(any(feature = "brotli", feature = "gzip"))]
Expand Down Expand Up @@ -229,15 +234,15 @@ impl Stream for Decoder {
#[cfg(feature = "gzip")]
Inner::Gzip(ref mut decoder) => {
return match futures_core::ready!(Pin::new(decoder).poll_next(cx)) {
Some(Ok(bytes)) => Poll::Ready(Some(Ok(bytes))),
Some(Ok(bytes)) => Poll::Ready(Some(Ok(bytes.freeze()))),
Some(Err(err)) => Poll::Ready(Some(Err(crate::error::decode_io(err)))),
None => Poll::Ready(None),
};
}
#[cfg(feature = "brotli")]
Inner::Brotli(ref mut decoder) => {
return match futures_core::ready!(Pin::new(decoder).poll_next(cx)) {
Some(Ok(bytes)) => Poll::Ready(Some(Ok(bytes))),
Some(Ok(bytes)) => Poll::Ready(Some(Ok(bytes.freeze()))),
Some(Err(err)) => Poll::Ready(Some(Err(crate::error::decode_io(err)))),
None => Poll::Ready(None),
};
Expand Down Expand Up @@ -302,9 +307,9 @@ impl Future for Pending {

match self.1 {
#[cfg(feature = "brotli")]
DecoderType::Brotli => Poll::Ready(Ok(Inner::Brotli(BrotliDecoder::new(_body)))),
DecoderType::Brotli => Poll::Ready(Ok(Inner::Brotli(FramedRead::new(BrotliDecoder::new(StreamReader::new(_body)), BytesCodec::new())))),
#[cfg(feature = "gzip")]
DecoderType::Gzip => Poll::Ready(Ok(Inner::Gzip(GzipDecoder::new(_body)))),
DecoderType::Gzip => Poll::Ready(Ok(Inner::Gzip(FramedRead::new(GzipDecoder::new(StreamReader::new(_body)), BytesCodec::new())))),
}
}
}
Expand Down
18 changes: 3 additions & 15 deletions src/async_impl/multipart.rs
Expand Up @@ -521,11 +521,7 @@ mod tests {
fn form_empty() {
let form = Form::new();

let mut rt = runtime::Builder::new()
.basic_scheduler()
.enable_all()
.build()
.expect("new rt");
let rt = runtime::Builder::new_current_thread().enable_all().build().expect("new rt");
let body = form.stream().into_stream();
let s = body.map_ok(|try_c| try_c.to_vec()).try_concat();

Expand Down Expand Up @@ -572,11 +568,7 @@ mod tests {
--boundary\r\n\
Content-Disposition: form-data; name=\"key3\"; filename=\"filename\"\r\n\r\n\
value3\r\n--boundary--\r\n";
let mut rt = runtime::Builder::new()
.basic_scheduler()
.enable_all()
.build()
.expect("new rt");
let rt = runtime::Builder::new_current_thread().enable_all().build().expect("new rt");
let body = form.stream().into_stream();
let s = body.map(|try_c| try_c.map(|r| r.to_vec())).try_concat();

Expand All @@ -603,11 +595,7 @@ mod tests {
\r\n\
value2\r\n\
--boundary--\r\n";
let mut rt = runtime::Builder::new()
.basic_scheduler()
.enable_all()
.build()
.expect("new rt");
let rt = runtime::Builder::new_current_thread().enable_all().build().expect("new rt");
let body = form.stream().into_stream();
let s = body.map(|try_c| try_c.map(|r| r.to_vec())).try_concat();

Expand Down
5 changes: 3 additions & 2 deletions src/async_impl/response.rs
@@ -1,6 +1,7 @@
use std::borrow::Cow;
use std::fmt;
use std::net::SocketAddr;
use std::pin::Pin;

use bytes::Bytes;
use encoding_rs::{Encoding, UTF_8};
Expand All @@ -12,7 +13,7 @@ use mime::Mime;
use serde::de::DeserializeOwned;
#[cfg(feature = "json")]
use serde_json;
use tokio::time::Delay;
use tokio::time::Sleep;
use url::Url;

use super::body::Body;
Expand All @@ -37,7 +38,7 @@ impl Response {
res: hyper::Response<hyper::Body>,
url: Url,
accepts: Accepts,
timeout: Option<Delay>,
timeout: Option<Pin<Box<Sleep>>>,
) -> Response {
let (parts, body) = res.into_parts();
let status = parts.status;
Expand Down
7 changes: 4 additions & 3 deletions src/blocking/body.rs
Expand Up @@ -2,10 +2,11 @@ use std::fmt;
use std::fs::File;
use std::future::Future;
use std::io::{self, Cursor, Read};
use std::mem::{self, MaybeUninit};
use std::mem;
use std::ptr;

use bytes::Bytes;
use bytes::buf::UninitSlice;

use crate::async_impl;

Expand Down Expand Up @@ -289,14 +290,14 @@ async fn send_future(sender: Sender) -> Result<(), crate::Error> {
if buf.remaining_mut() == 0 {
buf.reserve(8192);
// zero out the reserved memory
let uninit = buf.chunk_mut();
unsafe {
let uninit = mem::transmute::<&mut [MaybeUninit<u8>], &mut [u8]>(buf.bytes_mut());
ptr::write_bytes(uninit.as_mut_ptr(), 0, uninit.len());
}
}

let bytes = unsafe {
mem::transmute::<&mut [MaybeUninit<u8>], &mut [u8]>(buf.bytes_mut())
mem::transmute::<&mut UninitSlice, &mut [u8]>(buf.chunk_mut())
};
match body.read(bytes) {
Ok(0) => {
Expand Down