Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: support hyper v1 #21

Merged
merged 1 commit into from
Nov 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 4 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,16 @@ documentation = "https://docs.rs/hyper-tungstenite"
edition = "2021"

[dependencies]
hyper = { version = "0.14.4" }
http-body-util = "0.1.0"
hyper = { version = "1.0.0" }
hyper-util = { version = "0.1.0", features = ["tokio"] }
pin-project-lite = "0.2.10"
tokio = "1.2.0"
tokio-tungstenite = "0.20.0"
tungstenite = "0.20.0"

[dev-dependencies]
assert2 = "0.3.4"
hyper = { version = "0.14.18", features = ["http1", "server", "tcp"] }
hyper = { version = "1.0.0", features = ["http1", "server"] }
tokio = { version = "1.2.0", features = ["net", "macros", "rt-multi-thread"] }
futures = { version = "0.3.12" }
35 changes: 17 additions & 18 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,31 +18,32 @@ you can manually inspect the `Connection` and `Upgrade` headers.
## Example
```rust
use futures::{sink::SinkExt, stream::StreamExt};
use hyper::{Body, Request, Response};
use http_body_util::Full;
use hyper::{body::{Bytes, Incoming}, Request, Response};
use hyper_util::rt::TokioIo;
use hyper_tungstenite::{tungstenite, HyperWebsocket};
use std::convert::Infallible;
use tungstenite::Message;

type Error = Box<dyn std::error::Error + Send + Sync + 'static>;

/// Handle a HTTP or WebSocket request.
async fn handle_request(mut request: Request<Body>) -> Result<Response<Body>, Error> {
async fn handle_request(mut request: Request<Incoming>) -> Result<Response<Full<Bytes>>, Error> {
// Check if the request is a websocket upgrade request.
if hyper_tungstenite::is_upgrade_request(&request) {
let (response, websocket) = hyper_tungstenite::upgrade(&mut request, None)?;

// Spawn a task to handle the websocket connection.
tokio::spawn(async move {
if let Err(e) = serve_websocket(websocket).await {
eprintln!("Error in websocket connection: {}", e);
eprintln!("Error in websocket connection: {e}");
}
});

// Return the response so the spawned future can continue.
Ok(response)
} else {
// Handle regular HTTP requests here.
Ok(Response::new(Body::from("Hello HTTP!")))
Ok(Response::new(Full::<Bytes>::from("Hello HTTP!")))
}
}

Expand All @@ -52,19 +53,19 @@ async fn serve_websocket(websocket: HyperWebsocket) -> Result<(), Error> {
while let Some(message) = websocket.next().await {
match message? {
Message::Text(msg) => {
println!("Received text message: {}", msg);
println!("Received text message: {msg}");
websocket.send(Message::text("Thank you, come again.")).await?;
},
Message::Binary(msg) => {
println!("Received binary message: {:02X?}", msg);
println!("Received binary message: {msg:02X?}");
websocket.send(Message::binary(b"Thank you, come again.".to_vec())).await?;
},
Message::Ping(msg) => {
// No need to send a reply: tungstenite takes care of this for you.
println!("Received ping message: {:02X?}", msg);
println!("Received ping message: {msg:02X?}");
},
Message::Pong(msg) => {
println!("Received pong message: {:02X?}", msg);
println!("Received pong message: {msg:02X?}");
}
Message::Close(msg) => {
// No need to send a reply: tungstenite takes care of this for you.
Expand All @@ -74,8 +75,8 @@ async fn serve_websocket(websocket: HyperWebsocket) -> Result<(), Error> {
println!("Received close message");
}
},
Message::Frame(msg) => {
unreachable!();
Message::Frame(_msg) => {
unreachable!();
}
}
}
Expand All @@ -86,22 +87,20 @@ async fn serve_websocket(websocket: HyperWebsocket) -> Result<(), Error> {
#[tokio::main]
async fn main() -> Result<(), Error> {
let addr: std::net::SocketAddr = "[::1]:3000".parse()?;
println!("Listening on http://{}", addr);
let listener = tokio::net::TcpListener::bind(&addr).await?;
println!("listening on {}", addr);
println!("Listening on http://{addr}");

let mut http = hyper::server::conn::Http::new();
http.http1_only(true);
http.http1_keep_alive(true);
let mut http = hyper::server::conn::http1::Builder::new();
http.keep_alive(true);

loop {
let (stream, _) = listener.accept().await?;
let connection = http
.serve_connection(stream, hyper::service::service_fn(handle_request))
.serve_connection(TokioIo::new(stream), hyper::service::service_fn(handle_request))
.with_upgrades();
tokio::spawn(async move {
if let Err(err) = connection.await {
println!("Error serving HTTP connection: {:?}", err);
println!("Error serving HTTP connection: {err:?}");
}
});
}
Expand Down
15 changes: 8 additions & 7 deletions examples/server.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
use futures::{sink::SinkExt, stream::StreamExt};
use hyper::{Body, Request, Response};
use http_body_util::Full;
use hyper::{body::{Bytes, Incoming}, Request, Response};
use hyper_util::rt::TokioIo;
use hyper_tungstenite::{tungstenite, HyperWebsocket};
use tungstenite::Message;

type Error = Box<dyn std::error::Error + Send + Sync + 'static>;

/// Handle a HTTP or WebSocket request.
async fn handle_request(mut request: Request<Body>) -> Result<Response<Body>, Error> {
async fn handle_request(mut request: Request<Incoming>) -> Result<Response<Full<Bytes>>, Error> {
// Check if the request is a websocket upgrade request.
if hyper_tungstenite::is_upgrade_request(&request) {
let (response, websocket) = hyper_tungstenite::upgrade(&mut request, None)?;
Expand All @@ -22,7 +24,7 @@ async fn handle_request(mut request: Request<Body>) -> Result<Response<Body>, Er
Ok(response)
} else {
// Handle regular HTTP requests here.
Ok(Response::new(Body::from("Hello HTTP!")))
Ok(Response::new(Full::<Bytes>::from("Hello HTTP!")))
}
}

Expand Down Expand Up @@ -69,14 +71,13 @@ async fn main() -> Result<(), Error> {
let listener = tokio::net::TcpListener::bind(&addr).await?;
println!("Listening on http://{addr}");

let mut http = hyper::server::conn::Http::new();
http.http1_only(true);
http.http1_keep_alive(true);
let mut http = hyper::server::conn::http1::Builder::new();
http.keep_alive(true);

loop {
let (stream, _) = listener.accept().await?;
let connection = http
.serve_connection(stream, hyper::service::service_fn(handle_request))
.serve_connection(TokioIo::new(stream), hyper::service::service_fn(handle_request))
.with_upgrades();
tokio::spawn(async move {
if let Err(err) = connection.await {
Expand Down
48 changes: 25 additions & 23 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,31 +13,32 @@
//! # Example
//! ```no_run
//! use futures::{sink::SinkExt, stream::StreamExt};
//! use hyper::{Body, Request, Response};
//! use http_body_util::Full;
//! use hyper::{body::{Bytes, Incoming}, Request, Response};
//! use hyper_util::rt::TokioIo;
//! use hyper_tungstenite::{tungstenite, HyperWebsocket};
//! use std::convert::Infallible;
//! use tungstenite::Message;
//!
//! type Error = Box<dyn std::error::Error + Send + Sync + 'static>;
//!
//! /// Handle a HTTP or WebSocket request.
//! async fn handle_request(mut request: Request<Body>) -> Result<Response<Body>, Error> {
//! async fn handle_request(mut request: Request<Incoming>) -> Result<Response<Full<Bytes>>, Error> {
//! // Check if the request is a websocket upgrade request.
//! if hyper_tungstenite::is_upgrade_request(&request) {
//! let (response, websocket) = hyper_tungstenite::upgrade(&mut request, None)?;
//!
//! // Spawn a task to handle the websocket connection.
//! tokio::spawn(async move {
//! if let Err(e) = serve_websocket(websocket).await {
//! eprintln!("Error in websocket connection: {}", e);
//! eprintln!("Error in websocket connection: {e}");
//! }
//! });
//!
//! // Return the response so the spawned future can continue.
//! Ok(response)
//! } else {
//! // Handle regular HTTP requests here.
//! Ok(Response::new(Body::from("Hello HTTP!")))
//! Ok(Response::new(Full::<Bytes>::from("Hello HTTP!")))
//! }
//! }
//!
Expand All @@ -47,19 +48,19 @@
//! while let Some(message) = websocket.next().await {
//! match message? {
//! Message::Text(msg) => {
//! println!("Received text message: {}", msg);
//! println!("Received text message: {msg}");
//! websocket.send(Message::text("Thank you, come again.")).await?;
//! },
//! Message::Binary(msg) => {
//! println!("Received binary message: {:02X?}", msg);
//! println!("Received binary message: {msg:02X?}");
//! websocket.send(Message::binary(b"Thank you, come again.".to_vec())).await?;
//! },
//! Message::Ping(msg) => {
//! // No need to send a reply: tungstenite takes care of this for you.
//! println!("Received ping message: {:02X?}", msg);
//! println!("Received ping message: {msg:02X?}");
//! },
//! Message::Pong(msg) => {
//! println!("Received pong message: {:02X?}", msg);
//! println!("Received pong message: {msg:02X?}");
//! }
//! Message::Close(msg) => {
//! // No need to send a reply: tungstenite takes care of this for you.
Expand All @@ -69,8 +70,8 @@
//! println!("Received close message");
//! }
//! },
//! Message::Frame(msg) => {
//! unreachable!();
//! Message::Frame(_msg) => {
//! unreachable!();
//! }
//! }
//! }
Expand All @@ -81,29 +82,30 @@
//! #[tokio::main]
//! async fn main() -> Result<(), Error> {
//! let addr: std::net::SocketAddr = "[::1]:3000".parse()?;
//! println!("Listening on http://{}", addr);
//! let listener = tokio::net::TcpListener::bind(&addr).await?;
//! println!("listening on {}", addr);
//! println!("Listening on http://{addr}");
//!
//! let mut http = hyper::server::conn::Http::new();
//! http.http1_only(true);
//! http.http1_keep_alive(true);
//! let mut http = hyper::server::conn::http1::Builder::new();
//! http.keep_alive(true);
//!
//! loop {
//! let (stream, _) = listener.accept().await?;
//! let connection = http
//! .serve_connection(stream, hyper::service::service_fn(handle_request))
//! .serve_connection(TokioIo::new(stream), hyper::service::service_fn(handle_request))
//! .with_upgrades();
//! tokio::spawn(async move {
//! if let Err(err) = connection.await {
//! println!("Error serving HTTP connection: {:?}", err);
//! println!("Error serving HTTP connection: {err:?}");
//! }
//! });
//! }
//! }
//! ```

