Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Upgrade to Hyper 1.0 & Axum 0.7 #1670

Open
wants to merge 54 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
54 commits
Select commit Hold shift + click to select a range
998d8fe
Update deps, hyper 1.0, axum 0.7
allan2 Dec 7, 2023
296d656
Update imports to use util crates
allan2 Dec 7, 2023
25b81b1
Use Axum Request and Response in transport
allan2 Dec 7, 2023
bef18d0
Chop out AddrStream
allan2 Dec 7, 2023
cfba192
Replace AddrIncoming with TcpListener
allan2 Dec 7, 2023
0bf3ddb
Update impl of http_body::Body
allan2 Dec 7, 2023
e6d46b7
More http2 method renames
allan2 Dec 7, 2023
e28856e
Use BodyExt trait
allan2 Dec 7, 2023
47a6d3e
Remove `hyper::Error::is_connect`
allan2 Dec 7, 2023
47100aa
Add Clone to Extensions::insert<T>
allan2 Dec 7, 2023
39a12cf
Use TcpStream instead of hyper connect
Ludea Dec 19, 2023
dbcea0d
add poll_frame, delete poll_trailers
Ludea Dec 19, 2023
fdd3ab5
Fixed incoming
ikrivosheev Jan 12, 2024
8bc8374
Fixed
ikrivosheev Jan 13, 2024
3f3c002
Fixes
ikrivosheev Jan 13, 2024
3053c12
Handle Frame<> in polling body
alexrudy Mar 31, 2024
47ac1d2
Fix transport service
alexrudy Mar 31, 2024
4248428
Finish implementation of hyper-v1 connections
alexrudy Mar 31, 2024
7b291ff
Upgrade TLS
alexrudy Mar 31, 2024
f8cc5a4
Implement rudimentary serve() function
alexrudy Mar 31, 2024
82923c0
Use tonic::body::BoxBody
alexrudy Mar 31, 2024
76a8ac3
Tests compile with hyper-1.0
alexrudy Mar 31, 2024
47b7ded
Upgrade interop to hyper-1.0
alexrudy Mar 31, 2024
d0840b3
Expose tonic hyper service adaptor
alexrudy Mar 31, 2024
606be38
Fix tonic-web and tonic-web integration tests
alexrudy Mar 31, 2024
cb3d3fe
Fix examples
alexrudy Mar 31, 2024
de0429e
Reduce warnings
alexrudy Apr 1, 2024
a00872c
Fixup streaming request processing for http_body v1
alexrudy Apr 1, 2024
9dcb2c6
Adjust trailer handling in grpc-web call
alexrudy Apr 2, 2024
0272ed8
Handle encoding in the presence of trailers
alexrudy Apr 2, 2024
f829ba9
Fix trailer handling in EncodeBody
alexrudy Apr 2, 2024
6b24d4e
Revert GRPC web fixes to account for GRPC-web header
alexrudy Apr 2, 2024
18c7f91
Implement graceful shutdown
alexrudy Apr 2, 2024
2118998
Refactor trailer processing in decoder
alexrudy Apr 2, 2024
ebab6e7
Set SO_NONBLOCKING on TCP Listerner sockets
alexrudy Apr 2, 2024
8f4b352
Improve graceful shutdown with draining
alexrudy Apr 2, 2024
1091bfa
Fixup doctests, get to passing
alexrudy Apr 3, 2024
01c055d
Use socket2 to support SO_NODELAY and SO_KEEPALIVE on incoming connec…
alexrudy Apr 3, 2024
c421779
Implement http2_only in Server
alexrudy Apr 3, 2024
fea3663
Remove tokio::pin
alexrudy Apr 3, 2024
f386954
Work around Cargo Issue 10788
alexrudy Apr 13, 2024
6eaba61
Enable tokio/macros for transport feature
alexrudy Apr 13, 2024
4485ca2
Switch to ring for tls
alexrudy Apr 13, 2024
57b433e
Guard hyper_util::rt::TokioIo in tls feature
alexrudy Apr 13, 2024
6e4682b
Do not discard trailers when checking streaming response status
alexrudy Apr 13, 2024
f5f508a
Remove unused deps in tonic-web
alexrudy Apr 13, 2024
d76b42f
Update generated protobufs
alexrudy Apr 16, 2024
e24fcd8
Update nightly for udeps compatibility with axum 0.7
alexrudy Apr 16, 2024
f64d02c
Remove example dependency on aws-lc-rs
alexrudy Apr 16, 2024
a8e45db
Fix broken intra-doc links
alexrudy Apr 16, 2024
16608ea
Fix missing doc links
alexrudy Apr 16, 2024
3c013ea
Fix respons stream to only return a single error.
alexrudy Apr 16, 2024
9b306af
Fix unused dependencies and tls configuration in connector.rs
alexrudy Apr 26, 2024
5a579e6
[h2c] example should use serve_connection_with_upgrades
alexrudy May 16, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/CI.yml
Expand Up @@ -54,7 +54,7 @@ jobs:
- uses: actions/checkout@v4
- uses: dtolnay/rust-toolchain@master
with:
toolchain: nightly-2024-02-06
toolchain: nightly-2024-04-16
- uses: taiki-e/install-action@cargo-hack
- uses: taiki-e/install-action@cargo-udeps
- name: Install protoc
Expand Down
136 changes: 101 additions & 35 deletions examples/Cargo.toml
Expand Up @@ -149,15 +149,15 @@ path = "src/interceptor/client.rs"
name = "interceptor-server"
path = "src/interceptor/server.rs"

