Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(server): re-design Server as higher-level API #1488

Merged
merged 1 commit into from
Apr 16, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
18 changes: 12 additions & 6 deletions benches/end_to_end.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,19 @@ use tokio::runtime::Runtime;
use tokio::net::TcpListener;

use hyper::{Body, Method, Request, Response};
use hyper::server::Http;
use hyper::client::HttpConnector;
use hyper::server::conn::Http;


#[bench]
fn get_one_at_a_time(b: &mut test::Bencher) {
let mut rt = Runtime::new().unwrap();
let addr = spawn_hello(&mut rt);

let client = hyper::Client::configure()
.build_with_executor(&rt.reactor(), rt.executor());
let connector = HttpConnector::new_with_handle(1, rt.reactor().clone());
let client = hyper::Client::builder()
.executor(rt.executor())
.build::<_, Body>(connector);

let url: hyper::Uri = format!("http://{}/get", addr).parse().unwrap();

Expand All @@ -43,8 +46,10 @@ fn post_one_at_a_time(b: &mut test::Bencher) {
let mut rt = Runtime::new().unwrap();
let addr = spawn_hello(&mut rt);

let client = hyper::Client::configure()
.build_with_executor(&rt.reactor(), rt.executor());
let connector = HttpConnector::new_with_handle(1, rt.reactor().clone());
let client = hyper::Client::builder()
.executor(rt.executor())
.build::<_, Body>(connector);

let url: hyper::Uri = format!("http://{}/post", addr).parse().unwrap();

Expand All @@ -71,7 +76,7 @@ fn spawn_hello(rt: &mut Runtime) -> SocketAddr {
let listener = TcpListener::bind(&addr).unwrap();
let addr = listener.local_addr().unwrap();

let http = Http::<hyper::Chunk>::new();
let http = Http::new();

let service = const_service(service_fn(|req: Request<Body>| {
req.into_body()
Expand All @@ -81,6 +86,7 @@ fn spawn_hello(rt: &mut Runtime) -> SocketAddr {
})
}));

// Specifically only accept 1 connection.
let srv = listener.incoming()
.into_future()
.map_err(|(e, _inc)| panic!("accept error: {}", e))
Expand Down
22 changes: 13 additions & 9 deletions benches/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,24 +14,28 @@ use std::sync::mpsc;
use futures::{future, stream, Future, Stream};
use futures::sync::oneshot;

use hyper::{Body, Request, Response};
use hyper::{Body, Request, Response, Server};
use hyper::server::Service;

macro_rules! bench_server {
($b:ident, $header:expr, $body:expr) => ({
let _ = pretty_env_logger::try_init();
let (_until_tx, until_rx) = oneshot::channel();
let (_until_tx, until_rx) = oneshot::channel::<()>();
let addr = {
let (addr_tx, addr_rx) = mpsc::channel();
::std::thread::spawn(move || {
let addr = "127.0.0.1:0".parse().unwrap();
let srv = hyper::server::Http::new().bind(&addr, || Ok(BenchPayload {
header: $header,
body: $body,
})).unwrap();
let addr = srv.local_addr().unwrap();
addr_tx.send(addr).unwrap();
tokio::run(srv.run_until(until_rx.map_err(|_| ())).map_err(|e| panic!("server error: {}", e)));
let srv = Server::bind(&addr)
.serve(|| Ok(BenchPayload {
header: $header,
body: $body,
}));
addr_tx.send(srv.local_addr()).unwrap();
let fut = srv
.map_err(|e| panic!("server error: {}", e))
.select(until_rx.then(|_| Ok(())))
.then(|_| Ok(()));
tokio::run(fut);
});

addr_rx.recv().unwrap()
Expand Down
18 changes: 8 additions & 10 deletions examples/hello.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,29 +5,27 @@ extern crate pretty_env_logger;
extern crate tokio;

use futures::Future;
use futures::future::lazy;

use hyper::{Body, Response};
use hyper::server::{Http, const_service, service_fn};
use hyper::server::{Server, const_service, service_fn};

static PHRASE: &'static [u8] = b"Hello World!";

fn main() {
pretty_env_logger::init();

let addr = ([127, 0, 0, 1], 3000).into();

let new_service = const_service(service_fn(|_| {
//TODO: when `!` is stable, replace error type
Ok::<_, hyper::Error>(Response::new(Body::from(PHRASE)))
}));

tokio::run(lazy(move || {
let server = Http::new()
.sleep_on_errors(true)
.bind(&addr, new_service)
.unwrap();
let server = Server::bind(&addr)
.serve(new_service)
.map_err(|e| eprintln!("server error: {}", e));

println!("Listening on http://{} with 1 thread.", server.local_addr().unwrap());
server.run().map_err(|err| eprintln!("Server error {}", err))
}));
println!("Listening on http://{}", addr);

tokio::run(server);
}
30 changes: 14 additions & 16 deletions examples/multi_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@ extern crate futures;
extern crate pretty_env_logger;
extern crate tokio;

use futures::{Future, Stream};
use futures::{Future};
use futures::future::{FutureResult, lazy};

use hyper::{Body, Method, Request, Response, StatusCode};
use hyper::server::{Http, Service};
use hyper::server::{Server, Service};

static INDEX1: &'static [u8] = b"The 1st service!";
static INDEX2: &'static [u8] = b"The 2nd service!";
Expand Down Expand Up @@ -40,25 +40,23 @@ impl Service for Srv {

fn main() {
pretty_env_logger::init();
let addr1 = "127.0.0.1:1337".parse().unwrap();
let addr2 = "127.0.0.1:1338".parse().unwrap();

let addr1 = ([127, 0, 0, 1], 1337).into();
let addr2 = ([127, 0, 0, 1], 1338).into();

tokio::run(lazy(move || {
let srv1 = Http::new().serve_addr(&addr1, || Ok(Srv(INDEX1))).unwrap();
let srv2 = Http::new().serve_addr(&addr2, || Ok(Srv(INDEX2))).unwrap();
let srv1 = Server::bind(&addr1)
.serve(|| Ok(Srv(INDEX1)))
.map_err(|e| eprintln!("server 1 error: {}", e));

println!("Listening on http://{}", srv1.incoming_ref().local_addr());
println!("Listening on http://{}", srv2.incoming_ref().local_addr());
let srv2 = Server::bind(&addr2)
.serve(|| Ok(Srv(INDEX2)))
.map_err(|e| eprintln!("server 2 error: {}", e));

tokio::spawn(srv1.for_each(move |conn| {
tokio::spawn(conn.map_err(|err| println!("srv1 error: {:?}", err)));
Ok(())
}).map_err(|_| ()));
println!("Listening on http://{} and http://{}", addr1, addr2);

tokio::spawn(srv2.for_each(move |conn| {
tokio::spawn(conn.map_err(|err| println!("srv2 error: {:?}", err)));
Ok(())
}).map_err(|_| ()));
tokio::spawn(srv1);
tokio::spawn(srv2);

Ok(())
}));
Expand Down
16 changes: 8 additions & 8 deletions examples/params.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,9 @@ extern crate tokio;
extern crate url;

use futures::{Future, Stream};
use futures::future::lazy;

use hyper::{Body, Method, Request, Response, StatusCode};
use hyper::server::{Http, Service};
use hyper::server::{Server, Service};

use std::collections::HashMap;
use url::form_urlencoded;
Expand Down Expand Up @@ -96,11 +95,12 @@ impl Service for ParamExample {

fn main() {
pretty_env_logger::init();
let addr = "127.0.0.1:1337".parse().unwrap();

tokio::run(lazy(move || {
let server = Http::new().bind(&addr, || Ok(ParamExample)).unwrap();
println!("Listening on http://{} with 1 thread.", server.local_addr().unwrap());
server.run().map_err(|err| eprintln!("Server error {}", err))
}));
let addr = ([127, 0, 0, 1], 1337).into();

let server = Server::bind(&addr)
.serve(|| Ok(ParamExample))
.map_err(|e| eprintln!("server error: {}", e));

tokio::run(server);
}
16 changes: 9 additions & 7 deletions examples/send_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,10 @@ extern crate pretty_env_logger;
extern crate tokio;

use futures::{Future/*, Sink*/};
use futures::future::lazy;
use futures::sync::oneshot;

use hyper::{Body, /*Chunk,*/ Method, Request, Response, StatusCode};
use hyper::server::{Http, Service};
use hyper::server::{Server, Service};

use std::fs::File;
use std::io::{self, copy/*, Read*/};
Expand Down Expand Up @@ -138,11 +137,14 @@ impl Service for ResponseExamples {

fn main() {
pretty_env_logger::init();

let addr = "127.0.0.1:1337".parse().unwrap();

tokio::run(lazy(move || {
let server = Http::new().bind(&addr, || Ok(ResponseExamples)).unwrap();
println!("Listening on http://{} with 1 thread.", server.local_addr().unwrap());
server.run().map_err(|err| eprintln!("Server error {}", err))
}));
let server = Server::bind(&addr)
.serve(|| Ok(ResponseExamples))
.map_err(|e| eprintln!("server error: {}", e));

println!("Listening on http://{}", addr);

tokio::run(server);
}
19 changes: 11 additions & 8 deletions examples/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@ extern crate pretty_env_logger;
extern crate tokio;

use futures::Future;
use futures::future::{FutureResult, lazy};
use futures::future::{FutureResult};

use hyper::{Body, Method, Request, Response, StatusCode};
use hyper::server::{Http, Service};
use hyper::server::{Server, Service};

static INDEX: &'static [u8] = b"Try POST /echo";

Expand Down Expand Up @@ -41,11 +41,14 @@ impl Service for Echo {

fn main() {
pretty_env_logger::init();
let addr = "127.0.0.1:1337".parse().unwrap();

tokio::run(lazy(move || {
let server = Http::new().bind(&addr, || Ok(Echo)).unwrap();
println!("Listening on http://{} with 1 thread.", server.local_addr().unwrap());
server.run().map_err(|err| eprintln!("Server error {}", err))
}));
let addr = ([127, 0, 0, 1], 1337).into();

let server = Server::bind(&addr)
.serve(|| Ok(Echo))
.map_err(|e| eprintln!("server error: {}", e));

println!("Listening on http://{}", addr);

tokio::run(server);
}
14 changes: 8 additions & 6 deletions examples/web_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use futures::future::lazy;

use hyper::{Body, Chunk, Client, Method, Request, Response, StatusCode};
use hyper::client::HttpConnector;
use hyper::server::{Http, Service};
use hyper::server::{Server, Service};

#[allow(unused, deprecated)]
use std::ascii::AsciiExt;
Expand Down Expand Up @@ -75,15 +75,17 @@ impl Service for ResponseExamples {

fn main() {
pretty_env_logger::init();

let addr = "127.0.0.1:1337".parse().unwrap();

tokio::run(lazy(move || {
let client = Client::new();
let serve = Http::new().serve_addr(&addr, move || Ok(ResponseExamples(client.clone()))).unwrap();
println!("Listening on http://{} with 1 thread.", serve.incoming_ref().local_addr());
let server = Server::bind(&addr)
.serve(move || Ok(ResponseExamples(client.clone())))
.map_err(|e| eprintln!("server error: {}", e));

println!("Listening on http://{}", addr);

serve.map_err(|_| ()).for_each(move |conn| {
tokio::spawn(conn.map_err(|err| println!("serve error: {:?}", err)))
})
server
}));
}
8 changes: 4 additions & 4 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,12 +171,12 @@ impl Error {
Error::new(Kind::Io, Some(cause.into()))
}

pub(crate) fn new_listen(err: io::Error) -> Error {
Error::new(Kind::Listen, Some(err.into()))
pub(crate) fn new_listen<E: Into<Cause>>(cause: E) -> Error {
Error::new(Kind::Listen, Some(cause.into()))
}

pub(crate) fn new_accept(err: io::Error) -> Error {
Error::new(Kind::Accept, Some(Box::new(err)))
pub(crate) fn new_accept<E: Into<Cause>>(cause: E) -> Error {
Error::new(Kind::Accept, Some(cause.into()))
}

pub(crate) fn new_connect<E: Into<Cause>>(cause: E) -> Error {
Expand Down
15 changes: 13 additions & 2 deletions src/proto/h1/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,19 @@ use tokio_io::{AsyncRead, AsyncWrite};

use proto::{Http1Transaction, MessageHead};

const INIT_BUFFER_SIZE: usize = 8192;
pub const DEFAULT_MAX_BUFFER_SIZE: usize = 8192 + 4096 * 100;
/// The initial buffer size allocated before trying to read from IO.
pub(crate) const INIT_BUFFER_SIZE: usize = 8192;

/// The default maximum read buffer size. If the buffer gets this big and
/// a message is still not complete, a `TooLarge` error is triggered.
// Note: if this changes, update server::conn::Http::max_buf_size docs.
pub(crate) const DEFAULT_MAX_BUFFER_SIZE: usize = 8192 + 4096 * 100;

/// The maximum number of distinct `Buf`s to hold in a list before requiring
/// a flush. Only affects when the buffer strategy is to queue buffers.
///
/// Note that a flush can happen before reaching the maximum. This simply
/// forces a flush if the queue gets this big.
const MAX_BUF_LIST_BUFFERS: usize = 16;

pub struct Buffered<T, B> {
Expand Down