Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Upgrade to Hyper 1.x #1090

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
11 changes: 7 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,16 @@ async-compression = { version = "0.4.5", features = ["tokio"], optional = true }
bytes = "1.0"
futures-util = { version = "0.3", default-features = false, features = ["sink"] }
futures-channel = { version = "0.3.17", features = ["sink"]}
headers = "0.3.5"
http = "0.2"
hyper = { version = "0.14", features = ["stream", "server", "http1", "http2", "tcp", "client"] }
headers = "0.4.0"
http = "1"
hyper = { version = "1", features = ["server", "http1", "http2", "client"] }
hyper-util = "0.1.2"
http-body = "1"
http-body-util = "0.1.0"
log = "0.4"
mime = "0.3"
mime_guess = "2.0.0"
multer = { version = "2.1.0", optional = true }
multer = { version = "3.0.0", optional = true }
scoped-tls = "1.0"
serde = "1.0"
serde_json = "1.0"
Expand Down
6 changes: 1 addition & 5 deletions src/filter/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,12 +93,8 @@ where
type Error = Infallible;
type Future = FilteredFuture<F::Future>;

fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}

#[inline]
fn call(&mut self, req: Request) -> Self::Future {
fn call(&self, req: Request) -> Self::Future {
self.call_with_addr(req, None)
}
}
Expand Down
20 changes: 11 additions & 9 deletions src/filters/body.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use bytes::{Buf, Bytes};
use futures_util::{future, ready, Stream, TryFutureExt};
use headers::ContentLength;
use http::header::CONTENT_TYPE;
use hyper::Body;
use hyper::body::Incoming;
use mime;
use serde::de::DeserializeOwned;
use serde_json;
Expand All @@ -22,10 +22,10 @@ use crate::reject::{self, Rejection};

type BoxError = Box<dyn StdError + Send + Sync>;