[[bin]]
name = "hyper-warp-client"
path = "src/hyper_warp/client.rs"
required-features = ["hyper-warp"]
# [[bin]]
# name = "hyper-warp-client"
# path = "src/hyper_warp/client.rs"
# required-features = ["hyper-warp"]

[[bin]]
name = "hyper-warp-server"
path = "src/hyper_warp/server.rs"
required-features = ["hyper-warp"]
# [[bin]]
# name = "hyper-warp-server"
# path = "src/hyper_warp/server.rs"
# required-features = ["hyper-warp"]

[[bin]]
name = "health-server"
Expand All @@ -178,15 +178,15 @@ required-features = ["autoreload"]
name = "optional-server"
path = "src/optional/server.rs"

[[bin]]
name = "hyper-warp-multiplex-client"
path = "src/hyper_warp_multiplex/client.rs"
required-features = ["hyper-warp-multiplex"]
# [[bin]]
# name = "hyper-warp-multiplex-client"
# path = "src/hyper_warp_multiplex/client.rs"
# required-features = ["hyper-warp-multiplex"]

[[bin]]
name = "hyper-warp-multiplex-server"
path = "src/hyper_warp_multiplex/server.rs"
required-features = ["hyper-warp-multiplex"]
# [[bin]]
# name = "hyper-warp-multiplex-server"
# path = "src/hyper_warp_multiplex/server.rs"
# required-features = ["hyper-warp-multiplex"]

[[bin]]
name = "compression-server"
Expand Down Expand Up @@ -287,30 +287,84 @@ path = "src/codec_buffers/client.rs"

