Skip to content

Commit

Permalink
Merge pull request #21 from omjadas/chore/hyper-1
Browse files Browse the repository at this point in the history
chore: support hyper v1
  • Loading branch information
de-vri-es committed Nov 19, 2023
2 parents 931f2e9 + 99cdcac commit 4ef5639
Show file tree
Hide file tree
Showing 7 changed files with 87 additions and 80 deletions.
6 changes: 4 additions & 2 deletions Cargo.toml
Expand Up @@ -12,14 +12,16 @@ documentation = "https://docs.rs/hyper-tungstenite"
edition = "2021"

[dependencies]
hyper = { version = "0.14.4" }
http-body-util = "0.1.0"
hyper = { version = "1.0.0" }
hyper-util = { version = "0.1.0", features = ["tokio"] }
pin-project-lite = "0.2.10"
tokio = "1.2.0"
tokio-tungstenite = "0.20.0"
tungstenite = "0.20.0"

[dev-dependencies]
assert2 = "0.3.4"
hyper = { version = "0.14.18", features = ["http1", "server", "tcp"] }
hyper = { version = "1.0.0", features = ["http1", "server"] }
tokio = { version = "1.2.0", features = ["net", "macros", "rt-multi-thread"] }
futures = { version = "0.3.12" }
35 changes: 17 additions & 18 deletions README.md
Expand Up @@ -18,31 +18,32 @@ you can manually inspect the `Connection` and `Upgrade` headers.
## Example
```rust
use futures::{sink::SinkExt, stream::StreamExt};
use hyper::{Body, Request, Response};
use http_body_util::Full;
use hyper::{body::{Bytes, Incoming}, Request, Response};
use hyper_util::rt::TokioIo;
use hyper_tungstenite::{tungstenite, HyperWebsocket};
use std::convert::Infallible;
use tungstenite::Message;

type Error = Box<dyn std::error::Error + Send + Sync + 'static>;

/// Handle a HTTP or WebSocket request.
async fn handle_request(mut request: Request<Body>) -> Result<Response<Body>, Error> {
async fn handle_request(mut request: Request<Incoming>) -> Result<Response<Full<Bytes>>, Error> {
// Check if the request is a websocket upgrade request.
if hyper_tungstenite::is_upgrade_request(&request) {
let (response, websocket) = hyper_tungstenite::upgrade(&mut request, None)?;

// Spawn a task to handle the websocket connection.
tokio::spawn(async move {
if let Err(e) = serve_websocket(websocket).await {
eprintln!("Error in websocket connection: {}", e);
eprintln!("Error in websocket connection: {e}");
}
});

// Return the response so the spawned future can continue.
Ok(response)
} else {
// Handle regular HTTP requests here.
Ok(Response::new(Body::from("Hello HTTP!")))
Ok(Response::new(Full::<Bytes>::from("Hello HTTP!")))
}
}

Expand All @@ -52,19 +53,19 @@ async fn serve_websocket(websocket: HyperWebsocket) -> Result<(), Error> {
while let Some(message) = websocket.next().await {
match message? {
Message::Text(msg) => {
println!("Received text message: {}", msg);
println!("Received text message: {msg}");
websocket.send(Message::text("Thank you, come again.")).await?;
},
Message::Binary(msg) => {
println!("Received binary message: {:02X?}", msg);
println!("Received binary message: {msg:02X?}");
websocket.send(Message::binary(b"Thank you, come again.".to_vec())).await?;
},
Message::Ping(msg) => {
// No need to send a reply: tungstenite takes care of this for you.
println!("Received ping message: {:02X?}", msg);
println!("Received ping message: {msg:02X?}");
},
Message::Pong(msg) => {
println!("Received pong message: {:02X?}", msg);
println!("Received pong message: {msg:02X?}");
}
Message::Close(msg) => {
// No need to send a reply: tungstenite takes care of this for you.
Expand All @@ -74,8 +75,8 @@ async fn serve_websocket(websocket: HyperWebsocket) -> Result<(), Error> {
println!("Received close message");
}
},
Message::Frame(msg) => {
unreachable!();
Message::Frame(_msg) => {
unreachable!();
}
}
}
Expand All @@ -86,22 +87,20 @@ async fn serve_websocket(websocket: HyperWebsocket) -> Result<(), Error> {
#[tokio::main]
async fn main() -> Result<(), Error> {
let addr: std::net::SocketAddr = "[::1]:3000".parse()?;
println!("Listening on http://{}", addr);
let listener = tokio::net::TcpListener::bind(&addr).await?;
println!("listening on {}", addr);
println!("Listening on http://{addr}");

let mut http = hyper::server::conn::Http::new();
http.http1_only(true);
http.http1_keep_alive(true);
let mut http = hyper::server::conn::http1::Builder::new();
http.keep_alive(true);

loop {
let (stream, _) = listener.accept().await?;
let connection = http
.serve_connection(stream, hyper::service::service_fn(handle_request))
.serve_connection(TokioIo::new(stream), hyper::service::service_fn(handle_request))
.with_upgrades();
tokio::spawn(async move {
if let Err(err) = connection.await {
println!("Error serving HTTP connection: {:?}", err);
println!("Error serving HTTP connection: {err:?}");
}
});
}
Expand Down
15 changes: 8 additions & 7 deletions examples/server.rs
@@ -1,12 +1,14 @@
use futures::{sink::SinkExt, stream::StreamExt};
use hyper::{Body, Request, Response};
use http_body_util::Full;
use hyper::{body::{Bytes, Incoming}, Request, Response};
use hyper_util::rt::TokioIo;
use hyper_tungstenite::{tungstenite, HyperWebsocket};
use tungstenite::Message;

type Error = Box<dyn std::error::Error + Send + Sync + 'static>;

/// Handle a HTTP or WebSocket request.
async fn handle_request(mut request: Request<Body>) -> Result<Response<Body>, Error> {
async fn handle_request(mut request: Request<Incoming>) -> Result<Response<Full<Bytes>>, Error> {
// Check if the request is a websocket upgrade request.
if hyper_tungstenite::is_upgrade_request(&request) {
let (response, websocket) = hyper_tungstenite::upgrade(&mut request, None)?;
Expand All @@ -22,7 +24,7 @@ async fn handle_request(mut request: Request<Body>) -> Result<Response<Body>, Er
Ok(response)
} else {
// Handle regular HTTP requests here.
Ok(Response::new(Body::from("Hello HTTP!")))
Ok(Response::new(Full::<Bytes>::from("Hello HTTP!")))
}
}

Expand Down Expand Up @@ -69,14 +71,13 @@ async fn main() -> Result<(), Error> {
let listener = tokio::net::TcpListener::bind(&addr).await?;
println!("Listening on http://{addr}");

let mut http = hyper::server::conn::Http::new();
http.http1_only(true);
http.http1_keep_alive(true);
let mut http = hyper::server::conn::http1::Builder::new();
http.keep_alive(true);

loop {
let (stream, _) = listener.accept().await?;
let connection = http
.serve_connection(stream, hyper::service::service_fn(handle_request))
.serve_connection(TokioIo::new(stream), hyper::service::service_fn(handle_request))
.with_upgrades();
tokio::spawn(async move {
if let Err(err) = connection.await {
Expand Down
48 changes: 25 additions & 23 deletions src/lib.rs
Expand Up @@ -13,31 +13,32 @@
//! # Example
//! ```no_run
//! use futures::{sink::SinkExt, stream::StreamExt};
//! use hyper::{Body, Request, Response};
//! use http_body_util::Full;
//! use hyper::{body::{Bytes, Incoming}, Request, Response};
//! use hyper_util::rt::TokioIo;
//! use hyper_tungstenite::{tungstenite, HyperWebsocket};
//! use std::convert::Infallible;
//! use tungstenite::Message;
//!
//! type Error = Box<dyn std::error::Error + Send + Sync + 'static>;
//!
//! /// Handle a HTTP or WebSocket request.
//! async fn handle_request(mut request: Request<Body>) -> Result<Response<Body>, Error> {
//! async fn handle_request(mut request: Request<Incoming>) -> Result<Response<Full<Bytes>>, Error> {
//! // Check if the request is a websocket upgrade request.
//! if hyper_tungstenite::is_upgrade_request(&request) {
//! let (response, websocket) = hyper_tungstenite::upgrade(&mut request, None)?;
//!
//! // Spawn a task to handle the websocket connection.
//! tokio::spawn(async move {
//! if let Err(e) = serve_websocket(websocket).await {
//! eprintln!("Error in websocket connection: {}", e);
//! eprintln!("Error in websocket connection: {e}");
//! }
//! });
//!
//! // Return the response so the spawned future can continue.
//! Ok(response)
//! } else {
//! // Handle regular HTTP requests here.
//! Ok(Response::new(Body::from("Hello HTTP!")))
//! Ok(Response::new(Full::<Bytes>::from("Hello HTTP!")))
//! }
//! }
//!
Expand All @@ -47,19 +48,19 @@
//! while let Some(message) = websocket.next().await {
//! match message? {
//! Message::Text(msg) => {
//! println!("Received text message: {}", msg);
//! println!("Received text message: {msg}");
//! websocket.send(Message::text("Thank you, come again.")).await?;
//! },
//! Message::Binary(msg) => {
//! println!("Received binary message: {:02X?}", msg);
//! println!("Received binary message: {msg:02X?}");
//! websocket.send(Message::binary(b"Thank you, come again.".to_vec())).await?;
//! },
//! Message::Ping(msg) => {
//! // No need to send a reply: tungstenite takes care of this for you.
//! println!("Received ping message: {:02X?}", msg);
//! println!("Received ping message: {msg:02X?}");
//! },
//! Message::Pong(msg) => {
//! println!("Received pong message: {:02X?}", msg);
//! println!("Received pong message: {msg:02X?}");
//! }
//! Message::Close(msg) => {
//! // No need to send a reply: tungstenite takes care of this for you.
Expand All @@ -69,8 +70,8 @@
//! println!("Received close message");
//! }
//! },
//! Message::Frame(msg) => {
//! unreachable!();
//! Message::Frame(_msg) => {
//! unreachable!();
//! }
//! }
//! }
Expand All @@ -81,29 +82,30 @@
//! #[tokio::main]
//! async fn main() -> Result<(), Error> {
//! let addr: std::net::SocketAddr = "[::1]:3000".parse()?;
//! println!("Listening on http://{}", addr);
//! let listener = tokio::net::TcpListener::bind(&addr).await?;
//! println!("listening on {}", addr);
//! println!("Listening on http://{addr}");
//!
//! let mut http = hyper::server::conn::Http::new();
//! http.http1_only(true);
//! http.http1_keep_alive(true);
//! let mut http = hyper::server::conn::http1::Builder::new();
//! http.keep_alive(true);
//!
//! loop {
//! let (stream, _) = listener.accept().await?;
//! let connection = http
//! .serve_connection(stream, hyper::service::service_fn(handle_request))
//! .serve_connection(TokioIo::new(stream), hyper::service::service_fn(handle_request))
//! .with_upgrades();
//! tokio::spawn(async move {
//! if let Err(err) = connection.await {
//! println!("Error serving HTTP connection: {:?}", err);
//! println!("Error serving HTTP connection: {err:?}");
//! }
//! });
//! }
//! }
//! ```

