diff --git a/src/common/exec.rs b/src/common/exec.rs index b6da9a276b..76f616184b 100644 --- a/src/common/exec.rs +++ b/src/common/exec.rs @@ -11,7 +11,7 @@ use crate::body::HttpBody; use crate::proto::h2::server::H2Stream; use crate::rt::Executor; #[cfg(all(feature = "server", any(feature = "http1", feature = "http2")))] -use crate::server::conn::spawn_all::{NewSvcTask, Watcher}; +use crate::server::server::{new_svc::NewSvcTask, Watcher}; #[cfg(all(feature = "server", any(feature = "http1", feature = "http2")))] use crate::service::HttpService; diff --git a/src/server/conn.rs b/src/server/conn.rs index de765b3a15..3f9eaf9e62 100644 --- a/src/server/conn.rs +++ b/src/server/conn.rs @@ -48,8 +48,7 @@ not(all(feature = "http1", feature = "http2")) ))] use std::marker::PhantomData; -#[cfg(feature = "tcp")] -use std::net::SocketAddr; +#[cfg(all(any(feature = "http1", feature = "http2"), feature = "runtime"))] use std::time::Duration; #[cfg(feature = "http2")] @@ -70,17 +69,15 @@ cfg_feature! { use tokio::io::{AsyncRead, AsyncWrite}; use tracing::trace; - use super::accept::Accept; + pub use super::server::Connecting; use crate::body::{Body, HttpBody}; use crate::common::{task, Future, Pin, Poll, Unpin}; #[cfg(not(all(feature = "http1", feature = "http2")))] use crate::common::Never; - use crate::common::exec::{ConnStreamExec, Exec, NewSvcExec}; + use crate::common::exec::{ConnStreamExec, Exec}; use crate::proto; - use crate::service::{HttpService, MakeServiceRef}; - use self::spawn_all::NewSvcTask; + use crate::service::HttpService; - pub(super) use self::spawn_all::{NoopWatcher, Watcher}; pub(super) use self::upgrades::UpgradeableConnection; } @@ -97,7 +94,7 @@ pub use super::tcp::{AddrIncoming, AddrStream}; #[cfg(any(feature = "http1", feature = "http2"))] #[cfg_attr(docsrs, doc(cfg(any(feature = "http1", feature = "http2"))))] pub struct Http { - exec: E, + pub(crate) exec: E, h1_half_close: bool, h1_keep_alive: bool, h1_title_case_headers: bool, @@ -127,51 +124,6 @@ enum ConnectionMode { Fallback, } -#[cfg(any(feature = "http1", feature = "http2"))] -pin_project! { - /// A stream mapping incoming IOs to new services. - /// - /// Yields `Connecting`s that are futures that should be put on a reactor. - #[must_use = "streams do nothing unless polled"] - #[derive(Debug)] - pub(super) struct Serve { - #[pin] - incoming: I, - make_service: S, - protocol: Http, - } -} - -#[cfg(any(feature = "http1", feature = "http2"))] -pin_project! { - /// A future building a new `Service` to a `Connection`. - /// - /// Wraps the future returned from `MakeService` into one that returns - /// a `Connection`. - #[must_use = "futures do nothing unless polled"] - #[derive(Debug)] - #[cfg_attr(docsrs, doc(cfg(any(feature = "http1", feature = "http2"))))] - pub struct Connecting { - #[pin] - future: F, - io: Option, - protocol: Http, - } -} - -#[cfg(any(feature = "http1", feature = "http2"))] -pin_project! { - #[must_use = "futures do nothing unless polled"] - #[derive(Debug)] - pub(super) struct SpawnAll { - // TODO: re-add `pub(super)` once rustdoc can handle this. - // - // See https://github.com/rust-lang/rust/issues/64705 - #[pin] - pub(super) serve: Serve, - } -} - #[cfg(any(feature = "http1", feature = "http2"))] pin_project! { /// A future binding a connection with a Service. @@ -719,23 +671,6 @@ impl Http { fallback: PhantomData, } } - - pub(super) fn serve(&self, incoming: I, make_service: S) -> Serve - where - I: Accept, - IE: Into>, - IO: AsyncRead + AsyncWrite + Unpin, - S: MakeServiceRef, - S::Error: Into>, - Bd: HttpBody, - E: ConnStreamExec<>::Future, Bd>, - { - Serve { - incoming, - make_service, - protocol: self.clone(), - } - } } // ===== impl Connection ===== @@ -987,141 +922,6 @@ impl Default for ConnectionMode { } } -// ===== impl Serve ===== - -#[cfg(any(feature = "http1", feature = "http2"))] -impl Serve { - /// Get a reference to the incoming stream. - #[inline] - pub(super) fn incoming_ref(&self) -> &I { - &self.incoming - } - - /* - /// Get a mutable reference to the incoming stream. - #[inline] - pub fn incoming_mut(&mut self) -> &mut I { - &mut self.incoming - } - */ - - /// Spawn all incoming connections onto the executor in `Http`. - pub(super) fn spawn_all(self) -> SpawnAll { - SpawnAll { serve: self } - } -} - -#[cfg(any(feature = "http1", feature = "http2"))] -impl Serve -where - I: Accept, - IO: AsyncRead + AsyncWrite + Unpin, - IE: Into>, - S: MakeServiceRef, - B: HttpBody, - E: ConnStreamExec<>::Future, B>, -{ - fn poll_next_( - self: Pin<&mut Self>, - cx: &mut task::Context<'_>, - ) -> Poll>>> { - let me = self.project(); - match ready!(me.make_service.poll_ready_ref(cx)) { - Ok(()) => (), - Err(e) => { - trace!("make_service closed"); - return Poll::Ready(Some(Err(crate::Error::new_user_make_service(e)))); - } - } - - if let Some(item) = ready!(me.incoming.poll_accept(cx)) { - let io = item.map_err(crate::Error::new_accept)?; - let new_fut = me.make_service.make_service_ref(&io); - Poll::Ready(Some(Ok(Connecting { - future: new_fut, - io: Some(io), - protocol: me.protocol.clone(), - }))) - } else { - Poll::Ready(None) - } - } -} - -// ===== impl Connecting ===== - -#[cfg(any(feature = "http1", feature = "http2"))] -impl Future for Connecting -where - I: AsyncRead + AsyncWrite + Unpin, - F: Future>, - S: HttpService, - B: HttpBody + 'static, - B::Error: Into>, - E: ConnStreamExec, -{ - type Output = Result, FE>; - - fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll { - let mut me = self.project(); - let service = ready!(me.future.poll(cx))?; - let io = Option::take(&mut me.io).expect("polled after complete"); - Poll::Ready(Ok(me.protocol.serve_connection(io, service))) - } -} - -// ===== impl SpawnAll ===== - -#[cfg(all(feature = "tcp", any(feature = "http1", feature = "http2")))] -impl SpawnAll { - pub(super) fn local_addr(&self) -> SocketAddr { - self.serve.incoming.local_addr() - } -} - -#[cfg(any(feature = "http1", feature = "http2"))] -impl SpawnAll { - pub(super) fn incoming_ref(&self) -> &I { - self.serve.incoming_ref() - } -} - -#[cfg(any(feature = "http1", feature = "http2"))] -impl SpawnAll -where - I: Accept, - IE: Into>, - IO: AsyncRead + AsyncWrite + Unpin + Send + 'static, - S: MakeServiceRef, - B: HttpBody, - E: ConnStreamExec<>::Future, B>, -{ - pub(super) fn poll_watch( - self: Pin<&mut Self>, - cx: &mut task::Context<'_>, - watcher: &W, - ) -> Poll> - where - E: NewSvcExec, - W: Watcher, - { - let mut me = self.project(); - loop { - if let Some(connecting) = ready!(me.serve.as_mut().poll_next_(cx)?) { - let fut = NewSvcTask::new(connecting, watcher.clone()); - me.serve - .as_mut() - .project() - .protocol - .exec - .execute_new_svc(fut); - } else { - return Poll::Ready(Ok(())); - } - } - } -} - // ===== impl ProtoServer ===== #[cfg(any(feature = "http1", feature = "http2"))] @@ -1151,150 +951,6 @@ where } } -#[cfg(any(feature = "http1", feature = "http2"))] -pub(crate) mod spawn_all { - use std::error::Error as StdError; - use tokio::io::{AsyncRead, AsyncWrite}; - use tracing::debug; - - use super::{Connecting, UpgradeableConnection}; - use crate::body::{Body, HttpBody}; - use crate::common::exec::ConnStreamExec; - use crate::common::{task, Future, Pin, Poll, Unpin}; - use crate::service::HttpService; - use pin_project_lite::pin_project; - - // Used by `SpawnAll` to optionally watch a `Connection` future. - // - // The regular `hyper::Server` just uses a `NoopWatcher`, which does - // not need to watch anything, and so returns the `Connection` untouched. - // - // The `Server::with_graceful_shutdown` needs to keep track of all active - // connections, and signal that they start to shutdown when prompted, so - // it has a `GracefulWatcher` implementation to do that. - pub trait Watcher, E>: Clone { - type Future: Future>; - - fn watch(&self, conn: UpgradeableConnection) -> Self::Future; - } - - #[allow(missing_debug_implementations)] - #[derive(Copy, Clone)] - pub struct NoopWatcher; - - impl Watcher for NoopWatcher - where - I: AsyncRead + AsyncWrite + Unpin + Send + 'static, - S: HttpService, - E: ConnStreamExec, - S::ResBody: 'static, - ::Error: Into>, - { - type Future = UpgradeableConnection; - - fn watch(&self, conn: UpgradeableConnection) -> Self::Future { - conn - } - } - - // This is a `Future` spawned to an `Executor` inside - // the `SpawnAll`. By being a nameable type, we can be generic over the - // user's `Service::Future`, and thus an `Executor` can execute it. - // - // Doing this allows for the server to conditionally require `Send` futures, - // depending on the `Executor` configured. - // - // Users cannot import this type, nor the associated `NewSvcExec`. Instead, - // a blanket implementation for `Executor` is sufficient. - - pin_project! { - #[allow(missing_debug_implementations)] - pub struct NewSvcTask, E, W: Watcher> { - #[pin] - state: State, - } - } - - pin_project! { - #[project = StateProj] - pub(super) enum State, E, W: Watcher> { - Connecting { - #[pin] - connecting: Connecting, - watcher: W, - }, - Connected { - #[pin] - future: W::Future, - }, - } - } - - impl, E, W: Watcher> NewSvcTask { - pub(super) fn new(connecting: Connecting, watcher: W) -> Self { - NewSvcTask { - state: State::Connecting { - connecting, - watcher, - }, - } - } - } - - impl Future for NewSvcTask - where - I: AsyncRead + AsyncWrite + Unpin + Send + 'static, - N: Future>, - NE: Into>, - S: HttpService, - B: HttpBody + 'static, - B::Error: Into>, - E: ConnStreamExec, - W: Watcher, - { - type Output = (); - - fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll { - // If it weren't for needing to name this type so the `Send` bounds - // could be projected to the `Serve` executor, this could just be - // an `async fn`, and much safer. Woe is me. - - let mut me = self.project(); - loop { - let next = { - match me.state.as_mut().project() { - StateProj::Connecting { - connecting, - watcher, - } => { - let res = ready!(connecting.poll(cx)); - let conn = match res { - Ok(conn) => conn, - Err(err) => { - let err = crate::Error::new_user_make_service(err); - debug!("connecting error: {}", err); - return Poll::Ready(()); - } - }; - let future = watcher.watch(conn.with_upgrades()); - State::Connected { future } - } - StateProj::Connected { future } => { - return future.poll(cx).map(|res| { - if let Err(err) = res { - debug!("connection error: {}", err); - } - }); - } - } - }; - - me.state.set(next); - } - } - } -} - #[cfg(any(feature = "http1", feature = "http2"))] mod upgrades { use super::*; diff --git a/src/server/mod.rs b/src/server/mod.rs index a97944f518..b5508aefdd 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -149,7 +149,6 @@ pub mod accept; pub mod conn; -mod server; #[cfg(feature = "tcp")] mod tcp; @@ -158,7 +157,15 @@ pub use self::server::Server; cfg_feature! { #![any(feature = "http1", feature = "http2")] + pub(crate) mod server; pub use self::server::Builder; mod shutdown; } + +cfg_feature! { + #![not(any(feature = "http1", feature = "http2"))] + + mod server_stub; + use server_stub as server; +} diff --git a/src/server/server.rs b/src/server/server.rs index c48582c7fd..e3058da4bb 100644 --- a/src/server/server.rs +++ b/src/server/server.rs @@ -1,33 +1,29 @@ +use std::error::Error as StdError; use std::fmt; #[cfg(feature = "tcp")] use std::net::{SocketAddr, TcpListener as StdTcpListener}; #[cfg(any(feature = "tcp", feature = "http1"))] use std::time::Duration; -#[cfg(all(feature = "tcp", any(feature = "http1", feature = "http2")))] +use pin_project_lite::pin_project; +use tokio::io::{AsyncRead, AsyncWrite}; +use tracing::trace; + +use super::accept::Accept; +#[cfg(all(feature = "tcp"))] use super::tcp::AddrIncoming; +use crate::body::{Body, HttpBody}; use crate::common::exec::Exec; +use crate::common::exec::{ConnStreamExec, NewSvcExec}; +use crate::common::{task, Future, Pin, Poll, Unpin}; +// Renamed `Http` as `Http_` for now so that people upgrading don't see an +// error that `hyper::server::Http` is private... +use super::conn::{Connection, Http as Http_, UpgradeableConnection}; +use super::shutdown::{Graceful, GracefulWatcher}; +use crate::service::{HttpService, MakeServiceRef}; -cfg_feature! { - #![any(feature = "http1", feature = "http2")] - - use std::error::Error as StdError; - - use pin_project_lite::pin_project; - use tokio::io::{AsyncRead, AsyncWrite}; +use self::new_svc::NewSvcTask; - use super::accept::Accept; - use crate::body::{Body, HttpBody}; - use crate::common::{task, Future, Pin, Poll, Unpin}; - use crate::common::exec::{ConnStreamExec, NewSvcExec}; - // Renamed `Http` as `Http_` for now so that people upgrading don't see an - // error that `hyper::server::Http` is private... - use super::conn::{Http as Http_, NoopWatcher, SpawnAll}; - use super::shutdown::{Graceful, GracefulWatcher}; - use crate::service::{HttpService, MakeServiceRef}; -} - -#[cfg(any(feature = "http1", feature = "http2"))] pin_project! { /// A listening HTTP server that accepts connections in both HTTP1 and HTTP2 by default. /// @@ -37,21 +33,14 @@ pin_project! { /// `Executor`. pub struct Server { #[pin] - spawn_all: SpawnAll, + incoming: I, + make_service: S, + protocol: Http_, } } -/// A listening HTTP server that accepts connections in both HTTP1 and HTTP2 by default. -/// -/// Needs at least one of the `http1` and `http2` features to be activated to actually be useful. -#[cfg(not(any(feature = "http1", feature = "http2")))] -pub struct Server { - _marker: std::marker::PhantomData<(I, S, E)>, -} - /// A builder for a [`Server`](Server). #[derive(Debug)] -#[cfg(any(feature = "http1", feature = "http2"))] #[cfg_attr(docsrs, doc(cfg(any(feature = "http1", feature = "http2"))))] pub struct Builder { incoming: I, @@ -60,7 +49,6 @@ pub struct Builder { // ===== impl Server ===== -#[cfg(any(feature = "http1", feature = "http2"))] #[cfg_attr(docsrs, doc(cfg(any(feature = "http1", feature = "http2"))))] impl Server { /// Starts a [`Builder`](Builder) with the provided incoming stream. @@ -72,47 +60,48 @@ impl Server { } } -cfg_feature! { - #![all(feature = "tcp", any(feature = "http1", feature = "http2"))] - - impl Server { - /// Binds to the provided address, and returns a [`Builder`](Builder). - /// - /// # Panics - /// - /// This method will panic if binding to the address fails. For a method - /// to bind to an address and return a `Result`, see `Server::try_bind`. - pub fn bind(addr: &SocketAddr) -> Builder { - let incoming = AddrIncoming::new(addr).unwrap_or_else(|e| { - panic!("error binding to {}: {}", addr, e); - }); - Server::builder(incoming) - } +#[cfg(feature = "tcp")] +#[cfg_attr( + docsrs, + doc(cfg(all(feature = "tcp", any(feature = "http1", feature = "http2")))) +)] +impl Server { + /// Binds to the provided address, and returns a [`Builder`](Builder). + /// + /// # Panics + /// + /// This method will panic if binding to the address fails. For a method + /// to bind to an address and return a `Result`, see `Server::try_bind`. + pub fn bind(addr: &SocketAddr) -> Builder { + let incoming = AddrIncoming::new(addr).unwrap_or_else(|e| { + panic!("error binding to {}: {}", addr, e); + }); + Server::builder(incoming) + } - /// Tries to bind to the provided address, and returns a [`Builder`](Builder). - pub fn try_bind(addr: &SocketAddr) -> crate::Result> { - AddrIncoming::new(addr).map(Server::builder) - } + /// Tries to bind to the provided address, and returns a [`Builder`](Builder). + pub fn try_bind(addr: &SocketAddr) -> crate::Result> { + AddrIncoming::new(addr).map(Server::builder) + } - /// Create a new instance from a `std::net::TcpListener` instance. - pub fn from_tcp(listener: StdTcpListener) -> Result, crate::Error> { - AddrIncoming::from_std(listener).map(Server::builder) - } + /// Create a new instance from a `std::net::TcpListener` instance. + pub fn from_tcp(listener: StdTcpListener) -> Result, crate::Error> { + AddrIncoming::from_std(listener).map(Server::builder) } } -cfg_feature! { - #![all(feature = "tcp", any(feature = "http1", feature = "http2"))] - - impl Server { - /// Returns the local address that this server is bound to. - pub fn local_addr(&self) -> SocketAddr { - self.spawn_all.local_addr() - } +#[cfg(feature = "tcp")] +#[cfg_attr( + docsrs, + doc(cfg(all(feature = "tcp", any(feature = "http1", feature = "http2")))) +)] +impl Server { + /// Returns the local address that this server is bound to. + pub fn local_addr(&self) -> SocketAddr { + self.incoming.local_addr() } } -#[cfg(any(feature = "http1", feature = "http2"))] #[cfg_attr(docsrs, doc(cfg(any(feature = "http1", feature = "http2"))))] impl Server where @@ -124,7 +113,6 @@ where B: HttpBody + 'static, B::Error: Into>, E: ConnStreamExec<>::Future, B>, - E: NewSvcExec, { /// Prepares a server to handle graceful shutdown when the provided future /// completes. @@ -165,12 +153,57 @@ where pub fn with_graceful_shutdown(self, signal: F) -> Graceful where F: Future, + E: NewSvcExec, { - Graceful::new(self.spawn_all, signal) + Graceful::new(self, signal) + } + + fn poll_next_( + self: Pin<&mut Self>, + cx: &mut task::Context<'_>, + ) -> Poll>>> { + let me = self.project(); + match ready!(me.make_service.poll_ready_ref(cx)) { + Ok(()) => (), + Err(e) => { + trace!("make_service closed"); + return Poll::Ready(Some(Err(crate::Error::new_user_make_service(e)))); + } + } + + if let Some(item) = ready!(me.incoming.poll_accept(cx)) { + let io = item.map_err(crate::Error::new_accept)?; + let new_fut = me.make_service.make_service_ref(&io); + Poll::Ready(Some(Ok(Connecting { + future: new_fut, + io: Some(io), + protocol: me.protocol.clone(), + }))) + } else { + Poll::Ready(None) + } + } + + pub(super) fn poll_watch( + mut self: Pin<&mut Self>, + cx: &mut task::Context<'_>, + watcher: &W, + ) -> Poll> + where + E: NewSvcExec, + W: Watcher, + { + loop { + if let Some(connecting) = ready!(self.as_mut().poll_next_(cx)?) { + let fut = NewSvcTask::new(connecting, watcher.clone()); + self.as_mut().project().protocol.exec.execute_new_svc(fut); + } else { + return Poll::Ready(Ok(())); + } + } } } -#[cfg(any(feature = "http1", feature = "http2"))] #[cfg_attr(docsrs, doc(cfg(any(feature = "http1", feature = "http2"))))] impl Future for Server where @@ -187,22 +220,20 @@ where type Output = crate::Result<()>; fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll { - self.project().spawn_all.poll_watch(cx, &NoopWatcher) + self.poll_watch(cx, &NoopWatcher) } } impl fmt::Debug for Server { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { let mut st = f.debug_struct("Server"); - #[cfg(any(feature = "http1", feature = "http2"))] - st.field("listener", &self.spawn_all.incoming_ref()); + st.field("listener", &self.incoming); st.finish() } } // ===== impl Builder ===== -#[cfg(any(feature = "http1", feature = "http2"))] #[cfg_attr(docsrs, doc(cfg(any(feature = "http1", feature = "http2"))))] impl Builder { /// Start a new builder, wrapping an incoming stream and low-level options. @@ -309,7 +340,7 @@ impl Builder { self } - /// Set a timeout for reading client request headers. If a client does not + /// Set a timeout for reading client request headers. If a client does not /// transmit the entire header within this time, the connection is closed. /// /// Default is None. @@ -502,7 +533,7 @@ impl Builder { /// } /// # } /// ``` - pub fn serve(self, new_service: S) -> Server + pub fn serve(self, make_service: S) -> Server where I: Accept, I::Error: Into>, @@ -514,13 +545,19 @@ impl Builder { E: NewSvcExec, E: ConnStreamExec<>::Future, B>, { - let serve = self.protocol.serve(self.incoming, new_service); - let spawn_all = serve.spawn_all(); - Server { spawn_all } + Server { + incoming: self.incoming, + make_service, + protocol: self.protocol.clone(), + } } } -#[cfg(all(feature = "tcp", any(feature = "http1", feature = "http2")))] +#[cfg(feature = "tcp")] +#[cfg_attr( + docsrs, + doc(cfg(all(feature = "tcp", any(feature = "http1", feature = "http2")))) +)] impl Builder { /// Set whether TCP keepalive messages are enabled on accepted connections. /// @@ -558,3 +595,182 @@ impl Builder { self } } + +// Used by `Server` to optionally watch a `Connection` future. +// +// The regular `hyper::Server` just uses a `NoopWatcher`, which does +// not need to watch anything, and so returns the `Connection` untouched. +// +// The `Server::with_graceful_shutdown` needs to keep track of all active +// connections, and signal that they start to shutdown when prompted, so +// it has a `GracefulWatcher` implementation to do that. +pub trait Watcher, E>: Clone { + type Future: Future>; + + fn watch(&self, conn: UpgradeableConnection) -> Self::Future; +} + +#[allow(missing_debug_implementations)] +#[derive(Copy, Clone)] +pub struct NoopWatcher; + +impl Watcher for NoopWatcher +where + I: AsyncRead + AsyncWrite + Unpin + Send + 'static, + S: HttpService, + E: ConnStreamExec, + S::ResBody: 'static, + ::Error: Into>, +{ + type Future = UpgradeableConnection; + + fn watch(&self, conn: UpgradeableConnection) -> Self::Future { + conn + } +} + +// used by exec.rs +pub(crate) mod new_svc { + use std::error::Error as StdError; + use tokio::io::{AsyncRead, AsyncWrite}; + use tracing::debug; + + use super::{Connecting, Watcher}; + use crate::body::{Body, HttpBody}; + use crate::common::exec::ConnStreamExec; + use crate::common::{task, Future, Pin, Poll, Unpin}; + use crate::service::HttpService; + use pin_project_lite::pin_project; + + // This is a `Future` spawned to an `Executor` inside + // the `Server`. By being a nameable type, we can be generic over the + // user's `Service::Future`, and thus an `Executor` can execute it. + // + // Doing this allows for the server to conditionally require `Send` futures, + // depending on the `Executor` configured. + // + // Users cannot import this type, nor the associated `NewSvcExec`. Instead, + // a blanket implementation for `Executor` is sufficient. + + pin_project! { + #[allow(missing_debug_implementations)] + pub struct NewSvcTask, E, W: Watcher> { + #[pin] + state: State, + } + } + + pin_project! { + #[project = StateProj] + pub(super) enum State, E, W: Watcher> { + Connecting { + #[pin] + connecting: Connecting, + watcher: W, + }, + Connected { + #[pin] + future: W::Future, + }, + } + } + + impl, E, W: Watcher> NewSvcTask { + pub(super) fn new(connecting: Connecting, watcher: W) -> Self { + NewSvcTask { + state: State::Connecting { + connecting, + watcher, + }, + } + } + } + + impl Future for NewSvcTask + where + I: AsyncRead + AsyncWrite + Unpin + Send + 'static, + N: Future>, + NE: Into>, + S: HttpService, + B: HttpBody + 'static, + B::Error: Into>, + E: ConnStreamExec, + W: Watcher, + { + type Output = (); + + fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll { + // If it weren't for needing to name this type so the `Send` bounds + // could be projected to the `Serve` executor, this could just be + // an `async fn`, and much safer. Woe is me. + + let mut me = self.project(); + loop { + let next = { + match me.state.as_mut().project() { + StateProj::Connecting { + connecting, + watcher, + } => { + let res = ready!(connecting.poll(cx)); + let conn = match res { + Ok(conn) => conn, + Err(err) => { + let err = crate::Error::new_user_make_service(err); + debug!("connecting error: {}", err); + return Poll::Ready(()); + } + }; + let future = watcher.watch(conn.with_upgrades()); + State::Connected { future } + } + StateProj::Connected { future } => { + return future.poll(cx).map(|res| { + if let Err(err) = res { + debug!("connection error: {}", err); + } + }); + } + } + }; + + me.state.set(next); + } + } + } +} + +pin_project! { + /// A future building a new `Service` to a `Connection`. + /// + /// Wraps the future returned from `MakeService` into one that returns + /// a `Connection`. + #[must_use = "futures do nothing unless polled"] + #[derive(Debug)] + #[cfg_attr(docsrs, doc(cfg(any(feature = "http1", feature = "http2"))))] + pub struct Connecting { + #[pin] + future: F, + io: Option, + protocol: Http_, + } +} + +impl Future for Connecting +where + I: AsyncRead + AsyncWrite + Unpin, + F: Future>, + S: HttpService, + B: HttpBody + 'static, + B::Error: Into>, + E: ConnStreamExec, +{ + type Output = Result, FE>; + + fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll { + let mut me = self.project(); + let service = ready!(me.future.poll(cx))?; + let io = Option::take(&mut me.io).expect("polled after complete"); + Poll::Ready(Ok(me.protocol.serve_connection(io, service))) + } +} diff --git a/src/server/server_stub.rs b/src/server/server_stub.rs new file mode 100644 index 0000000000..87b1f5131f --- /dev/null +++ b/src/server/server_stub.rs @@ -0,0 +1,16 @@ +use std::fmt; + +use crate::common::exec::Exec; + +/// A listening HTTP server that accepts connections in both HTTP1 and HTTP2 by default. +/// +/// Needs at least one of the `http1` and `http2` features to be activated to actually be useful. +pub struct Server { + _marker: std::marker::PhantomData<(I, S, E)>, +} + +impl fmt::Debug for Server { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Server").finish() + } +} diff --git a/src/server/shutdown.rs b/src/server/shutdown.rs index 2277a40964..96937d0827 100644 --- a/src/server/shutdown.rs +++ b/src/server/shutdown.rs @@ -5,7 +5,8 @@ use tokio::io::{AsyncRead, AsyncWrite}; use tracing::debug; use super::accept::Accept; -use super::conn::{SpawnAll, UpgradeableConnection, Watcher}; +use super::conn::UpgradeableConnection; +use super::server::{Server, Watcher}; use crate::body::{Body, HttpBody}; use crate::common::drain::{self, Draining, Signal, Watch, Watching}; use crate::common::exec::{ConnStreamExec, NewSvcExec}; @@ -26,7 +27,7 @@ pin_project! { Running { drain: Option<(Signal, Watch)>, #[pin] - spawn_all: SpawnAll, + server: Server, #[pin] signal: F, }, @@ -35,12 +36,12 @@ pin_project! { } impl Graceful { - pub(super) fn new(spawn_all: SpawnAll, signal: F) -> Self { + pub(super) fn new(server: Server, signal: F) -> Self { let drain = Some(drain::channel()); Graceful { state: State::Running { drain, - spawn_all, + server, signal, }, } @@ -69,7 +70,7 @@ where match me.state.as_mut().project() { StateProj::Running { drain, - spawn_all, + server, signal, } => match signal.poll(cx) { Poll::Ready(()) => { @@ -81,7 +82,7 @@ where } Poll::Pending => { let watch = drain.as_ref().expect("drain channel").1.clone(); - return spawn_all.poll_watch(cx, &GracefulWatcher(watch)); + return server.poll_watch(cx, &GracefulWatcher(watch)); } }, StateProj::Draining { ref mut draining } => {