use hyper::{Body, Request, Response};
use http_body_util::Full;
use hyper::body::Bytes;
use hyper::{Request, Response};
use hyper_util::rt::TokioIo;
use std::task::{Context, Poll};
use std::pin::Pin;
use pin_project_lite::pin_project;
Expand Down Expand Up @@ -144,7 +146,7 @@ pin_project! {
pub fn upgrade<B>(
mut request: impl std::borrow::BorrowMut<Request<B>>,
config: Option<WebSocketConfig>,
) -> Result<(Response<Body>, HyperWebsocket), ProtocolError> {
) -> Result<(Response<Full<Bytes>>, HyperWebsocket), ProtocolError> {
let request = request.borrow_mut();

let key = request.headers().get("Sec-WebSocket-Key")
Expand All @@ -158,7 +160,7 @@ pub fn upgrade<B>(
.header(hyper::header::CONNECTION, "upgrade")
.header(hyper::header::UPGRADE, "websocket")
.header("Sec-WebSocket-Accept", &derive_accept_key(key.as_bytes()))
.body(Body::from("switching to websocket protocol"))
.body(Full::<Bytes>::from("switching to websocket protocol"))
.expect("bug: failed to build response");

let stream = HyperWebsocket {
Expand Down Expand Up @@ -212,7 +214,7 @@ fn trim_end(data: &[u8]) -> &[u8] {
}

impl std::future::Future for HyperWebsocket {
type Output = Result<WebSocketStream<hyper::upgrade::Upgraded>, Error>;
type Output = Result<WebSocketStream<TokioIo<hyper::upgrade::Upgraded>>, Error>;

fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
let this = self.project();
Expand All @@ -224,7 +226,7 @@ impl std::future::Future for HyperWebsocket {
let upgraded = upgraded.map_err(|_| Error::Protocol(ProtocolError::HandshakeIncomplete))?;

let stream = WebSocketStream::from_raw_socket(
upgraded,
TokioIo::new(upgraded),
Role::Server,
this.config.take(),
);
Expand Down
21 changes: 11 additions & 10 deletions tests/simple-server.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use hyper::{Body, Request, Response};
use hyper::server::Server;
use hyper::service::{service_fn, make_service_fn};
use http_body_util::Full;
use hyper::{Request, Response};
use hyper::body::{Bytes, Incoming};
use hyper::service::service_fn;
use hyper_util::rt::TokioIo;
use hyper_tungstenite::tungstenite::Error;
use tokio::net::TcpStream;
use std::net::Ipv6Addr;
Expand All @@ -13,16 +15,15 @@ use assert2::{assert, let_assert};
#[tokio::test]
async fn hyper() {
// Bind a TCP listener to an ephemeral port.
let_assert!(Ok(listener) = std::net::TcpListener::bind((Ipv6Addr::LOCALHOST, 0u16)));
let_assert!(Ok(listener) = tokio::net::TcpListener::bind((Ipv6Addr::LOCALHOST, 0u16)).await);
let_assert!(Ok(bind_addr) = listener.local_addr());
let_assert!(Ok(server) = Server::from_tcp(listener));
let server = hyper::server::conn::http1::Builder::new();

// Spawn the server in a task.
tokio::spawn(async move {
let service = make_service_fn(|_conn| async {
Ok::<_, hyper::Error>(service_fn(upgrade_websocket))
});
let_assert!(Ok(()) = server.http1_only(true).serve(service).await);
let service = service_fn(upgrade_websocket);
let_assert!(Ok((stream, _)) = listener.accept().await);
let_assert!(Ok(()) = server.serve_connection(TokioIo::new(stream), service).with_upgrades().await);
});

// Try to create a websocket connection with the server.
Expand All @@ -36,7 +37,7 @@ async fn hyper() {
assert!(let Some(Ok(Message::Close(None))) = stream.next().await);
}

async fn upgrade_websocket(request: Request<Body>) -> Result<Response<Body>> {
async fn upgrade_websocket(request: Request<Incoming>) -> Result<Response<Full<Bytes>>> {
assert!(hyper_tungstenite::is_upgrade_request(&request) == true);

let (response, stream) = hyper_tungstenite::upgrade(request, None)
Expand Down
21 changes: 11 additions & 10 deletions tests/split.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use hyper::{Body, Request, Response};
use hyper::server::Server;
use hyper::service::{service_fn, make_service_fn};
use http_body_util::Full;
use hyper::{Request, Response};
use hyper::body::{Bytes, Incoming};
use hyper::service::service_fn;
use hyper_util::rt::TokioIo;
use hyper_tungstenite::tungstenite::Error;
use tokio::net::TcpStream;
use std::net::Ipv6Addr;
Expand All @@ -13,16 +15,15 @@ use assert2::{assert, let_assert};
#[tokio::test]
async fn hyper() {
// Bind a TCP listener to an ephemeral port.
let_assert!(Ok(listener) = std::net::TcpListener::bind((Ipv6Addr::LOCALHOST, 0u16)));
let_assert!(Ok(listener) = tokio::net::TcpListener::bind((Ipv6Addr::LOCALHOST, 0u16)).await);
let_assert!(Ok(bind_addr) = listener.local_addr());
let_assert!(Ok(server) = Server::from_tcp(listener));
let server = hyper::server::conn::http1::Builder::new();

// Spawn the server in a task.
tokio::spawn(async move {
let service = make_service_fn(|_conn| async {
Ok::<_, hyper::Error>(service_fn(upgrade_websocket))
});
let_assert!(Ok(()) = server.http1_only(true).serve(service).await);
let service = service_fn(upgrade_websocket);
let_assert!(Ok((stream, _)) = listener.accept().await);
let_assert!(Ok(()) = server.serve_connection(TokioIo::new(stream), service).with_upgrades().await);
});

// Try to create a websocket connection with the server.
Expand All @@ -36,7 +37,7 @@ async fn hyper() {
assert!(let Some(Ok(Message::Close(None))) = stream.next().await);
}

async fn upgrade_websocket(mut request: Request<Body>) -> Result<Response<Body>> {
async fn upgrade_websocket(mut request: Request<Incoming>) -> Result<Response<Full<Bytes>>> {
assert!(hyper_tungstenite::is_upgrade_request(&request) == true);

let (response, stream) = hyper_tungstenite::upgrade(&mut request, None)
Expand Down