[features]
gcp = ["dep:prost-types", "tonic/tls"]
routeguide = ["dep:async-stream", "tokio-stream", "dep:rand", "dep:serde", "dep:serde_json"]
routeguide = [
"dep:async-stream",
"tokio-stream",
"dep:rand",
"dep:serde",
"dep:serde_json",
]
reflection = ["dep:tonic-reflection"]
autoreload = ["tokio-stream/net", "dep:listenfd"]
health = ["dep:tonic-health"]
grpc-web = ["dep:tonic-web", "dep:bytes", "dep:http", "dep:hyper", "dep:tracing-subscriber", "dep:tower"]
grpc-web = [
"dep:tonic-web",
"dep:bytes",
"dep:http",
"dep:hyper",
"dep:hyper-util",
"dep:tracing-subscriber",
"dep:tower",
]
tracing = ["dep:tracing", "dep:tracing-subscriber"]
hyper-warp = ["dep:either", "dep:tower", "dep:hyper", "dep:http", "dep:http-body", "dep:warp"]
hyper-warp-multiplex = ["hyper-warp"]
uds = ["tokio-stream/net", "dep:tower", "dep:hyper"]
# hyper-warp = [
# "dep:either",
# "dep:tower",
# "dep:hyper",
# "dep:http",
# "dep:http-body",
# "dep:warp",
# ]
# hyper-warp-multiplex = ["hyper-warp"]
uds = ["tokio-stream/net", "dep:tower", "dep:hyper", "dep:hyper-util"]
streaming = ["tokio-stream", "dep:h2"]
mock = ["tokio-stream", "dep:tower"]
tower = ["dep:hyper", "dep:tower", "dep:http"]
mock = ["tokio-stream", "dep:tower", "dep:hyper-util"]
tower = ["dep:hyper", "dep:hyper-util", "dep:tower", "dep:http"]
json-codec = ["dep:serde", "dep:serde_json", "dep:bytes"]
compression = ["tonic/gzip"]
tls = ["tonic/tls"]
tls-rustls = ["dep:hyper", "dep:hyper-rustls", "dep:tower", "tower-http/util", "tower-http/add-extension", "dep:rustls-pemfile", "dep:tokio-rustls"]
tls-rustls = [
"dep:hyper",
"dep:hyper-util",
"dep:hyper-rustls",
"dep:tower",
"tower-http/util",
"tower-http/add-extension",
"dep:rustls-pemfile",
"dep:tokio-rustls",
]
dynamic-load-balance = ["dep:tower"]
timeout = ["tokio/time", "dep:tower"]
tls-client-auth = ["tonic/tls"]
types = ["dep:tonic-types"]
h2c = ["dep:hyper", "dep:tower", "dep:http"]
h2c = ["dep:hyper", "dep:tower", "dep:http", "dep:hyper-util"]
cancellation = ["dep:tokio-util"]

full = ["gcp", "routeguide", "reflection", "autoreload", "health", "grpc-web", "tracing", "hyper-warp", "hyper-warp-multiplex", "uds", "streaming", "mock", "tower", "json-codec", "compression", "tls", "tls-rustls", "dynamic-load-balance", "timeout", "tls-client-auth", "types", "cancellation", "h2c"]
full = [
"gcp",
"routeguide",
"reflection",
"autoreload",
"health",
"grpc-web",
"tracing",
# "hyper-warp",
# "hyper-warp-multiplex",
"uds",
"streaming",
"mock",
"tower",
"json-codec",
"compression",
"tls",
"tls-rustls",
"dynamic-load-balance",
"timeout",
"tls-client-auth",
"types",
"cancellation",
"h2c",
]
default = ["full"]

[dependencies]
Expand All @@ -332,19 +386,31 @@ rand = { version = "0.8", optional = true }
serde = { version = "1.0", features = ["derive"], optional = true }
serde_json = { version = "1.0", optional = true }
tracing = { version = "0.1.16", optional = true }
tracing-subscriber = { version = "0.3", features = ["tracing-log", "fmt"], optional = true }
tracing-subscriber = { version = "0.3", features = [
"tracing-log",
"fmt",
], optional = true }
prost-types = { version = "0.12", optional = true }
http = { version = "0.2", optional = true }
http-body = { version = "0.4.2", optional = true }
hyper = { version = "0.14", optional = true }
warp = { version = "0.3.4", default-features = false, optional = true }
http = { version = "1", optional = true }
http-body = { version = "1", optional = true }
http-body-util = { version = "0.1", optional = true }
hyper = { version = "1", optional = true }
hyper-util = { version = "0.1", optional = true }
# warp = { version = "0.3.4", default-features = false, optional = true }
listenfd = { version = "1.0", optional = true }
bytes = { version = "1", optional = true }
h2 = { version = "0.3", optional = true }
tokio-rustls = { version = "0.24.0", optional = true }
hyper-rustls = { version = "0.24.0", features = ["http2"], optional = true }
rustls-pemfile = { version = "1", optional = true }
tower-http = { version = "0.4", optional = true }
tokio-rustls = { version = "0.26.0", optional = true, features = [
"ring",
"tls12",
], default-features = false }
hyper-rustls = { version = "0.27.0", features = [
"http2",
"ring",
"tls12",
], optional = true, default-features = false }
rustls-pemfile = { version = "2.1.1", optional = true }
tower-http = { version = "0.5", optional = true }

