Skip to content

Commit

Permalink
Update server-custom-accept example to hyper v1
Browse files Browse the repository at this point in the history
  • Loading branch information
nickelc authored and sdroege committed Dec 7, 2023
1 parent 2b1fa55 commit d26431f
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 21 deletions.
4 changes: 3 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,9 @@ env_logger = "0.10"
async-std = { version = "1.0", features = ["attributes", "unstable"] }
tokio = { version = "1.0", features = ["full"] }
futures-channel = "0.3"
hyper = { version = "0.14", default-features = false, features = ["http1", "server", "tcp"] }
hyper = { version = "1.0", default-features = false, features = ["http1", "server"] }
hyper-util = { version = "0.1", features = ["tokio"] }
http-body-util = "0.1"

[[example]]
name = "autobahn-client"
Expand Down
65 changes: 45 additions & 20 deletions examples/server-custom-accept.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,18 @@ use std::{
};

use hyper::{
body::Incoming,
header::{
HeaderValue, CONNECTION, SEC_WEBSOCKET_ACCEPT, SEC_WEBSOCKET_KEY, SEC_WEBSOCKET_VERSION,
UPGRADE,
},
server::conn::AddrStream,
service::{make_service_fn, service_fn},
server::conn::http1,
service::service_fn,
upgrade::Upgraded,
Body, Method, Request, Response, Server, StatusCode, Version,
Method, Request, Response, StatusCode, Version,
};
use hyper_util::rt::TokioIo;
use tokio::net::TcpListener;

use futures_channel::mpsc::{unbounded, UnboundedSender};
use futures_util::{future, pin_mut, stream::TryStreamExt, StreamExt};
Expand All @@ -48,9 +51,25 @@ use tungstenite::{
type Tx = UnboundedSender<Message>;
type PeerMap = Arc<Mutex<HashMap<SocketAddr, Tx>>>;

/// Helper methods to create responses
mod body {
use http_body_util::{Either, Empty, Full};
use hyper::body::Bytes;

pub type Body = Either<Empty<Bytes>, Full<Bytes>>;

pub fn empty() -> Body {
Either::Left(Empty::new())
}

pub fn bytes<B: Into<Bytes>>(chunk: B) -> Body {
Either::Right(Full::from(chunk.into()))
}
}

async fn handle_connection(
peer_map: PeerMap,
ws_stream: WebSocketStream<TokioAdapter<Upgraded>>,
ws_stream: WebSocketStream<TokioAdapter<TokioIo<Upgraded>>>,
addr: SocketAddr,
) {
println!("WebSocket connection established: {}", addr);
Expand Down Expand Up @@ -93,9 +112,9 @@ async fn handle_connection(

async fn handle_request(
peer_map: PeerMap,
mut req: Request<Body>,
mut req: Request<Incoming>,
addr: SocketAddr,
) -> Result<Response<Body>, Infallible> {
) -> Result<Response<body::Body>, Infallible> {
println!("Received a new, potentially ws handshake");
println!("The request's path is: {}", req.uri().path());
println!("The request's headers are:");
Expand Down Expand Up @@ -129,7 +148,7 @@ async fn handle_request(
|| key.is_none()
|| req.uri() != "/socket"
{
return Ok(Response::new(Body::from("Hello World!")));
return Ok(Response::new(body::bytes("Hello World!")));
}
let ver = req.version();
tokio::task::spawn(async move {
Expand All @@ -138,7 +157,7 @@ async fn handle_request(
handle_connection(
peer_map,
WebSocketStream::from_raw_socket(
TokioAdapter::new(upgraded),
TokioAdapter::new(TokioIo::new(upgraded)),
Role::Server,
None,
)
Expand All @@ -150,7 +169,7 @@ async fn handle_request(
Err(e) => println!("upgrade error: {}", e),
}
});
let mut res = Response::new(Body::empty());
let mut res = Response::new(body::empty());
*res.status_mut() = StatusCode::SWITCHING_PROTOCOLS;
*res.version_mut() = ver;
res.headers_mut().append(CONNECTION, upgrade);
Expand All @@ -166,25 +185,31 @@ async fn handle_request(
}

#[tokio::main]
async fn main() -> Result<(), hyper::Error> {
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let state = PeerMap::new(Mutex::new(HashMap::new()));

let addr = env::args()
.nth(1)
.unwrap_or_else(|| "127.0.0.1:8080".to_string())
.parse()
.unwrap();
.parse::<SocketAddr>()?;

let make_svc = make_service_fn(move |conn: &AddrStream| {
let remote_addr = conn.remote_addr();
let listener = TcpListener::bind(addr).await?;
loop {
let (stream, addr) = listener.accept().await?;
let state = state.clone();
let service = service_fn(move |req| handle_request(state.clone(), req, remote_addr));
async { Ok::<_, Infallible>(service) }
});

let server = Server::bind(&addr).serve(make_svc);
tokio::spawn(async move {
let io = TokioIo::new(stream);

let service = service_fn(move |req| handle_request(state.clone(), req, addr));

server.await?;
let conn = http1::Builder::new()
.serve_connection(io, service)
.with_upgrades();

Ok::<_, hyper::Error>(())
if let Err(err) = conn.await {
eprintln!("failed to serve connection: {err:?}");
}
});
}
}

0 comments on commit d26431f

Please sign in to comment.