Skip to content

Commit

Permalink
Update server-custom-accept example to hyper v1
Browse files Browse the repository at this point in the history
  • Loading branch information
nickelc authored and daniel-abramov committed Dec 8, 2023
1 parent 52e59dd commit 36b1286
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 19 deletions.
4 changes: 3 additions & 1 deletion Cargo.toml
Expand Up @@ -67,7 +67,9 @@ version = "0.26.0"

[dev-dependencies]
futures-channel = "0.3.28"
hyper = { version = "0.14.25", default-features = false, features = ["http1", "server", "tcp"] }
hyper = { version = "1.0", default-features = false, features = ["http1", "server"] }
hyper-util = { version = "0.1", features = ["tokio"] }
http-body-util = "0.1"
tokio = { version = "1.27.0", default-features = false, features = ["io-std", "macros", "net", "rt-multi-thread", "time"] }
url = "2.3.1"
env_logger = "0.10.0"
Expand Down
63 changes: 45 additions & 18 deletions examples/server-custom-accept.rs
Expand Up @@ -26,15 +26,18 @@ use std::{
};

use hyper::{
body::Incoming,
header::{
HeaderValue, CONNECTION, SEC_WEBSOCKET_ACCEPT, SEC_WEBSOCKET_KEY, SEC_WEBSOCKET_VERSION,
UPGRADE,
},
server::conn::AddrStream,
service::{make_service_fn, service_fn},
server::conn::http1,
service::service_fn,
upgrade::Upgraded,
Body, Method, Request, Response, Server, StatusCode, Version,
Method, Request, Response, StatusCode, Version,
};
use hyper_util::rt::TokioIo;
use tokio::net::TcpListener;

use futures_channel::mpsc::{unbounded, UnboundedSender};
use futures_util::{future, pin_mut, stream::TryStreamExt, StreamExt};
Expand All @@ -50,9 +53,25 @@ use tokio_tungstenite::{
type Tx = UnboundedSender<Message>;
type PeerMap = Arc<Mutex<HashMap<SocketAddr, Tx>>>;

/// Helper methods to create responses.
mod body {
use http_body_util::{Either, Empty, Full};
use hyper::body::Bytes;

pub type Body = Either<Empty<Bytes>, Full<Bytes>>;

pub fn empty() -> Body {
Either::Left(Empty::new())
}

pub fn bytes<B: Into<Bytes>>(chunk: B) -> Body {
Either::Right(Full::from(chunk.into()))
}
}

async fn handle_connection(
peer_map: PeerMap,
ws_stream: WebSocketStream<Upgraded>,
ws_stream: WebSocketStream<TokioIo<Upgraded>>,
addr: SocketAddr,
) {
println!("WebSocket connection established: {}", addr);
Expand Down Expand Up @@ -89,9 +108,9 @@ async fn handle_connection(

async fn handle_request(
peer_map: PeerMap,
mut req: Request<Body>,
mut req: Request<Incoming>,
addr: SocketAddr,
) -> Result<Response<Body>, Infallible> {
) -> Result<Response<body::Body>, Infallible> {
println!("Received a new, potentially ws handshake");
println!("The request's path is: {}", req.uri().path());
println!("The request's headers are:");
Expand Down Expand Up @@ -122,12 +141,13 @@ async fn handle_request(
|| key.is_none()
|| req.uri() != "/socket"
{
return Ok(Response::new(Body::from("Hello World!")));
return Ok(Response::new(body::bytes("Hello World!")));
}
let ver = req.version();
tokio::task::spawn(async move {
match hyper::upgrade::on(&mut req).await {
Ok(upgraded) => {
let upgraded = TokioIo::new(upgraded);
handle_connection(
peer_map,
WebSocketStream::from_raw_socket(upgraded, Role::Server, None).await,
Expand All @@ -138,7 +158,7 @@ async fn handle_request(
Err(e) => println!("upgrade error: {}", e),
}
});
let mut res = Response::new(Body::empty());
let mut res = Response::new(body::empty());
*res.status_mut() = StatusCode::SWITCHING_PROTOCOLS;
*res.version_mut() = ver;
res.headers_mut().append(CONNECTION, upgrade);
Expand All @@ -151,21 +171,28 @@ async fn handle_request(
}

#[tokio::main]
async fn main() -> Result<(), hyper::Error> {
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let state = PeerMap::new(Mutex::new(HashMap::new()));

let addr = env::args().nth(1).unwrap_or_else(|| "127.0.0.1:8080".to_string()).parse().unwrap();
let addr =
env::args().nth(1).unwrap_or_else(|| "127.0.0.1:8080".to_string()).parse::<SocketAddr>()?;

let listener = TcpListener::bind(addr).await?;

let make_svc = make_service_fn(move |conn: &AddrStream| {
let remote_addr = conn.remote_addr();
loop {
let (stream, remote_addr) = listener.accept().await?;
let state = state.clone();
let service = service_fn(move |req| handle_request(state.clone(), req, remote_addr));
async { Ok::<_, Infallible>(service) }
});

let server = Server::bind(&addr).serve(make_svc);
tokio::spawn(async move {
let io = TokioIo::new(stream);

server.await?;
let service = service_fn(move |req| handle_request(state.clone(), req, remote_addr));

Ok::<_, hyper::Error>(())
let conn = http1::Builder::new().serve_connection(io, service).with_upgrades();

if let Err(err) = conn.await {
eprintln!("failed to serve connection: {err:?}");
}
});
}
}

0 comments on commit 36b1286

Please sign in to comment.