[build-dependencies]
tonic-build = { path = "../tonic-build", features = ["prost"] }
3 changes: 2 additions & 1 deletion examples/src/grpc-web/client.rs
@@ -1,4 +1,5 @@
use hello_world::{greeter_client::GreeterClient, HelloRequest};
use hyper_util::rt::TokioExecutor;
use tonic_web::GrpcWebClientLayer;

pub mod hello_world {
Expand All @@ -8,7 +9,7 @@ pub mod hello_world {
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Must use hyper directly...
let client = hyper::Client::builder().build_http();
let client = hyper_util::client::legacy::Client::builder(TokioExecutor::new()).build_http();

let svc = tower::ServiceBuilder::new()
.layer(GrpcWebClientLayer::new())
Expand Down
29 changes: 17 additions & 12 deletions examples/src/h2c/client.rs
@@ -1,7 +1,8 @@
use hello_world::greeter_client::GreeterClient;
use hello_world::HelloRequest;
use http::Uri;
use hyper::Client;
use hyper_util::client::legacy::Client;
use hyper_util::rt::TokioExecutor;

pub mod hello_world {
tonic::include_proto!("helloworld");
Expand All @@ -11,7 +12,7 @@ pub mod hello_world {
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let origin = Uri::from_static("http://[::1]:50051");
let h2c_client = h2c::H2cChannel {
client: Client::new(),
client: Client::builder(TokioExecutor::new()).build_http(),
};

let mut client = GreeterClient::with_origin(h2c_client, origin);
Expand All @@ -33,16 +34,20 @@ mod h2c {
task::{Context, Poll},
};

use hyper::{client::HttpConnector, Client};
use tonic::body::BoxBody;
use hyper::body::Incoming;
use hyper_util::{
client::legacy::{connect::HttpConnector, Client},
rt::TokioExecutor,
};
use tonic::body::{empty_body, BoxBody};
use tower::Service;

pub struct H2cChannel {
pub client: Client<HttpConnector>,
pub client: Client<HttpConnector, BoxBody>,
}

impl Service<http::Request<BoxBody>> for H2cChannel {
type Response = http::Response<hyper::Body>;
type Response = http::Response<Incoming>;
type Error = hyper::Error;
type Future =
Pin<Box<dyn std::future::Future<Output = Result<Self::Response, Self::Error>> + Send>>;
Expand All @@ -60,7 +65,7 @@ mod h2c {
let h2c_req = hyper::Request::builder()
.uri(origin)
.header(http::header::UPGRADE, "h2c")
.body(hyper::Body::empty())
.body(empty_body())
.unwrap();

let res = client.request(h2c_req).await.unwrap();
Expand All @@ -72,11 +77,11 @@ mod h2c {
let upgraded_io = hyper::upgrade::on(res).await.unwrap();

// In an ideal world you would somehow cache this connection
let (mut h2_client, conn) = hyper::client::conn::Builder::new()
.http2_only(true)
.handshake(upgraded_io)
.await
.unwrap();
let (mut h2_client, conn) =
hyper::client::conn::http2::Builder::new(TokioExecutor::new())
.handshake(upgraded_io)
.await
.unwrap();
tokio::spawn(conn);

h2_client.send_request(request).await
Expand Down
55 changes: 37 additions & 18 deletions examples/src/h2c/server.rs
@@ -1,8 +1,14 @@
use std::net::SocketAddr;

use hyper_util::rt::{TokioExecutor, TokioIo};
use hyper_util::server::conn::auto::Builder;
use hyper_util::service::TowerToHyperService;
use tokio::net::TcpListener;
// use tonic::transport::server::TowerToHyperService;
use tonic::{transport::Server, Request, Response, Status};

use hello_world::greeter_server::{Greeter, GreeterServer};
use hello_world::{HelloReply, HelloRequest};
use tower::make::Shared;

pub mod hello_world {
tonic::include_proto!("helloworld");
Expand All @@ -28,28 +34,45 @@ impl Greeter for MyGreeter {

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let addr = "[::1]:50051".parse().unwrap();
let addr: SocketAddr = "[::1]:50051".parse().unwrap();
let greeter = MyGreeter::default();

println!("GreeterServer listening on {}", addr);

let incoming = TcpListener::bind(addr).await?;
let svc = Server::builder()
.add_service(GreeterServer::new(greeter))
.into_router();

let h2c = h2c::H2c { s: svc };

let server = hyper::Server::bind(&addr).serve(Shared::new(h2c));
server.await.unwrap();

Ok(())
loop {
match incoming.accept().await {
Ok((io, _)) => {
let router = h2c.clone();
tokio::spawn(async move {
let builder = Builder::new(TokioExecutor::new());
let conn = builder.serve_connection_with_upgrades(
TokioIo::new(io),
TowerToHyperService::new(router),
);
let _ = conn.await;
});
}
Err(e) => {
eprintln!("Error accepting connection: {}", e);
}
}
}
}

mod h2c {
use std::pin::Pin;

use http::{Request, Response};
use hyper::Body;
use hyper::body::Incoming;
use hyper_util::{rt::TokioExecutor, service::TowerToHyperService};
use tonic::{body::empty_body, transport::AxumBoxBody};
use tower::Service;

#[derive(Clone)]
Expand All @@ -59,17 +82,14 @@ mod h2c {

type BoxError = Box<dyn std::error::Error + Send + Sync>;

impl<S> Service<Request<Body>> for H2c<S>
impl<S> Service<Request<Incoming>> for H2c<S>
where
S: Service<Request<Body>, Response = Response<tonic::transport::AxumBoxBody>>
+ Clone
+ Send
+ 'static,
S: Service<Request<Incoming>, Response = Response<AxumBoxBody>> + Clone + Send + 'static,
S::Future: Send + 'static,
S::Error: Into<BoxError> + Sync + Send + 'static,
S::Response: Send + 'static,
{
type Response = hyper::Response<Body>;
type Response = hyper::Response<tonic::body::BoxBody>;
type Error = hyper::Error;
type Future =
Pin<Box<dyn std::future::Future<Output = Result<Self::Response, Self::Error>> + Send>>;
Expand All @@ -81,20 +101,19 @@ mod h2c {
std::task::Poll::Ready(Ok(()))
}

fn call(&mut self, mut req: hyper::Request<Body>) -> Self::Future {
fn call(&mut self, mut req: hyper::Request<Incoming>) -> Self::Future {
let svc = self.s.clone();
Box::pin(async move {
tokio::spawn(async move {
let upgraded_io = hyper::upgrade::on(&mut req).await.unwrap();

hyper::server::conn::Http::new()
.http2_only(true)
.serve_connection(upgraded_io, svc)
hyper::server::conn::http2::Builder::new(TokioExecutor::new())
.serve_connection(upgraded_io, TowerToHyperService::new(svc))
.await
.unwrap();
});

let mut res = hyper::Response::new(hyper::Body::empty());
let mut res = hyper::Response::new(empty_body());
*res.status_mut() = http::StatusCode::SWITCHING_PROTOCOLS;
res.headers_mut().insert(
hyper::header::UPGRADE,
Expand Down
2 changes: 2 additions & 0 deletions examples/src/hyper_warp/client.rs
@@ -1,3 +1,5 @@
//! IMPORTANT: warp is not compatible with hyper-1.0, this example will not
//! compile until warp is updated to support hyper-1.0.
//! To hit the gRPC endpoint you must run this client via:
//! `cargo run --bin hyper-warp-client
//! To hit the warp server you can run this command:
Expand Down