// Extracts the `Body` Stream from the route.
// Extracts the `Incoming` Stream from the route.
//
// Does not consume any of it.
pub(crate) fn body() -> impl Filter<Extract = (Body,), Error = Rejection> + Copy {
pub(crate) fn body() -> impl Filter<Extract = (Incoming,), Error = Rejection> + Copy {
filter_fn_one(|route| {
future::ready(route.take_body().ok_or_else(|| {
tracing::error!("request body already taken in previous filter");
Expand Down Expand Up @@ -79,7 +79,7 @@ pub fn content_length_limit(limit: u64) -> impl Filter<Extract = (), Error = Rej
pub fn stream(
) -> impl Filter<Extract = (impl Stream<Item = Result<impl Buf, crate::Error>>,), Error = Rejection> + Copy
{
body().map(|body: Body| BodyStream { body })
body().map(|body: Incoming| BodyStream { body })
}

/// Returns a `Filter` that matches any request and extracts a `Future` of a
Expand All @@ -106,8 +106,9 @@ pub fn stream(
/// });
/// ```
pub fn bytes() -> impl Filter<Extract = (Bytes,), Error = Rejection> + Copy {
body().and_then(|body: hyper::Body| {
hyper::body::to_bytes(body).map_err(|err| {
body().and_then(|body: Incoming| {
use http_body_util::BodyExt;
body.collect().map_ok(|b| b.to_bytes()).map_err(|err| {
tracing::debug!("to_bytes error: {}", err);
reject::known(BodyReadError(err))
})
Expand Down Expand Up @@ -144,8 +145,9 @@ pub fn bytes() -> impl Filter<Extract = (Bytes,), Error = Rejection> + Copy {
/// .map(full_body);
/// ```
pub fn aggregate() -> impl Filter<Extract = (impl Buf,), Error = Rejection> + Copy {
body().and_then(|body: ::hyper::Body| {
hyper::body::aggregate(body).map_err(|err| {
body().and_then(|body: Incoming| {
use http_body_util::BodyExt;
body.collect().map_ok(|b| b.aggregate()).map_err(|err| {
tracing::debug!("aggregate error: {}", err);
reject::known(BodyReadError(err))
})
Expand Down Expand Up @@ -291,7 +293,7 @@ fn is_content_type<D: Decode>() -> impl Filter<Extract = (), Error = Rejection>
// ===== BodyStream =====

struct BodyStream {
body: Body,
body: Incoming,
}

impl Stream for BodyStream {
Expand Down
24 changes: 12 additions & 12 deletions src/filters/compression.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ use async_compression::tokio::bufread::{DeflateEncoder, GzipEncoder};

use http::header::HeaderValue;
use hyper::{
body::Incoming,
header::{CONTENT_ENCODING, CONTENT_LENGTH},
Body,
};
use tokio_util::io::{ReaderStream, StreamReader};

Expand Down Expand Up @@ -69,7 +69,7 @@ pub struct Compression<F> {
#[cfg(feature = "compression-gzip")]
pub fn gzip() -> Compression<impl Fn(CompressionProps) -> Response + Copy> {
let func = move |mut props: CompressionProps| {
let body = Body::wrap_stream(ReaderStream::new(GzipEncoder::new(StreamReader::new(
let body = Incoming::wrap_stream(ReaderStream::new(GzipEncoder::new(StreamReader::new(
props.body,
))));
props
Expand Down Expand Up @@ -98,9 +98,9 @@ pub fn gzip() -> Compression<impl Fn(CompressionProps) -> Response + Copy> {
#[cfg(feature = "compression-gzip")]
pub fn deflate() -> Compression<impl Fn(CompressionProps) -> Response + Copy> {
let func = move |mut props: CompressionProps| {
let body = Body::wrap_stream(ReaderStream::new(DeflateEncoder::new(StreamReader::new(
props.body,
))));
let body = Incoming::wrap_stream(ReaderStream::new(DeflateEncoder::new(
StreamReader::new(props.body),
)));
props
.head
.headers
Expand All @@ -127,7 +127,7 @@ pub fn deflate() -> Compression<impl Fn(CompressionProps) -> Response + Copy> {
#[cfg(feature = "compression-brotli")]
pub fn brotli() -> Compression<impl Fn(CompressionProps) -> Response + Copy> {
let func = move |mut props: CompressionProps| {
let body = Body::wrap_stream(ReaderStream::new(BrotliEncoder::new(StreamReader::new(
let body = Incoming::wrap_stream(ReaderStream::new(BrotliEncoder::new(StreamReader::new(
props.body,
))));
props
Expand Down Expand Up @@ -164,7 +164,7 @@ mod internal {

use bytes::Bytes;
use futures_util::{ready, Stream, TryFuture};
use hyper::Body;
use hyper::body::Incoming;
use pin_project::pin_project;

use crate::filter::{Filter, FilterBase, Internal};
Expand Down Expand Up @@ -201,21 +201,21 @@ mod internal {
}
}

impl From<Body> for CompressableBody<Body, hyper::Error> {
fn from(body: Body) -> Self {
impl From<Incoming> for CompressableBody<Incoming, hyper::Error> {
fn from(body: Incoming) -> Self {
CompressableBody { body }
}
}

/// Compression Props
#[derive(Debug)]
pub struct CompressionProps {
pub(super) body: CompressableBody<Body, hyper::Error>,
pub(super) body: CompressableBody<Incoming, hyper::Error>,
pub(super) head: http::response::Parts,
}

impl From<http::Response<Body>> for CompressionProps {
fn from(resp: http::Response<Body>) -> Self {
impl From<http::Response<Incoming>> for CompressionProps {
fn from(resp: http::Response<Incoming>) -> Self {
let (head, body) = resp.into_parts();
CompressionProps {
body: body.into(),
Expand Down
10 changes: 5 additions & 5 deletions src/filters/fs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use headers::{
IfUnmodifiedSince, LastModified, Range,
};
use http::StatusCode;
use hyper::Body;
use hyper::body::Incoming;
use mime_guess;
use percent_encoding::percent_decode_str;
use tokio::fs::File as TkFile;
Expand Down Expand Up @@ -162,7 +162,7 @@ impl Conditionals {
precondition
);
if !precondition {
let mut res = Response::new(Body::empty());
let mut res = Response::new(Incoming::empty());
*res.status_mut() = StatusCode::PRECONDITION_FAILED;
return Cond::NoBody(res);
}
Expand All @@ -179,7 +179,7 @@ impl Conditionals {
// no last_modified means its always modified
.unwrap_or(false);
if unmodified {
let mut res = Response::new(Body::empty());
let mut res = Response::new(Incoming::empty());
*res.status_mut() = StatusCode::NOT_MODIFIED;
return Cond::NoBody(res);
}
Expand Down Expand Up @@ -318,7 +318,7 @@ fn file_conditional(
let sub_len = end - start;
let buf_size = optimal_buf_size(&meta);
let stream = file_stream(file, buf_size, (start, end));
let body = Body::wrap_stream(stream);
let body = Incoming::wrap_stream(stream);

let mut resp = Response::new(body);

Expand All @@ -345,7 +345,7 @@ fn file_conditional(
})
.unwrap_or_else(|BadRange| {
// bad byte range
let mut resp = Response::new(Body::empty());
let mut resp = Response::new(Incoming::empty());
*resp.status_mut() = StatusCode::RANGE_NOT_SATISFIABLE;
resp.headers_mut()
.typed_insert(ContentRange::unsatisfied_bytes(len));
Expand Down
4 changes: 2 additions & 2 deletions src/filters/multipart.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use std::{fmt, io};
use bytes::{Buf, Bytes};
use futures_util::{future, Stream};
use headers::ContentType;
use hyper::Body;
use hyper::body::Incoming;
use mime::Mime;
use multer::{Field as PartInner, Multipart as FormDataInner};

Expand Down Expand Up @@ -200,7 +200,7 @@ impl Stream for PartStream {
}
}

struct BodyIoError(Body);
struct BodyIoError(Incoming);

impl Stream for BodyIoError {
type Item = io::Result<Bytes>;
Expand Down
4 changes: 2 additions & 2 deletions src/filters/sse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ use std::time::Duration;

use futures_util::{future, Stream, TryStream, TryStreamExt};
use http::header::{HeaderValue, CACHE_CONTROL, CONTENT_TYPE};
use hyper::Body;
use hyper::body::Incoming;
use pin_project::pin_project;
use serde_json::{self, Error};
use tokio::time::{self, Sleep};
Expand Down Expand Up @@ -340,7 +340,7 @@ where
.into_stream()
.and_then(|event| future::ready(Ok(event.to_string())));

let mut res = Response::new(Body::wrap_stream(body_stream));
let mut res = Response::new(Incoming::wrap_stream(body_stream));
// Set appropriate content type
res.headers_mut()
.insert(CONTENT_TYPE, HeaderValue::from_static("text/event-stream"));
Expand Down
4 changes: 2 additions & 2 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
#![deny(missing_docs)]
#![deny(missing_debug_implementations)]
#![deny(rust_2018_idioms)]
#![cfg_attr(test, deny(warnings))]
// #![cfg_attr(test, deny(warnings))]

//! # warp
//!
Expand Down Expand Up @@ -176,4 +176,4 @@ pub use bytes::Buf;
pub use futures_util::{Future, Sink, Stream};
#[doc(hidden)]

pub(crate) type Request = http::Request<hyper::Body>;
pub(crate) type Request = http::Request<hyper::body::Incoming>;
9 changes: 5 additions & 4 deletions src/reject.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ use http::{
header::{HeaderValue, CONTENT_TYPE},
StatusCode,
};
use hyper::Body;
use hyper::body::Incoming;

pub(crate) use self::sealed::{CombineRejection, IsReject};

Expand Down Expand Up @@ -443,7 +443,7 @@ impl Rejections {
fn into_response(&self) -> crate::reply::Response {
match *self {
Rejections::Known(ref e) => {
let mut res = http::Response::new(Body::from(e.to_string()));
let mut res = http::Response::new(Incoming::from(e.to_string()));
*res.status_mut() = self.status();
res.headers_mut().insert(
CONTENT_TYPE,
Expand All @@ -457,7 +457,7 @@ impl Rejections {
e
);
let body = format!("Unhandled rejection: {:?}", e);
let mut res = http::Response::new(Body::from(body));
let mut res = http::Response::new(Incoming::from(body));
*res.status_mut() = self.status();
res.headers_mut().insert(
CONTENT_TYPE,
Expand Down Expand Up @@ -800,8 +800,9 @@ mod tests {
}

async fn response_body_string(resp: crate::reply::Response) -> String {
use http_body_util::BodyExt;
let (_, body) = resp.into_parts();
let body_bytes = hyper::body::to_bytes(body).await.expect("failed concat");
let body_bytes = body.collect().await.expect("failed concat").to_bytes();
String::from_utf8_lossy(&body_bytes).to_string()
}

Expand Down