From f6c9707cc489086db92e4969b67c2f8d4e07710d Mon Sep 17 00:00:00 2001 From: SabrinaJewson Date: Sat, 12 Mar 2022 12:59:58 +0000 Subject: [PATCH 1/3] refactor(server): move non-conn code out of conn.rs The actual code for `Server` was previously organized very confusingly: it was thrice layered with `SpawnAll` and `Serve` which both appeared in conn.rs despite not having anything to do with the lower-level conn API. This commit changes that, removing all layering and having the code for the higher-level `Server` appear inside `server.rs` only. --- src/common/exec.rs | 2 +- src/server/conn.rs | 353 +---------------------------------------- src/server/mod.rs | 2 +- src/server/server.rs | 261 ++++++++++++++++++++++++++++-- src/server/shutdown.rs | 13 +- 5 files changed, 262 insertions(+), 369 deletions(-) diff --git a/src/common/exec.rs b/src/common/exec.rs index b6da9a276b..f890e6aae5 100644 --- a/src/common/exec.rs +++ b/src/common/exec.rs @@ -11,7 +11,7 @@ use crate::body::HttpBody; use crate::proto::h2::server::H2Stream; use crate::rt::Executor; #[cfg(all(feature = "server", any(feature = "http1", feature = "http2")))] -use crate::server::conn::spawn_all::{NewSvcTask, Watcher}; +use crate::server::server::{Watcher, new_svc::NewSvcTask}; #[cfg(all(feature = "server", any(feature = "http1", feature = "http2")))] use crate::service::HttpService; diff --git a/src/server/conn.rs b/src/server/conn.rs index de765b3a15..1e6cb469d2 100644 --- a/src/server/conn.rs +++ b/src/server/conn.rs @@ -48,8 +48,6 @@ not(all(feature = "http1", feature = "http2")) ))] use std::marker::PhantomData; -#[cfg(feature = "tcp")] -use std::net::SocketAddr; use std::time::Duration; #[cfg(feature = "http2")] @@ -70,17 +68,15 @@ cfg_feature! { use tokio::io::{AsyncRead, AsyncWrite}; use tracing::trace; - use super::accept::Accept; + pub use super::server::Connecting; use crate::body::{Body, HttpBody}; use crate::common::{task, Future, Pin, Poll, Unpin}; #[cfg(not(all(feature = "http1", feature = "http2")))] use crate::common::Never; - use crate::common::exec::{ConnStreamExec, Exec, NewSvcExec}; + use crate::common::exec::{ConnStreamExec, Exec}; use crate::proto; - use crate::service::{HttpService, MakeServiceRef}; - use self::spawn_all::NewSvcTask; + use crate::service::HttpService; - pub(super) use self::spawn_all::{NoopWatcher, Watcher}; pub(super) use self::upgrades::UpgradeableConnection; } @@ -97,7 +93,7 @@ pub use super::tcp::{AddrIncoming, AddrStream}; #[cfg(any(feature = "http1", feature = "http2"))] #[cfg_attr(docsrs, doc(cfg(any(feature = "http1", feature = "http2"))))] pub struct Http { - exec: E, + pub(crate) exec: E, h1_half_close: bool, h1_keep_alive: bool, h1_title_case_headers: bool, @@ -127,51 +123,6 @@ enum ConnectionMode { Fallback, } -#[cfg(any(feature = "http1", feature = "http2"))] -pin_project! { - /// A stream mapping incoming IOs to new services. - /// - /// Yields `Connecting`s that are futures that should be put on a reactor. - #[must_use = "streams do nothing unless polled"] - #[derive(Debug)] - pub(super) struct Serve { - #[pin] - incoming: I, - make_service: S, - protocol: Http, - } -} - -#[cfg(any(feature = "http1", feature = "http2"))] -pin_project! { - /// A future building a new `Service` to a `Connection`. - /// - /// Wraps the future returned from `MakeService` into one that returns - /// a `Connection`. - #[must_use = "futures do nothing unless polled"] - #[derive(Debug)] - #[cfg_attr(docsrs, doc(cfg(any(feature = "http1", feature = "http2"))))] - pub struct Connecting { - #[pin] - future: F, - io: Option, - protocol: Http, - } -} - -#[cfg(any(feature = "http1", feature = "http2"))] -pin_project! { - #[must_use = "futures do nothing unless polled"] - #[derive(Debug)] - pub(super) struct SpawnAll { - // TODO: re-add `pub(super)` once rustdoc can handle this. - // - // See https://github.com/rust-lang/rust/issues/64705 - #[pin] - pub(super) serve: Serve, - } -} - #[cfg(any(feature = "http1", feature = "http2"))] pin_project! { /// A future binding a connection with a Service. @@ -719,23 +670,6 @@ impl Http { fallback: PhantomData, } } - - pub(super) fn serve(&self, incoming: I, make_service: S) -> Serve - where - I: Accept, - IE: Into>, - IO: AsyncRead + AsyncWrite + Unpin, - S: MakeServiceRef, - S::Error: Into>, - Bd: HttpBody, - E: ConnStreamExec<>::Future, Bd>, - { - Serve { - incoming, - make_service, - protocol: self.clone(), - } - } } // ===== impl Connection ===== @@ -987,141 +921,6 @@ impl Default for ConnectionMode { } } -// ===== impl Serve ===== - -#[cfg(any(feature = "http1", feature = "http2"))] -impl Serve { - /// Get a reference to the incoming stream. - #[inline] - pub(super) fn incoming_ref(&self) -> &I { - &self.incoming - } - - /* - /// Get a mutable reference to the incoming stream. - #[inline] - pub fn incoming_mut(&mut self) -> &mut I { - &mut self.incoming - } - */ - - /// Spawn all incoming connections onto the executor in `Http`. - pub(super) fn spawn_all(self) -> SpawnAll { - SpawnAll { serve: self } - } -} - -#[cfg(any(feature = "http1", feature = "http2"))] -impl Serve -where - I: Accept, - IO: AsyncRead + AsyncWrite + Unpin, - IE: Into>, - S: MakeServiceRef, - B: HttpBody, - E: ConnStreamExec<>::Future, B>, -{ - fn poll_next_( - self: Pin<&mut Self>, - cx: &mut task::Context<'_>, - ) -> Poll>>> { - let me = self.project(); - match ready!(me.make_service.poll_ready_ref(cx)) { - Ok(()) => (), - Err(e) => { - trace!("make_service closed"); - return Poll::Ready(Some(Err(crate::Error::new_user_make_service(e)))); - } - } - - if let Some(item) = ready!(me.incoming.poll_accept(cx)) { - let io = item.map_err(crate::Error::new_accept)?; - let new_fut = me.make_service.make_service_ref(&io); - Poll::Ready(Some(Ok(Connecting { - future: new_fut, - io: Some(io), - protocol: me.protocol.clone(), - }))) - } else { - Poll::Ready(None) - } - } -} - -// ===== impl Connecting ===== - -#[cfg(any(feature = "http1", feature = "http2"))] -impl Future for Connecting -where - I: AsyncRead + AsyncWrite + Unpin, - F: Future>, - S: HttpService, - B: HttpBody + 'static, - B::Error: Into>, - E: ConnStreamExec, -{ - type Output = Result, FE>; - - fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll { - let mut me = self.project(); - let service = ready!(me.future.poll(cx))?; - let io = Option::take(&mut me.io).expect("polled after complete"); - Poll::Ready(Ok(me.protocol.serve_connection(io, service))) - } -} - -// ===== impl SpawnAll ===== - -#[cfg(all(feature = "tcp", any(feature = "http1", feature = "http2")))] -impl SpawnAll { - pub(super) fn local_addr(&self) -> SocketAddr { - self.serve.incoming.local_addr() - } -} - -#[cfg(any(feature = "http1", feature = "http2"))] -impl SpawnAll { - pub(super) fn incoming_ref(&self) -> &I { - self.serve.incoming_ref() - } -} - -#[cfg(any(feature = "http1", feature = "http2"))] -impl SpawnAll -where - I: Accept, - IE: Into>, - IO: AsyncRead + AsyncWrite + Unpin + Send + 'static, - S: MakeServiceRef, - B: HttpBody, - E: ConnStreamExec<>::Future, B>, -{ - pub(super) fn poll_watch( - self: Pin<&mut Self>, - cx: &mut task::Context<'_>, - watcher: &W, - ) -> Poll> - where - E: NewSvcExec, - W: Watcher, - { - let mut me = self.project(); - loop { - if let Some(connecting) = ready!(me.serve.as_mut().poll_next_(cx)?) { - let fut = NewSvcTask::new(connecting, watcher.clone()); - me.serve - .as_mut() - .project() - .protocol - .exec - .execute_new_svc(fut); - } else { - return Poll::Ready(Ok(())); - } - } - } -} - // ===== impl ProtoServer ===== #[cfg(any(feature = "http1", feature = "http2"))] @@ -1151,150 +950,6 @@ where } } -#[cfg(any(feature = "http1", feature = "http2"))] -pub(crate) mod spawn_all { - use std::error::Error as StdError; - use tokio::io::{AsyncRead, AsyncWrite}; - use tracing::debug; - - use super::{Connecting, UpgradeableConnection}; - use crate::body::{Body, HttpBody}; - use crate::common::exec::ConnStreamExec; - use crate::common::{task, Future, Pin, Poll, Unpin}; - use crate::service::HttpService; - use pin_project_lite::pin_project; - - // Used by `SpawnAll` to optionally watch a `Connection` future. - // - // The regular `hyper::Server` just uses a `NoopWatcher`, which does - // not need to watch anything, and so returns the `Connection` untouched. - // - // The `Server::with_graceful_shutdown` needs to keep track of all active - // connections, and signal that they start to shutdown when prompted, so - // it has a `GracefulWatcher` implementation to do that. - pub trait Watcher, E>: Clone { - type Future: Future>; - - fn watch(&self, conn: UpgradeableConnection) -> Self::Future; - } - - #[allow(missing_debug_implementations)] - #[derive(Copy, Clone)] - pub struct NoopWatcher; - - impl Watcher for NoopWatcher - where - I: AsyncRead + AsyncWrite + Unpin + Send + 'static, - S: HttpService, - E: ConnStreamExec, - S::ResBody: 'static, - ::Error: Into>, - { - type Future = UpgradeableConnection; - - fn watch(&self, conn: UpgradeableConnection) -> Self::Future { - conn - } - } - - // This is a `Future` spawned to an `Executor` inside - // the `SpawnAll`. By being a nameable type, we can be generic over the - // user's `Service::Future`, and thus an `Executor` can execute it. - // - // Doing this allows for the server to conditionally require `Send` futures, - // depending on the `Executor` configured. - // - // Users cannot import this type, nor the associated `NewSvcExec`. Instead, - // a blanket implementation for `Executor` is sufficient. - - pin_project! { - #[allow(missing_debug_implementations)] - pub struct NewSvcTask, E, W: Watcher> { - #[pin] - state: State, - } - } - - pin_project! { - #[project = StateProj] - pub(super) enum State, E, W: Watcher> { - Connecting { - #[pin] - connecting: Connecting, - watcher: W, - }, - Connected { - #[pin] - future: W::Future, - }, - } - } - - impl, E, W: Watcher> NewSvcTask { - pub(super) fn new(connecting: Connecting, watcher: W) -> Self { - NewSvcTask { - state: State::Connecting { - connecting, - watcher, - }, - } - } - } - - impl Future for NewSvcTask - where - I: AsyncRead + AsyncWrite + Unpin + Send + 'static, - N: Future>, - NE: Into>, - S: HttpService, - B: HttpBody + 'static, - B::Error: Into>, - E: ConnStreamExec, - W: Watcher, - { - type Output = (); - - fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll { - // If it weren't for needing to name this type so the `Send` bounds - // could be projected to the `Serve` executor, this could just be - // an `async fn`, and much safer. Woe is me. - - let mut me = self.project(); - loop { - let next = { - match me.state.as_mut().project() { - StateProj::Connecting { - connecting, - watcher, - } => { - let res = ready!(connecting.poll(cx)); - let conn = match res { - Ok(conn) => conn, - Err(err) => { - let err = crate::Error::new_user_make_service(err); - debug!("connecting error: {}", err); - return Poll::Ready(()); - } - }; - let future = watcher.watch(conn.with_upgrades()); - State::Connected { future } - } - StateProj::Connected { future } => { - return future.poll(cx).map(|res| { - if let Err(err) = res { - debug!("connection error: {}", err); - } - }); - } - } - }; - - me.state.set(next); - } - } - } -} - #[cfg(any(feature = "http1", feature = "http2"))] mod upgrades { use super::*; diff --git a/src/server/mod.rs b/src/server/mod.rs index a97944f518..d658f42118 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -149,7 +149,7 @@ pub mod accept; pub mod conn; -mod server; +pub(crate) mod server; #[cfg(feature = "tcp")] mod tcp; diff --git a/src/server/server.rs b/src/server/server.rs index c48582c7fd..f2fb8bd8b0 100644 --- a/src/server/server.rs +++ b/src/server/server.rs @@ -15,6 +15,7 @@ cfg_feature! { use pin_project_lite::pin_project; use tokio::io::{AsyncRead, AsyncWrite}; + use tracing::trace; use super::accept::Accept; use crate::body::{Body, HttpBody}; @@ -22,9 +23,11 @@ cfg_feature! { use crate::common::exec::{ConnStreamExec, NewSvcExec}; // Renamed `Http` as `Http_` for now so that people upgrading don't see an // error that `hyper::server::Http` is private... - use super::conn::{Http as Http_, NoopWatcher, SpawnAll}; + use super::conn::{Connection, Http as Http_, UpgradeableConnection}; use super::shutdown::{Graceful, GracefulWatcher}; use crate::service::{HttpService, MakeServiceRef}; + + use self::new_svc::NewSvcTask; } #[cfg(any(feature = "http1", feature = "http2"))] @@ -37,7 +40,9 @@ pin_project! { /// `Executor`. pub struct Server { #[pin] - spawn_all: SpawnAll, + incoming: I, + make_service: S, + protocol: Http_, } } @@ -107,7 +112,7 @@ cfg_feature! { impl Server { /// Returns the local address that this server is bound to. pub fn local_addr(&self) -> SocketAddr { - self.spawn_all.local_addr() + self.incoming.local_addr() } } } @@ -124,7 +129,6 @@ where B: HttpBody + 'static, B::Error: Into>, E: ConnStreamExec<>::Future, B>, - E: NewSvcExec, { /// Prepares a server to handle graceful shutdown when the provided future /// completes. @@ -165,8 +169,54 @@ where pub fn with_graceful_shutdown(self, signal: F) -> Graceful where F: Future, + E: NewSvcExec, { - Graceful::new(self.spawn_all, signal) + Graceful::new(self, signal) + } + + fn poll_next_( + self: Pin<&mut Self>, + cx: &mut task::Context<'_>, + ) -> Poll>>> { + let me = self.project(); + match ready!(me.make_service.poll_ready_ref(cx)) { + Ok(()) => (), + Err(e) => { + trace!("make_service closed"); + return Poll::Ready(Some(Err(crate::Error::new_user_make_service(e)))); + } + } + + if let Some(item) = ready!(me.incoming.poll_accept(cx)) { + let io = item.map_err(crate::Error::new_accept)?; + let new_fut = me.make_service.make_service_ref(&io); + Poll::Ready(Some(Ok(Connecting { + future: new_fut, + io: Some(io), + protocol: me.protocol.clone(), + }))) + } else { + Poll::Ready(None) + } + } + + pub(super) fn poll_watch( + mut self: Pin<&mut Self>, + cx: &mut task::Context<'_>, + watcher: &W, + ) -> Poll> + where + E: NewSvcExec, + W: Watcher, + { + loop { + if let Some(connecting) = ready!(self.as_mut().poll_next_(cx)?) { + let fut = NewSvcTask::new(connecting, watcher.clone()); + self.as_mut().project().protocol.exec.execute_new_svc(fut); + } else { + return Poll::Ready(Ok(())); + } + } } } @@ -187,7 +237,7 @@ where type Output = crate::Result<()>; fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll { - self.project().spawn_all.poll_watch(cx, &NoopWatcher) + self.poll_watch(cx, &NoopWatcher) } } @@ -195,7 +245,7 @@ impl fmt::Debug for Server { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { let mut st = f.debug_struct("Server"); #[cfg(any(feature = "http1", feature = "http2"))] - st.field("listener", &self.spawn_all.incoming_ref()); + st.field("listener", &self.incoming); st.finish() } } @@ -309,7 +359,7 @@ impl Builder { self } - /// Set a timeout for reading client request headers. If a client does not + /// Set a timeout for reading client request headers. If a client does not /// transmit the entire header within this time, the connection is closed. /// /// Default is None. @@ -502,7 +552,7 @@ impl Builder { /// } /// # } /// ``` - pub fn serve(self, new_service: S) -> Server + pub fn serve(self, make_service: S) -> Server where I: Accept, I::Error: Into>, @@ -514,9 +564,11 @@ impl Builder { E: NewSvcExec, E: ConnStreamExec<>::Future, B>, { - let serve = self.protocol.serve(self.incoming, new_service); - let spawn_all = serve.spawn_all(); - Server { spawn_all } + Server { + incoming: self.incoming, + make_service, + protocol: self.protocol.clone(), + } } } @@ -558,3 +610,188 @@ impl Builder { self } } + +// Used by `Server` to optionally watch a `Connection` future. +// +// The regular `hyper::Server` just uses a `NoopWatcher`, which does +// not need to watch anything, and so returns the `Connection` untouched. +// +// The `Server::with_graceful_shutdown` needs to keep track of all active +// connections, and signal that they start to shutdown when prompted, so +// it has a `GracefulWatcher` implementation to do that. +#[cfg(any(feature = "http1", feature = "http2"))] +pub trait Watcher, E>: Clone { + type Future: Future>; + + fn watch(&self, conn: UpgradeableConnection) -> Self::Future; +} + +#[allow(missing_debug_implementations)] +#[derive(Copy, Clone)] +#[cfg(any(feature = "http1", feature = "http2"))] +pub struct NoopWatcher; + +#[cfg(any(feature = "http1", feature = "http2"))] +impl Watcher for NoopWatcher +where + I: AsyncRead + AsyncWrite + Unpin + Send + 'static, + S: HttpService, + E: ConnStreamExec, + S::ResBody: 'static, + ::Error: Into>, +{ + type Future = UpgradeableConnection; + + fn watch(&self, conn: UpgradeableConnection) -> Self::Future { + conn + } +} + +#[cfg(any(feature = "http1", feature = "http2"))] +// used by exec.rs +pub(crate) mod new_svc { + use std::error::Error as StdError; + use tokio::io::{AsyncRead, AsyncWrite}; + use tracing::debug; + + use super::{Connecting, Watcher}; + use crate::body::{Body, HttpBody}; + use crate::common::exec::ConnStreamExec; + use crate::common::{task, Future, Pin, Poll, Unpin}; + use crate::service::HttpService; + use pin_project_lite::pin_project; + + // This is a `Future` spawned to an `Executor` inside + // the `Server`. By being a nameable type, we can be generic over the + // user's `Service::Future`, and thus an `Executor` can execute it. + // + // Doing this allows for the server to conditionally require `Send` futures, + // depending on the `Executor` configured. + // + // Users cannot import this type, nor the associated `NewSvcExec`. Instead, + // a blanket implementation for `Executor` is sufficient. + + pin_project! { + #[allow(missing_debug_implementations)] + pub struct NewSvcTask, E, W: Watcher> { + #[pin] + state: State, + } + } + + pin_project! { + #[project = StateProj] + pub(super) enum State, E, W: Watcher> { + Connecting { + #[pin] + connecting: Connecting, + watcher: W, + }, + Connected { + #[pin] + future: W::Future, + }, + } + } + + impl, E, W: Watcher> NewSvcTask { + pub(super) fn new(connecting: Connecting, watcher: W) -> Self { + NewSvcTask { + state: State::Connecting { + connecting, + watcher, + }, + } + } + } + + impl Future for NewSvcTask + where + I: AsyncRead + AsyncWrite + Unpin + Send + 'static, + N: Future>, + NE: Into>, + S: HttpService, + B: HttpBody + 'static, + B::Error: Into>, + E: ConnStreamExec, + W: Watcher, + { + type Output = (); + + fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll { + // If it weren't for needing to name this type so the `Send` bounds + // could be projected to the `Serve` executor, this could just be + // an `async fn`, and much safer. Woe is me. + + let mut me = self.project(); + loop { + let next = { + match me.state.as_mut().project() { + StateProj::Connecting { + connecting, + watcher, + } => { + let res = ready!(connecting.poll(cx)); + let conn = match res { + Ok(conn) => conn, + Err(err) => { + let err = crate::Error::new_user_make_service(err); + debug!("connecting error: {}", err); + return Poll::Ready(()); + } + }; + let future = watcher.watch(conn.with_upgrades()); + State::Connected { future } + } + StateProj::Connected { future } => { + return future.poll(cx).map(|res| { + if let Err(err) = res { + debug!("connection error: {}", err); + } + }); + } + } + }; + + me.state.set(next); + } + } + } +} + +#[cfg(any(feature = "http1", feature = "http2"))] +pin_project! { + /// A future building a new `Service` to a `Connection`. + /// + /// Wraps the future returned from `MakeService` into one that returns + /// a `Connection`. + #[must_use = "futures do nothing unless polled"] + #[derive(Debug)] + #[cfg_attr(docsrs, doc(cfg(any(feature = "http1", feature = "http2"))))] + pub struct Connecting { + #[pin] + future: F, + io: Option, + protocol: Http_, + } +} + +#[cfg(any(feature = "http1", feature = "http2"))] +impl Future for Connecting +where + I: AsyncRead + AsyncWrite + Unpin, + F: Future>, + S: HttpService, + B: HttpBody + 'static, + B::Error: Into>, + E: ConnStreamExec, +{ + type Output = Result, FE>; + + fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll { + let mut me = self.project(); + let service = ready!(me.future.poll(cx))?; + let io = Option::take(&mut me.io).expect("polled after complete"); + Poll::Ready(Ok(me.protocol.serve_connection(io, service))) + } +} diff --git a/src/server/shutdown.rs b/src/server/shutdown.rs index 2277a40964..96937d0827 100644 --- a/src/server/shutdown.rs +++ b/src/server/shutdown.rs @@ -5,7 +5,8 @@ use tokio::io::{AsyncRead, AsyncWrite}; use tracing::debug; use super::accept::Accept; -use super::conn::{SpawnAll, UpgradeableConnection, Watcher}; +use super::conn::UpgradeableConnection; +use super::server::{Server, Watcher}; use crate::body::{Body, HttpBody}; use crate::common::drain::{self, Draining, Signal, Watch, Watching}; use crate::common::exec::{ConnStreamExec, NewSvcExec}; @@ -26,7 +27,7 @@ pin_project! { Running { drain: Option<(Signal, Watch)>, #[pin] - spawn_all: SpawnAll, + server: Server, #[pin] signal: F, }, @@ -35,12 +36,12 @@ pin_project! { } impl Graceful { - pub(super) fn new(spawn_all: SpawnAll, signal: F) -> Self { + pub(super) fn new(server: Server, signal: F) -> Self { let drain = Some(drain::channel()); Graceful { state: State::Running { drain, - spawn_all, + server, signal, }, } @@ -69,7 +70,7 @@ where match me.state.as_mut().project() { StateProj::Running { drain, - spawn_all, + server, signal, } => match signal.poll(cx) { Poll::Ready(()) => { @@ -81,7 +82,7 @@ where } Poll::Pending => { let watch = drain.as_ref().expect("drain channel").1.clone(); - return spawn_all.poll_watch(cx, &GracefulWatcher(watch)); + return server.poll_watch(cx, &GracefulWatcher(watch)); } }, StateProj::Draining { ref mut draining } => { From 43bc6029c79a18320a4e97b8f45f5f040400ecd1 Mon Sep 17 00:00:00 2001 From: SabrinaJewson Date: Sat, 12 Mar 2022 13:12:34 +0000 Subject: [PATCH 2/3] refactor(server): simplify server cfgs `server.rs` is currently littered with `cfg`s for `http1` or `http2`, since the majority of server behaviour is only available with either one of those feature flags active. This is hard to maintain and confusing to read, so this commit extracts the two implementations into their own files, since there is very little benefit in sharing code between the two. --- src/server/conn.rs | 1 + src/server/mod.rs | 9 ++- src/server/server.rs | 131 ++++++++++++++++---------------------- src/server/server_stub.rs | 16 +++++ 4 files changed, 80 insertions(+), 77 deletions(-) create mode 100644 src/server/server_stub.rs diff --git a/src/server/conn.rs b/src/server/conn.rs index 1e6cb469d2..3f9eaf9e62 100644 --- a/src/server/conn.rs +++ b/src/server/conn.rs @@ -48,6 +48,7 @@ not(all(feature = "http1", feature = "http2")) ))] use std::marker::PhantomData; +#[cfg(all(any(feature = "http1", feature = "http2"), feature = "runtime"))] use std::time::Duration; #[cfg(feature = "http2")] diff --git a/src/server/mod.rs b/src/server/mod.rs index d658f42118..b5508aefdd 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -149,7 +149,6 @@ pub mod accept; pub mod conn; -pub(crate) mod server; #[cfg(feature = "tcp")] mod tcp; @@ -158,7 +157,15 @@ pub use self::server::Server; cfg_feature! { #![any(feature = "http1", feature = "http2")] + pub(crate) mod server; pub use self::server::Builder; mod shutdown; } + +cfg_feature! { + #![not(any(feature = "http1", feature = "http2"))] + + mod server_stub; + use server_stub as server; +} diff --git a/src/server/server.rs b/src/server/server.rs index f2fb8bd8b0..e3058da4bb 100644 --- a/src/server/server.rs +++ b/src/server/server.rs @@ -1,36 +1,29 @@ +use std::error::Error as StdError; use std::fmt; #[cfg(feature = "tcp")] use std::net::{SocketAddr, TcpListener as StdTcpListener}; #[cfg(any(feature = "tcp", feature = "http1"))] use std::time::Duration; -#[cfg(all(feature = "tcp", any(feature = "http1", feature = "http2")))] +use pin_project_lite::pin_project; +use tokio::io::{AsyncRead, AsyncWrite}; +use tracing::trace; + +use super::accept::Accept; +#[cfg(all(feature = "tcp"))] use super::tcp::AddrIncoming; +use crate::body::{Body, HttpBody}; use crate::common::exec::Exec; +use crate::common::exec::{ConnStreamExec, NewSvcExec}; +use crate::common::{task, Future, Pin, Poll, Unpin}; +// Renamed `Http` as `Http_` for now so that people upgrading don't see an +// error that `hyper::server::Http` is private... +use super::conn::{Connection, Http as Http_, UpgradeableConnection}; +use super::shutdown::{Graceful, GracefulWatcher}; +use crate::service::{HttpService, MakeServiceRef}; -cfg_feature! { - #![any(feature = "http1", feature = "http2")] - - use std::error::Error as StdError; +use self::new_svc::NewSvcTask; - use pin_project_lite::pin_project; - use tokio::io::{AsyncRead, AsyncWrite}; - use tracing::trace; - - use super::accept::Accept; - use crate::body::{Body, HttpBody}; - use crate::common::{task, Future, Pin, Poll, Unpin}; - use crate::common::exec::{ConnStreamExec, NewSvcExec}; - // Renamed `Http` as `Http_` for now so that people upgrading don't see an - // error that `hyper::server::Http` is private... - use super::conn::{Connection, Http as Http_, UpgradeableConnection}; - use super::shutdown::{Graceful, GracefulWatcher}; - use crate::service::{HttpService, MakeServiceRef}; - - use self::new_svc::NewSvcTask; -} - -#[cfg(any(feature = "http1", feature = "http2"))] pin_project! { /// A listening HTTP server that accepts connections in both HTTP1 and HTTP2 by default. /// @@ -46,17 +39,8 @@ pin_project! { } } -/// A listening HTTP server that accepts connections in both HTTP1 and HTTP2 by default. -/// -/// Needs at least one of the `http1` and `http2` features to be activated to actually be useful. -#[cfg(not(any(feature = "http1", feature = "http2")))] -pub struct Server { - _marker: std::marker::PhantomData<(I, S, E)>, -} - /// A builder for a [`Server`](Server). #[derive(Debug)] -#[cfg(any(feature = "http1", feature = "http2"))] #[cfg_attr(docsrs, doc(cfg(any(feature = "http1", feature = "http2"))))] pub struct Builder { incoming: I, @@ -65,7 +49,6 @@ pub struct Builder { // ===== impl Server ===== -#[cfg(any(feature = "http1", feature = "http2"))] #[cfg_attr(docsrs, doc(cfg(any(feature = "http1", feature = "http2"))))] impl Server { /// Starts a [`Builder`](Builder) with the provided incoming stream. @@ -77,47 +60,48 @@ impl Server { } } -cfg_feature! { - #![all(feature = "tcp", any(feature = "http1", feature = "http2"))] - - impl Server { - /// Binds to the provided address, and returns a [`Builder`](Builder). - /// - /// # Panics - /// - /// This method will panic if binding to the address fails. For a method - /// to bind to an address and return a `Result`, see `Server::try_bind`. - pub fn bind(addr: &SocketAddr) -> Builder { - let incoming = AddrIncoming::new(addr).unwrap_or_else(|e| { - panic!("error binding to {}: {}", addr, e); - }); - Server::builder(incoming) - } +#[cfg(feature = "tcp")] +#[cfg_attr( + docsrs, + doc(cfg(all(feature = "tcp", any(feature = "http1", feature = "http2")))) +)] +impl Server { + /// Binds to the provided address, and returns a [`Builder`](Builder). + /// + /// # Panics + /// + /// This method will panic if binding to the address fails. For a method + /// to bind to an address and return a `Result`, see `Server::try_bind`. + pub fn bind(addr: &SocketAddr) -> Builder { + let incoming = AddrIncoming::new(addr).unwrap_or_else(|e| { + panic!("error binding to {}: {}", addr, e); + }); + Server::builder(incoming) + } - /// Tries to bind to the provided address, and returns a [`Builder`](Builder). - pub fn try_bind(addr: &SocketAddr) -> crate::Result> { - AddrIncoming::new(addr).map(Server::builder) - } + /// Tries to bind to the provided address, and returns a [`Builder`](Builder). + pub fn try_bind(addr: &SocketAddr) -> crate::Result> { + AddrIncoming::new(addr).map(Server::builder) + } - /// Create a new instance from a `std::net::TcpListener` instance. - pub fn from_tcp(listener: StdTcpListener) -> Result, crate::Error> { - AddrIncoming::from_std(listener).map(Server::builder) - } + /// Create a new instance from a `std::net::TcpListener` instance. + pub fn from_tcp(listener: StdTcpListener) -> Result, crate::Error> { + AddrIncoming::from_std(listener).map(Server::builder) } } -cfg_feature! { - #![all(feature = "tcp", any(feature = "http1", feature = "http2"))] - - impl Server { - /// Returns the local address that this server is bound to. - pub fn local_addr(&self) -> SocketAddr { - self.incoming.local_addr() - } +#[cfg(feature = "tcp")] +#[cfg_attr( + docsrs, + doc(cfg(all(feature = "tcp", any(feature = "http1", feature = "http2")))) +)] +impl Server { + /// Returns the local address that this server is bound to. + pub fn local_addr(&self) -> SocketAddr { + self.incoming.local_addr() } } -#[cfg(any(feature = "http1", feature = "http2"))] #[cfg_attr(docsrs, doc(cfg(any(feature = "http1", feature = "http2"))))] impl Server where @@ -220,7 +204,6 @@ where } } -#[cfg(any(feature = "http1", feature = "http2"))] #[cfg_attr(docsrs, doc(cfg(any(feature = "http1", feature = "http2"))))] impl Future for Server where @@ -244,7 +227,6 @@ where impl fmt::Debug for Server { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { let mut st = f.debug_struct("Server"); - #[cfg(any(feature = "http1", feature = "http2"))] st.field("listener", &self.incoming); st.finish() } @@ -252,7 +234,6 @@ impl fmt::Debug for Server { // ===== impl Builder ===== -#[cfg(any(feature = "http1", feature = "http2"))] #[cfg_attr(docsrs, doc(cfg(any(feature = "http1", feature = "http2"))))] impl Builder { /// Start a new builder, wrapping an incoming stream and low-level options. @@ -572,7 +553,11 @@ impl Builder { } } -#[cfg(all(feature = "tcp", any(feature = "http1", feature = "http2")))] +#[cfg(feature = "tcp")] +#[cfg_attr( + docsrs, + doc(cfg(all(feature = "tcp", any(feature = "http1", feature = "http2")))) +)] impl Builder { /// Set whether TCP keepalive messages are enabled on accepted connections. /// @@ -619,7 +604,6 @@ impl Builder { // The `Server::with_graceful_shutdown` needs to keep track of all active // connections, and signal that they start to shutdown when prompted, so // it has a `GracefulWatcher` implementation to do that. -#[cfg(any(feature = "http1", feature = "http2"))] pub trait Watcher, E>: Clone { type Future: Future>; @@ -628,10 +612,8 @@ pub trait Watcher, E>: Clone { #[allow(missing_debug_implementations)] #[derive(Copy, Clone)] -#[cfg(any(feature = "http1", feature = "http2"))] pub struct NoopWatcher; -#[cfg(any(feature = "http1", feature = "http2"))] impl Watcher for NoopWatcher where I: AsyncRead + AsyncWrite + Unpin + Send + 'static, @@ -647,7 +629,6 @@ where } } -#[cfg(any(feature = "http1", feature = "http2"))] // used by exec.rs pub(crate) mod new_svc { use std::error::Error as StdError; @@ -759,7 +740,6 @@ pub(crate) mod new_svc { } } -#[cfg(any(feature = "http1", feature = "http2"))] pin_project! { /// A future building a new `Service` to a `Connection`. /// @@ -776,7 +756,6 @@ pin_project! { } } -#[cfg(any(feature = "http1", feature = "http2"))] impl Future for Connecting where I: AsyncRead + AsyncWrite + Unpin, diff --git a/src/server/server_stub.rs b/src/server/server_stub.rs new file mode 100644 index 0000000000..87b1f5131f --- /dev/null +++ b/src/server/server_stub.rs @@ -0,0 +1,16 @@ +use std::fmt; + +use crate::common::exec::Exec; + +/// A listening HTTP server that accepts connections in both HTTP1 and HTTP2 by default. +/// +/// Needs at least one of the `http1` and `http2` features to be activated to actually be useful. +pub struct Server { + _marker: std::marker::PhantomData<(I, S, E)>, +} + +impl fmt::Debug for Server { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Server").finish() + } +} From ff4c8ccb80d1f14a0c5b0a0c346cd46b6e094535 Mon Sep 17 00:00:00 2001 From: SabrinaJewson Date: Sat, 12 Mar 2022 15:40:35 +0000 Subject: [PATCH 3/3] style(server): rustfmt --- src/common/exec.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/common/exec.rs b/src/common/exec.rs index f890e6aae5..76f616184b 100644 --- a/src/common/exec.rs +++ b/src/common/exec.rs @@ -11,7 +11,7 @@ use crate::body::HttpBody; use crate::proto::h2::server::H2Stream; use crate::rt::Executor; #[cfg(all(feature = "server", any(feature = "http1", feature = "http2")))] -use crate::server::server::{Watcher, new_svc::NewSvcTask}; +use crate::server::server::{new_svc::NewSvcTask, Watcher}; #[cfg(all(feature = "server", any(feature = "http1", feature = "http2")))] use crate::service::HttpService;