use hyper::{Body, Request, Response};
use http_body_util::Full;
use hyper::body::Bytes;
use hyper::{Request, Response};
use hyper_util::rt::TokioIo;
use std::task::{Context, Poll};
use std::pin::Pin;
use pin_project_lite::pin_project;
Expand Down Expand Up @@ -144,7 +146,7 @@ pin_project! {
pub fn upgrade<B>(
mut request: impl std::borrow::BorrowMut<Request<B>>,
config: Option<WebSocketConfig>,
) -> Result<(Response<Body>, HyperWebsocket), ProtocolError> {
) -> Result<(Response<Full<Bytes>>, HyperWebsocket), ProtocolError> {
let request = request.borrow_mut();

let key = request.headers().get("Sec-WebSocket-Key")
Expand All @@ -158,7 +160,7 @@ pub fn upgrade<B>(
.header(hyper::header::CONNECTION, "upgrade")
.header(hyper::header::UPGRADE, "websocket")
.header("Sec-WebSocket-Accept", &derive_accept_key(key.as_bytes()))
.body(Body::from("switching to websocket protocol"))
.body(Full::<Bytes>::from("switching to websocket protocol"))
.expect("bug: failed to build response");

let stream = HyperWebsocket {
Expand Down Expand Up @@ -212,7 +214,7 @@ fn trim_end(data: &[u8]) -> &[u8] {
}

impl std::future::Future for HyperWebsocket {
type Output = Result<WebSocketStream<hyper::upgrade::Upgraded>, Error>;
type Output = Result<WebSocketStream<TokioIo<hyper::upgrade::Upgraded>>, Error>;

fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
let this = self.project();
Expand All @@ -224,7 +226,7 @@ impl std::future::Future for HyperWebsocket {
let upgraded = upgraded.map_err(|_| Error::Protocol(ProtocolError::HandshakeIncomplete))?;

let stream = WebSocketStream::from_raw_socket(
upgraded,
TokioIo::new(upgraded),
Role::Server,
this.config.take(),
);
Expand Down
21 changes: 11 additions & 10 deletions tests/simple-server.rs
@@ -1,6 +1,8 @@
use hyper::{Body, Request, Response};
use hyper::server::Server;
use hyper::service::{service_fn, make_service_fn};
use http_body_util::Full;
use hyper::{Request, Response};
use hyper::body::{Bytes, Incoming};
use hyper::service::service_fn;
use hyper_util::rt::TokioIo;
use hyper_tungstenite::tungstenite::Error;
use tokio::net::TcpStream;
use std::net::Ipv6Addr;
Expand All @@ -13,16 +15,15 @@ use assert2::{assert, let_assert};
#[tokio::test]
async fn hyper() {
// Bind a TCP listener to an ephemeral port.
let_assert!(Ok(listener) = std::net::TcpListener::bind((Ipv6Addr::LOCALHOST, 0u16)));
let_assert!(Ok(listener) = tokio::net::TcpListener::bind((Ipv6Addr::LOCALHOST, 0u16)).await);
let_assert!(Ok(bind_addr) = listener.local_addr());
let_assert!(Ok(server) = Server::from_tcp(listener));
let server = hyper::server::conn::http1::Builder::new();

// Spawn the server in a task.
tokio::spawn(async move {
let service = make_service_fn(|_conn| async {
Ok::<_, hyper::Error>(service_fn(upgrade_websocket))
});
let_assert!(Ok(()) = server.http1_only(true).serve(service).await);
let service = service_fn(upgrade_websocket);
let_assert!(Ok((stream, _)) = listener.accept().await);
let_assert!(Ok(()) = server.serve_connection(TokioIo::new(stream), service).with_upgrades().await);
});

// Try to create a websocket connection with the server.
Expand All @@ -36,7 +37,7 @@ async fn hyper() {
assert!(let Some(Ok(Message::Close(None))) = stream.next().await);
}

async fn upgrade_websocket(request: Request<Body>) -> Result<Response<Body>> {
async fn upgrade_websocket(request: Request<Incoming>) -> Result<Response<Full<Bytes>>> {
assert!(hyper_tungstenite::is_upgrade_request(&request) == true);

let (response, stream) = hyper_tungstenite::upgrade(request, None)
Expand Down
21 changes: 11 additions & 10 deletions tests/split.rs
@@ -1,6 +1,8 @@
use hyper::{Body, Request, Response};
use hyper::server::Server;
use hyper::service::{service_fn, make_service_fn};
use http_body_util::Full;
use hyper::{Request, Response};
use hyper::body::{Bytes, Incoming};
use hyper::service::service_fn;
use hyper_util::rt::TokioIo;
use hyper_tungstenite::tungstenite::Error;
use tokio::net::TcpStream;
use std::net::Ipv6Addr;
Expand All @@ -13,16 +15,15 @@ use assert2::{assert, let_assert};
#[tokio::test]
async fn hyper() {
// Bind a TCP listener to an ephemeral port.
let_assert!(Ok(listener) = std::net::TcpListener::bind((Ipv6Addr::LOCALHOST, 0u16)));
let_assert!(Ok(listener) = tokio::net::TcpListener::bind((Ipv6Addr::LOCALHOST, 0u16)).await);
let_assert!(Ok(bind_addr) = listener.local_addr());
let_assert!(Ok(server) = Server::from_tcp(listener));
let server = hyper::server::conn::http1::Builder::new();

// Spawn the server in a task.
tokio::spawn(async move {
let service = make_service_fn(|_conn| async {
Ok::<_, hyper::Error>(service_fn(upgrade_websocket))
});
let_assert!(Ok(()) = server.http1_only(true).serve(service).await);
let service = service_fn(upgrade_websocket);
let_assert!(Ok((stream, _)) = listener.accept().await);
let_assert!(Ok(()) = server.serve_connection(TokioIo::new(stream), service).with_upgrades().await);
});

// Try to create a websocket connection with the server.
Expand All @@ -36,7 +37,7 @@ async fn hyper() {
assert!(let Some(Ok(Message::Close(None))) = stream.next().await);
}

async fn upgrade_websocket(mut request: Request<Body>) -> Result<Response<Body>> {
async fn upgrade_websocket(mut request: Request<Incoming>) -> Result<Response<Full<Bytes>>> {
assert!(hyper_tungstenite::is_upgrade_request(&request) == true);

let (response, stream) = hyper_tungstenite::upgrade(&mut request, None)
Expand Down

0 comments on commit 4ef5639

Please sign in to comment.