From 9b422a6030de73737028859900944ee26751e673 Mon Sep 17 00:00:00 2001 From: Daniele Ahmed Date: Fri, 7 Oct 2022 18:46:07 +0200 Subject: [PATCH 01/16] Timeout on body Closes #295 Add a timeout on inactive bodies, both on request and response bodies. Signed-off-by: Daniele Ahmed Co-authored-by: Harry Barber --- tower-http/src/timeout/body.rs | 345 ++++++++++++++++++ tower-http/src/timeout/mod.rs | 4 + .../src/{timeout.rs => timeout/service.rs} | 2 +- 3 files changed, 350 insertions(+), 1 deletion(-) create mode 100644 tower-http/src/timeout/body.rs create mode 100644 tower-http/src/timeout/mod.rs rename tower-http/src/{timeout.rs => timeout/service.rs} (98%) diff --git a/tower-http/src/timeout/body.rs b/tower-http/src/timeout/body.rs new file mode 100644 index 00000000..3f998864 --- /dev/null +++ b/tower-http/src/timeout/body.rs @@ -0,0 +1,345 @@ +//! Middleware that applies a timeout to bodies. +//! +//! Bodies must produce data at most within the specified timeout. +//! If they are inactive, an error will be generated. +//! +//! # Differences from `tower_http::timeout::service::Timeout` +//! +//! [`tower_http::timeout::service::Timeout`] applies a timeout on the full request. +//! That timeout is not reset when bytes are handled, whether the request is active or not. +//! +//! This middleware will return a `TimeoutError`. +//! +//! # Example +//! +//! ``` +//! use http::{Request, Response}; +//! use hyper::Body; +//! use std::time::Duration; +//! use tower::ServiceBuilder; +//! use tower_http::timeout::body::RequestTimeoutBodyLayer; +//! +//! async fn handle(_: Request) -> Result, Infallible> { +//! // ... +//! # Ok(Response::new(Body::empty())) +//! } +//! +//! # #[tokio::main] +//! # async fn main() -> Result<(), Box> { +//! let svc = ServiceBuilder::new() +//! // Timeout bodies after 30 seconds of inactivity +//! .layer(RequestTimeoutBodyLayer::new(Duration::from_secs(30))) +//! .service_fn(handle); +//! # Ok(()) +//! # } +//! ``` + +use std::{time::Duration, task::{Context, Poll}, pin::Pin}; +use http_body::Body; +use futures_core::Future; +use pin_project_lite::pin_project; +use tokio::time::{Sleep, sleep}; +use http::{Request, Response}; +use tower_layer::Layer; +use tower_service::Service; + +pin_project! { + /// Wrapper around a `http_body::Body` to time out if data is not ready within the specified duration. + pub struct TimeoutBody { + timeout: Duration, + #[pin] + sleep: Option, + #[pin] + body: B, + } +} + +impl TimeoutBody { + /// Create a new [`TimeoutBody`]. + pub fn new(timeout: Duration, body: B) -> Self { + TimeoutBody { + timeout, + sleep: None, + body, + } + } +} + +impl Body for TimeoutBody +where + B: Body, +{ + type Data = B::Data; + type Error = Box; + + fn poll_data( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll>> { + let mut this = self.project(); + + // Start the `Sleep` if not active. + let sleep_pinned = if let Some(some) = this.sleep.as_mut().as_pin_mut() { + some + } else { + this.sleep.set(Some(sleep(*this.timeout))); + this.sleep.as_mut().as_pin_mut().unwrap() + }; + + // Error if the timeout has expired. + match sleep_pinned.poll(cx) { + Poll::Pending => (), + Poll::Ready(()) => return Poll::Ready(Some(Err(Box::new(TimeoutError)))), + } + + // Check for body data. + match this.body.poll_data(cx) { + Poll::Ready(data) => { + // Some data is ready. Reset the `Sleep`... + this.sleep.set(Some(sleep(*this.timeout))); + + // ...then `poll` it to get awoken. + let _ = this.sleep.as_pin_mut().unwrap().poll(cx); + Poll::Ready(data.transpose().map_err(|_| Box::new(TimeoutError) as Self::Error).transpose()) + } + Poll::Pending => Poll::Pending + } + } + + fn poll_trailers( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll, Self::Error>> { + let this = self.project(); + + // Error if the timeout has expired. + match this.sleep.as_pin_mut().expect("poll_data was not called").poll(cx) { + Poll::Pending => (), + Poll::Ready(()) => return Poll::Ready(Err(Box::new(TimeoutError))), + } + + this.body.poll_trailers(cx).map_err(|_| Box::new(TimeoutError) as Self::Error) + } +} + +/// Error for [`TimeoutBody`]. +#[derive(Debug)] +pub struct TimeoutError; + +impl std::error::Error for TimeoutError {} + +impl std::fmt::Display for TimeoutError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "TimeoutError") + } +} + +/// Apply a TimeoutBody to the request body. +#[derive(Clone, Debug)] +pub struct RequestBodyTimeoutLayer { + timeout: Duration, +} + +impl RequestBodyTimeoutLayer { + /// Create a new [`RequestBodyTimeoutLayer`]. + pub fn new(timeout: Duration) -> Self { + Self { timeout } + } +} + +impl Layer for RequestBodyTimeoutLayer +{ + type Service = RequestBodyTimeout; + + fn layer(&self, inner: S) -> Self::Service { + RequestBodyTimeout::new(inner, self.timeout) + } +} + +/// Apply a TimeoutBody to the request body. +#[derive(Clone, Debug)] +pub struct RequestBodyTimeout { + inner: S, + timeout: Duration, +} + +impl RequestBodyTimeout { + /// Create a new [`RequestBodyTimeout`]. + pub fn new(service: S, timeout: Duration) -> Self { + Self { inner: service, timeout } + } + + /// Returns a new [`Layer`] that wraps services with a `RequestBodyTimeoutLayer` middleware. + /// + /// [`Layer`]: tower_layer::Layer + pub fn layer(timeout: Duration) -> RequestBodyTimeoutLayer { + RequestBodyTimeoutLayer::new(timeout) + } + + define_inner_service_accessors!(); +} + +impl Service> for RequestBodyTimeout +where + S: Service>, Response = Response>, + S::Error: Into>, +{ + type Response = S::Response; + type Error = S::Error; + type Future = S::Future; + + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + self.inner.poll_ready(cx) + } + + fn call(&mut self, req: Request) -> Self::Future { + let req = req.map(|body| TimeoutBody::new(self.timeout, body)); + self.inner.call(req) + } +} + +/// Apply a TimeoutBody to the response body. +#[derive(Clone)] +pub struct ResponseBodyTimeoutLayer { + timeout: Duration, +} + +impl ResponseBodyTimeoutLayer { + /// Create a new [`ResponseBodyTimeoutLayer`]. + pub fn new(timeout: Duration) -> Self { + Self { timeout } + } +} + +impl Layer for ResponseBodyTimeoutLayer { + type Service = ResponseBodyTimeout; + + fn layer(&self, inner: S) -> Self::Service { + ResponseBodyTimeout::new(inner, self.timeout) + } +} + +/// Apply a TimeoutBody to the response body. +#[derive(Clone)] +pub struct ResponseBodyTimeout { + inner: S, + timeout: Duration, +} + +impl ResponseBodyTimeout { + /// Create a new [`ResponseBodyTimeout`]. + pub fn new(service: S, timeout: Duration) -> Self { + Self { inner: service, timeout } + } + + /// Returns a new [`Layer`] that wraps services with a `ResponseBodyTimeoutLayer` middleware. + /// + /// [`Layer`]: tower_layer::Layer + pub fn layer(timeout: Duration) -> ResponseBodyTimeoutLayer { + ResponseBodyTimeoutLayer::new(timeout) + } + + define_inner_service_accessors!(); +} + +impl Service> for ResponseBodyTimeout +where + S: Service, Response = Response>, + S::Error: Into>, +{ + type Response = Response>; + type Error = S::Error; + type Future = ResponseBodyTimeoutFuture; + + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + self.inner.poll_ready(cx) + } + + fn call(&mut self, req: Request) -> Self::Future { + ResponseBodyTimeoutFuture { inner: self.inner.call(req), timeout: self.timeout } + } +} + +pin_project! { + /// Response future for [`ResponseBodyTimeout`]. + pub struct ResponseBodyTimeoutFuture { + #[pin] + inner: Fut, + timeout: Duration, + } +} + +use futures_core::ready; +impl Future for ResponseBodyTimeoutFuture +where + Fut: Future, E>> +{ + type Output = Result>, E>; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let timeout = self.timeout; + let this = self.project(); + let res = ready!(this.inner.poll(cx)?); + Poll::Ready(Ok(res.map(|body| TimeoutBody::new(timeout, body)))) + } +} + + +#[cfg(test)] +mod tests { + use super::*; + + use bytes::Bytes; + use pin_project_lite::pin_project; + + struct MockError; + + pin_project! { + struct MockBody { + #[pin] + sleep: Sleep + } + } + + impl Body for MockBody { + type Data = Bytes; + type Error = MockError; + + fn poll_data( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll>> { + let this = self.project(); + this.sleep.poll(cx).map(|_| Some(Ok(vec![].into()))) + } + + fn poll_trailers( + self: Pin<&mut Self>, + _cx: &mut Context<'_>, + ) -> Poll, Self::Error>> { + todo!() + } + } + + #[tokio::test] + async fn test_body_available_within_timeout() { + let mock_sleep = Duration::from_secs(1); + let timeout_sleep = Duration::from_secs(2); + + let mock_body = MockBody { sleep: sleep(mock_sleep) }; + let timeout_body = TimeoutBody::new(timeout_sleep, mock_body); + + assert!(timeout_body.boxed().data().await.unwrap().is_ok()); + } + + #[tokio::test] + async fn test_body_unavailable_within_timeout_error() { + let mock_sleep = Duration::from_secs(2); + let timeout_sleep = Duration::from_secs(1); + + let mock_body = MockBody { sleep: sleep(mock_sleep) }; + let timeout_body = TimeoutBody::new(timeout_sleep, mock_body); + + assert!(timeout_body.boxed().data().await.unwrap().is_err()); + } +} diff --git a/tower-http/src/timeout/mod.rs b/tower-http/src/timeout/mod.rs new file mode 100644 index 00000000..49d7b07f --- /dev/null +++ b/tower-http/src/timeout/mod.rs @@ -0,0 +1,4 @@ +//! Middleware for setting timeouts on requests and responses. + +pub mod body; +pub mod service; diff --git a/tower-http/src/timeout.rs b/tower-http/src/timeout/service.rs similarity index 98% rename from tower-http/src/timeout.rs rename to tower-http/src/timeout/service.rs index 6c669e9b..448abd13 100644 --- a/tower-http/src/timeout.rs +++ b/tower-http/src/timeout/service.rs @@ -20,7 +20,7 @@ //! use hyper::Body; //! use std::{convert::Infallible, time::Duration}; //! use tower::ServiceBuilder; -//! use tower_http::timeout::TimeoutLayer; +//! use tower_http::timeout::service::TimeoutLayer; //! //! async fn handle(_: Request) -> Result, Infallible> { //! // ... From 99b4f2dbb513e447efeca99cb0ada635eb308384 Mon Sep 17 00:00:00 2001 From: Daniele Ahmed Date: Tue, 11 Oct 2022 15:47:49 +0200 Subject: [PATCH 02/16] Poll sleep before returning data Signed-off-by: Daniele Ahmed --- tower-http/src/timeout/body.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/tower-http/src/timeout/body.rs b/tower-http/src/timeout/body.rs index 3f998864..c7787adb 100644 --- a/tower-http/src/timeout/body.rs +++ b/tower-http/src/timeout/body.rs @@ -99,7 +99,9 @@ where this.sleep.set(Some(sleep(*this.timeout))); // ...then `poll` it to get awoken. - let _ = this.sleep.as_pin_mut().unwrap().poll(cx); + if let Poll::Ready(_) = this.sleep.as_pin_mut().unwrap().poll(cx) { + return Poll::Ready(Some(Err(Box::new(TimeoutError)))) + } Poll::Ready(data.transpose().map_err(|_| Box::new(TimeoutError) as Self::Error).transpose()) } Poll::Pending => Poll::Pending From c06d70e1b5302db9367ed217570e445c39528809 Mon Sep 17 00:00:00 2001 From: Daniele Ahmed Date: Tue, 11 Oct 2022 16:14:25 +0200 Subject: [PATCH 03/16] Fix comments Signed-off-by: Daniele Ahmed --- tower-http/src/timeout/body.rs | 14 +++++++------- tower-http/src/timeout/service.rs | 4 ++-- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/tower-http/src/timeout/body.rs b/tower-http/src/timeout/body.rs index c7787adb..fc3fdfef 100644 --- a/tower-http/src/timeout/body.rs +++ b/tower-http/src/timeout/body.rs @@ -55,7 +55,7 @@ pin_project! { } impl TimeoutBody { - /// Create a new [`TimeoutBody`]. + /// Creates a new [`TimeoutBody`]. pub fn new(timeout: Duration, body: B) -> Self { TimeoutBody { timeout, @@ -136,14 +136,14 @@ impl std::fmt::Display for TimeoutError { } } -/// Apply a TimeoutBody to the request body. +/// Applies a TimeoutBody to the request body. #[derive(Clone, Debug)] pub struct RequestBodyTimeoutLayer { timeout: Duration, } impl RequestBodyTimeoutLayer { - /// Create a new [`RequestBodyTimeoutLayer`]. + /// Creates a new [`RequestBodyTimeoutLayer`]. pub fn new(timeout: Duration) -> Self { Self { timeout } } @@ -166,7 +166,7 @@ pub struct RequestBodyTimeout { } impl RequestBodyTimeout { - /// Create a new [`RequestBodyTimeout`]. + /// Creates a new [`RequestBodyTimeout`]. pub fn new(service: S, timeout: Duration) -> Self { Self { inner: service, timeout } } @@ -207,7 +207,7 @@ pub struct ResponseBodyTimeoutLayer { } impl ResponseBodyTimeoutLayer { - /// Create a new [`ResponseBodyTimeoutLayer`]. + /// Creates a new [`ResponseBodyTimeoutLayer`]. pub fn new(timeout: Duration) -> Self { Self { timeout } } @@ -221,7 +221,7 @@ impl Layer for ResponseBodyTimeoutLayer { } } -/// Apply a TimeoutBody to the response body. +/// Applies a TimeoutBody to the response body. #[derive(Clone)] pub struct ResponseBodyTimeout { inner: S, @@ -229,7 +229,7 @@ pub struct ResponseBodyTimeout { } impl ResponseBodyTimeout { - /// Create a new [`ResponseBodyTimeout`]. + /// Creates a new [`ResponseBodyTimeout`]. pub fn new(service: S, timeout: Duration) -> Self { Self { inner: service, timeout } } diff --git a/tower-http/src/timeout/service.rs b/tower-http/src/timeout/service.rs index 448abd13..472e9c20 100644 --- a/tower-http/src/timeout/service.rs +++ b/tower-http/src/timeout/service.rs @@ -60,7 +60,7 @@ pub struct TimeoutLayer { } impl TimeoutLayer { - /// Create a new [`TimeoutLayer`]. + /// Creates a new [`TimeoutLayer`]. pub fn new(timeout: Duration) -> Self { TimeoutLayer { timeout } } @@ -87,7 +87,7 @@ pub struct Timeout { } impl Timeout { - /// Create a new [`Timeout`]. + /// Creates a new [`Timeout`]. pub fn new(inner: S, timeout: Duration) -> Self { Self { inner, timeout } } From ffa590b2b4201b9393a68d4cd54dd7699f7b72f4 Mon Sep 17 00:00:00 2001 From: Daniele Ahmed Date: Tue, 11 Oct 2022 19:01:46 +0200 Subject: [PATCH 04/16] Fix comments Signed-off-by: Daniele Ahmed --- tower-http/src/timeout/body.rs | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/tower-http/src/timeout/body.rs b/tower-http/src/timeout/body.rs index fc3fdfef..bea35178 100644 --- a/tower-http/src/timeout/body.rs +++ b/tower-http/src/timeout/body.rs @@ -3,12 +3,12 @@ //! Bodies must produce data at most within the specified timeout. //! If they are inactive, an error will be generated. //! -//! # Differences from `tower_http::timeout::service::Timeout` +//! # Differences from [`tower_http::timeout::service::Timeout`] //! //! [`tower_http::timeout::service::Timeout`] applies a timeout on the full request. //! That timeout is not reset when bytes are handled, whether the request is active or not. //! -//! This middleware will return a `TimeoutError`. +//! This middleware will return a [`TimeoutError`]. //! //! # Example //! @@ -44,7 +44,7 @@ use tower_layer::Layer; use tower_service::Service; pin_project! { - /// Wrapper around a `http_body::Body` to time out if data is not ready within the specified duration. + /// Wrapper around a [`http_body::Body`] to time out if data is not ready within the specified duration. pub struct TimeoutBody { timeout: Duration, #[pin] @@ -158,7 +158,7 @@ impl Layer for RequestBodyTimeoutLayer } } -/// Apply a TimeoutBody to the request body. +/// Applies a TimeoutBody to the request body. #[derive(Clone, Debug)] pub struct RequestBodyTimeout { inner: S, @@ -171,7 +171,7 @@ impl RequestBodyTimeout { Self { inner: service, timeout } } - /// Returns a new [`Layer`] that wraps services with a `RequestBodyTimeoutLayer` middleware. + /// Returns a new [`Layer`] that wraps services with a [`RequestBodyTimeoutLayer`] middleware. /// /// [`Layer`]: tower_layer::Layer pub fn layer(timeout: Duration) -> RequestBodyTimeoutLayer { @@ -200,7 +200,7 @@ where } } -/// Apply a TimeoutBody to the response body. +/// Applies a TimeoutBody to the response body. #[derive(Clone)] pub struct ResponseBodyTimeoutLayer { timeout: Duration, @@ -234,7 +234,7 @@ impl ResponseBodyTimeout { Self { inner: service, timeout } } - /// Returns a new [`Layer`] that wraps services with a `ResponseBodyTimeoutLayer` middleware. + /// Returns a new [`Layer`] that wraps services with a [`ResponseBodyTimeoutLayer`] middleware. /// /// [`Layer`]: tower_layer::Layer pub fn layer(timeout: Duration) -> ResponseBodyTimeoutLayer { From f9466ede03e8207178198504105398bf3d6c0289 Mon Sep 17 00:00:00 2001 From: Daniele Ahmed Date: Tue, 11 Oct 2022 19:03:37 +0200 Subject: [PATCH 05/16] Use .into() instead of as for casting errors Signed-off-by: Daniele Ahmed --- tower-http/src/timeout/body.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tower-http/src/timeout/body.rs b/tower-http/src/timeout/body.rs index bea35178..098b4448 100644 --- a/tower-http/src/timeout/body.rs +++ b/tower-http/src/timeout/body.rs @@ -102,7 +102,7 @@ where if let Poll::Ready(_) = this.sleep.as_pin_mut().unwrap().poll(cx) { return Poll::Ready(Some(Err(Box::new(TimeoutError)))) } - Poll::Ready(data.transpose().map_err(|_| Box::new(TimeoutError) as Self::Error).transpose()) + Poll::Ready(data.transpose().map_err(|_| Box::new(TimeoutError).into()).transpose()) } Poll::Pending => Poll::Pending } @@ -120,7 +120,7 @@ where Poll::Ready(()) => return Poll::Ready(Err(Box::new(TimeoutError))), } - this.body.poll_trailers(cx).map_err(|_| Box::new(TimeoutError) as Self::Error) + this.body.poll_trailers(cx).map_err(|_| Box::new(TimeoutError).into()) } } From 83cacf9b8f19df80e6064bcf93a188fce52a2119 Mon Sep 17 00:00:00 2001 From: Daniele Ahmed Date: Wed, 12 Oct 2022 10:54:56 +0200 Subject: [PATCH 06/16] Address comments Signed-off-by: Daniele Ahmed --- tower-http/src/timeout/body.rs | 96 +++++++++++++++++++++------------- 1 file changed, 61 insertions(+), 35 deletions(-) diff --git a/tower-http/src/timeout/body.rs b/tower-http/src/timeout/body.rs index 098b4448..7f9a1a7c 100644 --- a/tower-http/src/timeout/body.rs +++ b/tower-http/src/timeout/body.rs @@ -3,9 +3,9 @@ //! Bodies must produce data at most within the specified timeout. //! If they are inactive, an error will be generated. //! -//! # Differences from [`tower_http::timeout::service::Timeout`] +//! # Differences from `tower_http::timeout::service::Timeout` //! -//! [`tower_http::timeout::service::Timeout`] applies a timeout on the full request. +//! `tower_http::timeout::service::Timeout` applies a timeout on the full request. //! That timeout is not reset when bytes are handled, whether the request is active or not. //! //! This middleware will return a [`TimeoutError`]. @@ -17,29 +17,33 @@ //! use hyper::Body; //! use std::time::Duration; //! use tower::ServiceBuilder; -//! use tower_http::timeout::body::RequestTimeoutBodyLayer; +//! use tower_http::timeout::body::RequestBodyTimeoutLayer; //! -//! async fn handle(_: Request) -> Result, Infallible> { +//! async fn handle(_: Request) -> Result, std::convert::Infallible> { //! // ... -//! # Ok(Response::new(Body::empty())) +//! # todo!() //! } //! //! # #[tokio::main] //! # async fn main() -> Result<(), Box> { //! let svc = ServiceBuilder::new() //! // Timeout bodies after 30 seconds of inactivity -//! .layer(RequestTimeoutBodyLayer::new(Duration::from_secs(30))) +//! .layer(RequestBodyTimeoutLayer::new(Duration::from_secs(30))) //! .service_fn(handle); //! # Ok(()) //! # } //! ``` -use std::{time::Duration, task::{Context, Poll}, pin::Pin}; -use http_body::Body; use futures_core::Future; -use pin_project_lite::pin_project; -use tokio::time::{Sleep, sleep}; use http::{Request, Response}; +use http_body::Body; +use pin_project_lite::pin_project; +use std::{ + pin::Pin, + task::{Context, Poll}, + time::Duration, +}; +use tokio::time::{sleep, Sleep}; use tower_layer::Layer; use tower_service::Service; @@ -77,7 +81,7 @@ where cx: &mut Context<'_>, ) -> Poll>> { let mut this = self.project(); - + // Start the `Sleep` if not active. let sleep_pinned = if let Some(some) = this.sleep.as_mut().as_pin_mut() { some @@ -100,11 +104,15 @@ where // ...then `poll` it to get awoken. if let Poll::Ready(_) = this.sleep.as_pin_mut().unwrap().poll(cx) { - return Poll::Ready(Some(Err(Box::new(TimeoutError)))) + return Poll::Ready(Some(Err(Box::new(TimeoutError)))); } - Poll::Ready(data.transpose().map_err(|_| Box::new(TimeoutError).into()).transpose()) + Poll::Ready( + data.transpose() + .map_err(|_| Box::new(TimeoutError).into()) + .transpose(), + ) } - Poll::Pending => Poll::Pending + Poll::Pending => Poll::Pending, } } @@ -115,28 +123,35 @@ where let this = self.project(); // Error if the timeout has expired. - match this.sleep.as_pin_mut().expect("poll_data was not called").poll(cx) { + match this + .sleep + .as_pin_mut() + .expect("poll_data was not called") + .poll(cx) + { Poll::Pending => (), Poll::Ready(()) => return Poll::Ready(Err(Box::new(TimeoutError))), } - this.body.poll_trailers(cx).map_err(|_| Box::new(TimeoutError).into()) + this.body + .poll_trailers(cx) + .map_err(|_| Box::new(TimeoutError).into()) } } /// Error for [`TimeoutBody`]. #[derive(Debug)] -pub struct TimeoutError; +struct TimeoutError; impl std::error::Error for TimeoutError {} impl std::fmt::Display for TimeoutError { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "TimeoutError") + write!(f, "data was not received within the designated timeout") } } -/// Applies a TimeoutBody to the request body. +/// Applies a [`TimeoutBody`] to the request body. #[derive(Clone, Debug)] pub struct RequestBodyTimeoutLayer { timeout: Duration, @@ -149,8 +164,7 @@ impl RequestBodyTimeoutLayer { } } -impl Layer for RequestBodyTimeoutLayer -{ +impl Layer for RequestBodyTimeoutLayer { type Service = RequestBodyTimeout; fn layer(&self, inner: S) -> Self::Service { @@ -158,7 +172,7 @@ impl Layer for RequestBodyTimeoutLayer } } -/// Applies a TimeoutBody to the request body. +/// Applies a [`TimeoutBody`] to the request body. #[derive(Clone, Debug)] pub struct RequestBodyTimeout { inner: S, @@ -168,7 +182,10 @@ pub struct RequestBodyTimeout { impl RequestBodyTimeout { /// Creates a new [`RequestBodyTimeout`]. pub fn new(service: S, timeout: Duration) -> Self { - Self { inner: service, timeout } + Self { + inner: service, + timeout, + } } /// Returns a new [`Layer`] that wraps services with a [`RequestBodyTimeoutLayer`] middleware. @@ -200,7 +217,7 @@ where } } -/// Applies a TimeoutBody to the response body. +/// Applies a [`TimeoutBody`] to the response body. #[derive(Clone)] pub struct ResponseBodyTimeoutLayer { timeout: Duration, @@ -221,7 +238,7 @@ impl Layer for ResponseBodyTimeoutLayer { } } -/// Applies a TimeoutBody to the response body. +/// Applies a [`TimeoutBody`] to the response body. #[derive(Clone)] pub struct ResponseBodyTimeout { inner: S, @@ -231,7 +248,10 @@ pub struct ResponseBodyTimeout { impl ResponseBodyTimeout { /// Creates a new [`ResponseBodyTimeout`]. pub fn new(service: S, timeout: Duration) -> Self { - Self { inner: service, timeout } + Self { + inner: service, + timeout, + } } /// Returns a new [`Layer`] that wraps services with a [`ResponseBodyTimeoutLayer`] middleware. @@ -258,7 +278,10 @@ where } fn call(&mut self, req: Request) -> Self::Future { - ResponseBodyTimeoutFuture { inner: self.inner.call(req), timeout: self.timeout } + ResponseBodyTimeoutFuture { + inner: self.inner.call(req), + timeout: self.timeout, + } } } @@ -274,7 +297,7 @@ pin_project! { use futures_core::ready; impl Future for ResponseBodyTimeoutFuture where - Fut: Future, E>> + Fut: Future, E>>, { type Output = Result>, E>; @@ -286,7 +309,6 @@ where } } - #[cfg(test)] mod tests { use super::*; @@ -308,9 +330,9 @@ mod tests { type Error = MockError; fn poll_data( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll>> { + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll>> { let this = self.project(); this.sleep.poll(cx).map(|_| Some(Ok(vec![].into()))) } @@ -328,18 +350,22 @@ mod tests { let mock_sleep = Duration::from_secs(1); let timeout_sleep = Duration::from_secs(2); - let mock_body = MockBody { sleep: sleep(mock_sleep) }; + let mock_body = MockBody { + sleep: sleep(mock_sleep), + }; let timeout_body = TimeoutBody::new(timeout_sleep, mock_body); assert!(timeout_body.boxed().data().await.unwrap().is_ok()); } - + #[tokio::test] async fn test_body_unavailable_within_timeout_error() { let mock_sleep = Duration::from_secs(2); let timeout_sleep = Duration::from_secs(1); - let mock_body = MockBody { sleep: sleep(mock_sleep) }; + let mock_body = MockBody { + sleep: sleep(mock_sleep), + }; let timeout_body = TimeoutBody::new(timeout_sleep, mock_body); assert!(timeout_body.boxed().data().await.unwrap().is_err()); From 969c3cbfca38f1ede208cb47165317cf21adaf4d Mon Sep 17 00:00:00 2001 From: Daniele Ahmed Date: Fri, 21 Oct 2022 17:05:51 +0200 Subject: [PATCH 07/16] Make sleep future lazy for body poll Signed-off-by: Daniele Ahmed --- tower-http/src/timeout/body.rs | 74 +++++++++++++++++----------------- 1 file changed, 38 insertions(+), 36 deletions(-) diff --git a/tower-http/src/timeout/body.rs b/tower-http/src/timeout/body.rs index 7f9a1a7c..4f1a286a 100644 --- a/tower-http/src/timeout/body.rs +++ b/tower-http/src/timeout/body.rs @@ -34,10 +34,11 @@ //! # } //! ``` -use futures_core::Future; +use futures_core::{ready, Future}; use http::{Request, Response}; use http_body::Body; use pin_project_lite::pin_project; +use tower::BoxError; use std::{ pin::Pin, task::{Context, Poll}, @@ -72,6 +73,7 @@ impl TimeoutBody { impl Body for TimeoutBody where B: Body, + B::Error: Into, { type Data = B::Data; type Error = Box; @@ -91,57 +93,49 @@ where }; // Error if the timeout has expired. - match sleep_pinned.poll(cx) { - Poll::Pending => (), - Poll::Ready(()) => return Poll::Ready(Some(Err(Box::new(TimeoutError)))), + if let Poll::Ready(()) = sleep_pinned.poll(cx) { + return Poll::Ready(Some(Err(Box::new(TimeoutError(()))))) } // Check for body data. - match this.body.poll_data(cx) { - Poll::Ready(data) => { - // Some data is ready. Reset the `Sleep`... - this.sleep.set(Some(sleep(*this.timeout))); - - // ...then `poll` it to get awoken. - if let Poll::Ready(_) = this.sleep.as_pin_mut().unwrap().poll(cx) { - return Poll::Ready(Some(Err(Box::new(TimeoutError)))); - } - Poll::Ready( - data.transpose() - .map_err(|_| Box::new(TimeoutError).into()) - .transpose(), - ) - } - Poll::Pending => Poll::Pending, - } + let data = ready!(this.body.poll_data(cx)); + // Some data is ready. Reset the `Sleep`... + this.sleep.set(None); + + Poll::Ready( + data.transpose() + .map_err(Into::into) + .transpose(), + ) } fn poll_trailers( self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll, Self::Error>> { - let this = self.project(); + let mut this = self.project(); + + let sleep_pinned = if let Some(some) = this.sleep.as_mut().as_pin_mut() { + some + } else { + this.sleep.set(Some(sleep(*this.timeout))); + this.sleep.as_mut().as_pin_mut().unwrap() + }; // Error if the timeout has expired. - match this - .sleep - .as_pin_mut() - .expect("poll_data was not called") - .poll(cx) - { - Poll::Pending => (), - Poll::Ready(()) => return Poll::Ready(Err(Box::new(TimeoutError))), + if let Poll::Ready(()) = sleep_pinned.poll(cx) { + return Poll::Ready(Err(Box::new(TimeoutError(())))) } this.body .poll_trailers(cx) - .map_err(|_| Box::new(TimeoutError).into()) + .map_err(Into::into) } } /// Error for [`TimeoutBody`]. #[derive(Debug)] -struct TimeoutError; +pub struct TimeoutError(()); impl std::error::Error for TimeoutError {} @@ -198,9 +192,9 @@ impl RequestBodyTimeout { define_inner_service_accessors!(); } -impl Service> for RequestBodyTimeout +impl Service> for RequestBodyTimeout where - S: Service>, Response = Response>, + S: Service>>, S::Error: Into>, { type Response = S::Response; @@ -294,7 +288,6 @@ pin_project! { } } -use futures_core::ready; impl Future for ResponseBodyTimeoutFuture where Fut: Future, E>>, @@ -304,7 +297,7 @@ where fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let timeout = self.timeout; let this = self.project(); - let res = ready!(this.inner.poll(cx)?); + let res = ready!(this.inner.poll(cx))?; Poll::Ready(Ok(res.map(|body| TimeoutBody::new(timeout, body)))) } } @@ -315,9 +308,18 @@ mod tests { use bytes::Bytes; use pin_project_lite::pin_project; + use std::{error::Error, fmt::Display}; + #[derive(Debug)] struct MockError; + impl Error for MockError {} + impl Display for MockError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + todo!() + } + } + pin_project! { struct MockBody { #[pin] From d3e5df75b83e313e7720d5827815dcb56ce5108b Mon Sep 17 00:00:00 2001 From: Daniele Ahmed Date: Mon, 24 Oct 2022 10:53:23 +0200 Subject: [PATCH 08/16] Style Signed-off-by: Daniele Ahmed --- tower-http/src/timeout/body.rs | 16 +++++----------- 1 file changed, 5 insertions(+), 11 deletions(-) diff --git a/tower-http/src/timeout/body.rs b/tower-http/src/timeout/body.rs index 4f1a286a..c10832a0 100644 --- a/tower-http/src/timeout/body.rs +++ b/tower-http/src/timeout/body.rs @@ -38,13 +38,13 @@ use futures_core::{ready, Future}; use http::{Request, Response}; use http_body::Body; use pin_project_lite::pin_project; -use tower::BoxError; use std::{ pin::Pin, task::{Context, Poll}, time::Duration, }; use tokio::time::{sleep, Sleep}; +use tower::BoxError; use tower_layer::Layer; use tower_service::Service; @@ -94,7 +94,7 @@ where // Error if the timeout has expired. if let Poll::Ready(()) = sleep_pinned.poll(cx) { - return Poll::Ready(Some(Err(Box::new(TimeoutError(()))))) + return Poll::Ready(Some(Err(Box::new(TimeoutError(()))))); } // Check for body data. @@ -102,11 +102,7 @@ where // Some data is ready. Reset the `Sleep`... this.sleep.set(None); - Poll::Ready( - data.transpose() - .map_err(Into::into) - .transpose(), - ) + Poll::Ready(data.transpose().map_err(Into::into).transpose()) } fn poll_trailers( @@ -124,12 +120,10 @@ where // Error if the timeout has expired. if let Poll::Ready(()) = sleep_pinned.poll(cx) { - return Poll::Ready(Err(Box::new(TimeoutError(())))) + return Poll::Ready(Err(Box::new(TimeoutError(())))); } - this.body - .poll_trailers(cx) - .map_err(Into::into) + this.body.poll_trailers(cx).map_err(Into::into) } } From 77539787931dab8ed8073f1e6bc0d21a4a46150e Mon Sep 17 00:00:00 2001 From: Daniele Ahmed Date: Mon, 24 Oct 2022 11:19:40 +0200 Subject: [PATCH 09/16] Update BoxError reference Signed-off-by: Daniele Ahmed --- tower-http/src/timeout/body.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tower-http/src/timeout/body.rs b/tower-http/src/timeout/body.rs index c10832a0..e6f7315d 100644 --- a/tower-http/src/timeout/body.rs +++ b/tower-http/src/timeout/body.rs @@ -34,6 +34,7 @@ //! # } //! ``` +use crate::BoxError; use futures_core::{ready, Future}; use http::{Request, Response}; use http_body::Body; @@ -44,7 +45,6 @@ use std::{ time::Duration, }; use tokio::time::{sleep, Sleep}; -use tower::BoxError; use tower_layer::Layer; use tower_service::Service; From aa7b313e8a50ef4d7aa2fb33553ce6f71f62b0e2 Mon Sep 17 00:00:00 2001 From: 82marbag <69267416+82marbag@users.noreply.github.com> Date: Wed, 26 Oct 2022 12:24:57 +0200 Subject: [PATCH 10/16] Use two sleep futures temporarily Signed-off-by: Daniele Ahmed --- tower-http/src/timeout/body.rs | 28 +++++++++++++++++++--------- 1 file changed, 19 insertions(+), 9 deletions(-) diff --git a/tower-http/src/timeout/body.rs b/tower-http/src/timeout/body.rs index e6f7315d..0047f786 100644 --- a/tower-http/src/timeout/body.rs +++ b/tower-http/src/timeout/body.rs @@ -52,8 +52,13 @@ pin_project! { /// Wrapper around a [`http_body::Body`] to time out if data is not ready within the specified duration. pub struct TimeoutBody { timeout: Duration, + // In http-body 1.0, `poll_*` will be merged into `poll_frame`. + // Merge the two `sleep_data` and `sleep_trailers` into one `sleep`. + // See: https://github.com/tower-rs/tower-http/pull/303#discussion_r1004834958 #[pin] - sleep: Option, + sleep_data: Option, + #[pin] + sleep_trailers: Option, #[pin] body: B, } @@ -64,7 +69,8 @@ impl TimeoutBody { pub fn new(timeout: Duration, body: B) -> Self { TimeoutBody { timeout, - sleep: None, + sleep_data: None, + sleep_trailers: None, body, } } @@ -85,11 +91,11 @@ where let mut this = self.project(); // Start the `Sleep` if not active. - let sleep_pinned = if let Some(some) = this.sleep.as_mut().as_pin_mut() { + let sleep_pinned = if let Some(some) = this.sleep_data.as_mut().as_pin_mut() { some } else { - this.sleep.set(Some(sleep(*this.timeout))); - this.sleep.as_mut().as_pin_mut().unwrap() + this.sleep_data.set(Some(sleep(*this.timeout))); + this.sleep_data.as_mut().as_pin_mut().unwrap() }; // Error if the timeout has expired. @@ -100,7 +106,7 @@ where // Check for body data. let data = ready!(this.body.poll_data(cx)); // Some data is ready. Reset the `Sleep`... - this.sleep.set(None); + this.sleep_data.set(None); Poll::Ready(data.transpose().map_err(Into::into).transpose()) } @@ -111,11 +117,15 @@ where ) -> Poll, Self::Error>> { let mut this = self.project(); - let sleep_pinned = if let Some(some) = this.sleep.as_mut().as_pin_mut() { + // In http-body 1.0, `poll_*` will be merged into `poll_frame`. + // Merge the two `sleep_data` and `sleep_trailers` into one `sleep`. + // See: https://github.com/tower-rs/tower-http/pull/303#discussion_r1004834958 + + let sleep_pinned = if let Some(some) = this.sleep_trailers.as_mut().as_pin_mut() { some } else { - this.sleep.set(Some(sleep(*this.timeout))); - this.sleep.as_mut().as_pin_mut().unwrap() + this.sleep_trailers.set(Some(sleep(*this.timeout))); + this.sleep_trailers.as_mut().as_pin_mut().unwrap() }; // Error if the timeout has expired. From 992827a3a0e290959bf02fa61968a79e4a9d51d0 Mon Sep 17 00:00:00 2001 From: 82marbag <69267416+82marbag@users.noreply.github.com> Date: Thu, 27 Oct 2022 10:57:09 +0200 Subject: [PATCH 11/16] Rename unused variable Signed-off-by: Daniele Ahmed --- tower-http/src/timeout/body.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tower-http/src/timeout/body.rs b/tower-http/src/timeout/body.rs index 0047f786..ff8c0f0c 100644 --- a/tower-http/src/timeout/body.rs +++ b/tower-http/src/timeout/body.rs @@ -319,7 +319,7 @@ mod tests { impl Error for MockError {} impl Display for MockError { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + fn fmt(&self, _f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { todo!() } } From 3182815ee58af2627d0c42049494d34a445ace3b Mon Sep 17 00:00:00 2001 From: 82marbag <69267416+82marbag@users.noreply.github.com> Date: Fri, 28 Oct 2022 11:49:35 +0200 Subject: [PATCH 12/16] Improve documentation Signed-off-by: Daniele Ahmed --- tower-http/src/timeout/body.rs | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/tower-http/src/timeout/body.rs b/tower-http/src/timeout/body.rs index ff8c0f0c..49cf5738 100644 --- a/tower-http/src/timeout/body.rs +++ b/tower-http/src/timeout/body.rs @@ -1,12 +1,13 @@ -//! Middleware that applies a timeout to bodies. +//! Middleware that applies a timeout to request and response bodies. //! //! Bodies must produce data at most within the specified timeout. -//! If they are inactive, an error will be generated. +//! If the body does not produce a requested data frame within the timeout period, it will return an error. //! -//! # Differences from `tower_http::timeout::service::Timeout` +//! # Differences from [`tower_http::timeout::service::Timeout`] //! -//! `tower_http::timeout::service::Timeout` applies a timeout on the full request. +//! [`tower_http::timeout::service::Timeout`] applies a timeout to the request future, not body. //! That timeout is not reset when bytes are handled, whether the request is active or not. +//! Bodies are handled asynchronously outside of the tower stack's future and thus needs an additional timeout. //! //! This middleware will return a [`TimeoutError`]. //! From ae0cd936303aef9e5ac266105924e409e54b06df Mon Sep 17 00:00:00 2001 From: 82marbag <69267416+82marbag@users.noreply.github.com> Date: Fri, 28 Oct 2022 17:38:37 +0200 Subject: [PATCH 13/16] Refactor modules Signed-off-by: Daniele Ahmed --- tower-http/src/timeout/body.rs | 165 +----------------------------- tower-http/src/timeout/mod.rs | 13 ++- tower-http/src/timeout/service.rs | 161 +++++++++++++++++++++++++++++ 3 files changed, 174 insertions(+), 165 deletions(-) diff --git a/tower-http/src/timeout/body.rs b/tower-http/src/timeout/body.rs index 49cf5738..bb5265f4 100644 --- a/tower-http/src/timeout/body.rs +++ b/tower-http/src/timeout/body.rs @@ -3,9 +3,9 @@ //! Bodies must produce data at most within the specified timeout. //! If the body does not produce a requested data frame within the timeout period, it will return an error. //! -//! # Differences from [`tower_http::timeout::service::Timeout`] +//! # Differences from [`tower_http::timeout::Timeout`] //! -//! [`tower_http::timeout::service::Timeout`] applies a timeout to the request future, not body. +//! [`tower_http::timeout::Timeout`] applies a timeout to the request future, not body. //! That timeout is not reset when bytes are handled, whether the request is active or not. //! Bodies are handled asynchronously outside of the tower stack's future and thus needs an additional timeout. //! @@ -37,7 +37,6 @@ use crate::BoxError; use futures_core::{ready, Future}; -use http::{Request, Response}; use http_body::Body; use pin_project_lite::pin_project; use std::{ @@ -46,8 +45,6 @@ use std::{ time::Duration, }; use tokio::time::{sleep, Sleep}; -use tower_layer::Layer; -use tower_service::Service; pin_project! { /// Wrapper around a [`http_body::Body`] to time out if data is not ready within the specified duration. @@ -149,164 +146,6 @@ impl std::fmt::Display for TimeoutError { write!(f, "data was not received within the designated timeout") } } - -/// Applies a [`TimeoutBody`] to the request body. -#[derive(Clone, Debug)] -pub struct RequestBodyTimeoutLayer { - timeout: Duration, -} - -impl RequestBodyTimeoutLayer { - /// Creates a new [`RequestBodyTimeoutLayer`]. - pub fn new(timeout: Duration) -> Self { - Self { timeout } - } -} - -impl Layer for RequestBodyTimeoutLayer { - type Service = RequestBodyTimeout; - - fn layer(&self, inner: S) -> Self::Service { - RequestBodyTimeout::new(inner, self.timeout) - } -} - -/// Applies a [`TimeoutBody`] to the request body. -#[derive(Clone, Debug)] -pub struct RequestBodyTimeout { - inner: S, - timeout: Duration, -} - -impl RequestBodyTimeout { - /// Creates a new [`RequestBodyTimeout`]. - pub fn new(service: S, timeout: Duration) -> Self { - Self { - inner: service, - timeout, - } - } - - /// Returns a new [`Layer`] that wraps services with a [`RequestBodyTimeoutLayer`] middleware. - /// - /// [`Layer`]: tower_layer::Layer - pub fn layer(timeout: Duration) -> RequestBodyTimeoutLayer { - RequestBodyTimeoutLayer::new(timeout) - } - - define_inner_service_accessors!(); -} - -impl Service> for RequestBodyTimeout -where - S: Service>>, - S::Error: Into>, -{ - type Response = S::Response; - type Error = S::Error; - type Future = S::Future; - - fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { - self.inner.poll_ready(cx) - } - - fn call(&mut self, req: Request) -> Self::Future { - let req = req.map(|body| TimeoutBody::new(self.timeout, body)); - self.inner.call(req) - } -} - -/// Applies a [`TimeoutBody`] to the response body. -#[derive(Clone)] -pub struct ResponseBodyTimeoutLayer { - timeout: Duration, -} - -impl ResponseBodyTimeoutLayer { - /// Creates a new [`ResponseBodyTimeoutLayer`]. - pub fn new(timeout: Duration) -> Self { - Self { timeout } - } -} - -impl Layer for ResponseBodyTimeoutLayer { - type Service = ResponseBodyTimeout; - - fn layer(&self, inner: S) -> Self::Service { - ResponseBodyTimeout::new(inner, self.timeout) - } -} - -/// Applies a [`TimeoutBody`] to the response body. -#[derive(Clone)] -pub struct ResponseBodyTimeout { - inner: S, - timeout: Duration, -} - -impl ResponseBodyTimeout { - /// Creates a new [`ResponseBodyTimeout`]. - pub fn new(service: S, timeout: Duration) -> Self { - Self { - inner: service, - timeout, - } - } - - /// Returns a new [`Layer`] that wraps services with a [`ResponseBodyTimeoutLayer`] middleware. - /// - /// [`Layer`]: tower_layer::Layer - pub fn layer(timeout: Duration) -> ResponseBodyTimeoutLayer { - ResponseBodyTimeoutLayer::new(timeout) - } - - define_inner_service_accessors!(); -} - -impl Service> for ResponseBodyTimeout -where - S: Service, Response = Response>, - S::Error: Into>, -{ - type Response = Response>; - type Error = S::Error; - type Future = ResponseBodyTimeoutFuture; - - fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { - self.inner.poll_ready(cx) - } - - fn call(&mut self, req: Request) -> Self::Future { - ResponseBodyTimeoutFuture { - inner: self.inner.call(req), - timeout: self.timeout, - } - } -} - -pin_project! { - /// Response future for [`ResponseBodyTimeout`]. - pub struct ResponseBodyTimeoutFuture { - #[pin] - inner: Fut, - timeout: Duration, - } -} - -impl Future for ResponseBodyTimeoutFuture -where - Fut: Future, E>>, -{ - type Output = Result>, E>; - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let timeout = self.timeout; - let this = self.project(); - let res = ready!(this.inner.poll(cx))?; - Poll::Ready(Ok(res.map(|body| TimeoutBody::new(timeout, body)))) - } -} - #[cfg(test)] mod tests { use super::*; diff --git a/tower-http/src/timeout/mod.rs b/tower-http/src/timeout/mod.rs index 49d7b07f..3cf6a528 100644 --- a/tower-http/src/timeout/mod.rs +++ b/tower-http/src/timeout/mod.rs @@ -1,4 +1,13 @@ //! Middleware for setting timeouts on requests and responses. -pub mod body; -pub mod service; +mod body; +mod service; + +pub use body::TimeoutBody; +pub use body::TimeoutError; +pub use service::RequestBodyTimeout; +pub use service::RequestBodyTimeoutLayer; +pub use service::ResponseBodyTimeout; +pub use service::ResponseBodyTimeoutLayer; +pub use service::Timeout; +pub use service::TimeoutLayer; diff --git a/tower-http/src/timeout/service.rs b/tower-http/src/timeout/service.rs index 472e9c20..e03782fd 100644 --- a/tower-http/src/timeout/service.rs +++ b/tower-http/src/timeout/service.rs @@ -39,6 +39,8 @@ //! //! [`Infallible`]: std::convert::Infallible +use crate::timeout::body::TimeoutBody; +use futures_core::ready; use http::{Request, Response, StatusCode}; use pin_project_lite::pin_project; use std::{ @@ -154,3 +156,162 @@ where this.inner.poll(cx) } } + +/// Applies a [`TimeoutBody`] to the request body. +#[derive(Clone, Debug)] +pub struct RequestBodyTimeoutLayer { + timeout: Duration, +} + +impl RequestBodyTimeoutLayer { + /// Creates a new [`RequestBodyTimeoutLayer`]. + pub fn new(timeout: Duration) -> Self { + Self { timeout } + } +} + +impl Layer for RequestBodyTimeoutLayer { + type Service = RequestBodyTimeout; + + fn layer(&self, inner: S) -> Self::Service { + RequestBodyTimeout::new(inner, self.timeout) + } +} + +/// Applies a [`TimeoutBody`] to the request body. +#[derive(Clone, Debug)] +pub struct RequestBodyTimeout { + inner: S, + timeout: Duration, +} + +impl RequestBodyTimeout { + /// Creates a new [`RequestBodyTimeout`]. + pub fn new(service: S, timeout: Duration) -> Self { + Self { + inner: service, + timeout, + } + } + + /// Returns a new [`Layer`] that wraps services with a [`RequestBodyTimeoutLayer`] middleware. + /// + /// [`Layer`]: tower_layer::Layer + pub fn layer(timeout: Duration) -> RequestBodyTimeoutLayer { + RequestBodyTimeoutLayer::new(timeout) + } + + define_inner_service_accessors!(); +} + +impl Service> for RequestBodyTimeout +where + S: Service>>, + S::Error: Into>, +{ + type Response = S::Response; + type Error = S::Error; + type Future = S::Future; + + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + self.inner.poll_ready(cx) + } + + fn call(&mut self, req: Request) -> Self::Future { + let req = req.map(|body| TimeoutBody::new(self.timeout, body)); + self.inner.call(req) + } +} + +/// Applies a [`TimeoutBody`] to the response body. +#[derive(Clone)] +pub struct ResponseBodyTimeoutLayer { + timeout: Duration, +} + +impl ResponseBodyTimeoutLayer { + /// Creates a new [`ResponseBodyTimeoutLayer`]. + pub fn new(timeout: Duration) -> Self { + Self { timeout } + } +} + +impl Layer for ResponseBodyTimeoutLayer { + type Service = ResponseBodyTimeout; + + fn layer(&self, inner: S) -> Self::Service { + ResponseBodyTimeout::new(inner, self.timeout) + } +} + + +/// Applies a [`TimeoutBody`] to the response body. +#[derive(Clone)] +pub struct ResponseBodyTimeout { + inner: S, + timeout: Duration, +} + +impl Service> for ResponseBodyTimeout +where + S: Service, Response = Response>, + S::Error: Into>, +{ + type Response = Response>; + type Error = S::Error; + type Future = ResponseBodyTimeoutFuture; + + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + self.inner.poll_ready(cx) + } + + fn call(&mut self, req: Request) -> Self::Future { + ResponseBodyTimeoutFuture { + inner: self.inner.call(req), + timeout: self.timeout, + } + } +} + +impl ResponseBodyTimeout { + /// Creates a new [`ResponseBodyTimeout`]. + pub fn new(service: S, timeout: Duration) -> Self { + Self { + inner: service, + timeout, + } + } + + /// Returns a new [`Layer`] that wraps services with a [`ResponseBodyTimeoutLayer`] middleware. + /// + /// [`Layer`]: tower_layer::Layer + pub fn layer(timeout: Duration) -> ResponseBodyTimeoutLayer { + ResponseBodyTimeoutLayer::new(timeout) + } + + define_inner_service_accessors!(); +} + +pin_project! { + /// Response future for [`ResponseBodyTimeout`]. + pub struct ResponseBodyTimeoutFuture { + #[pin] + inner: Fut, + timeout: Duration, + } +} + +impl Future for ResponseBodyTimeoutFuture +where + Fut: Future, E>>, +{ + type Output = Result>, E>; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let timeout = self.timeout; + let this = self.project(); + let res = ready!(this.inner.poll(cx))?; + Poll::Ready(Ok(res.map(|body| TimeoutBody::new(timeout, body)))) + } +} + From f476dca28dd2f21627ce0adbdfba2825f5bc5dd2 Mon Sep 17 00:00:00 2001 From: 82marbag <69267416+82marbag@users.noreply.github.com> Date: Fri, 28 Oct 2022 17:55:45 +0200 Subject: [PATCH 14/16] Fix doc style Signed-off-by: Daniele Ahmed --- tower-http/src/timeout/body.rs | 6 +++--- tower-http/src/timeout/service.rs | 4 +--- 2 files changed, 4 insertions(+), 6 deletions(-) diff --git a/tower-http/src/timeout/body.rs b/tower-http/src/timeout/body.rs index bb5265f4..79712efd 100644 --- a/tower-http/src/timeout/body.rs +++ b/tower-http/src/timeout/body.rs @@ -3,9 +3,9 @@ //! Bodies must produce data at most within the specified timeout. //! If the body does not produce a requested data frame within the timeout period, it will return an error. //! -//! # Differences from [`tower_http::timeout::Timeout`] +//! # Differences from [`crate::timeout::Timeout`] //! -//! [`tower_http::timeout::Timeout`] applies a timeout to the request future, not body. +//! [`crate::timeout::Timeout`] applies a timeout to the request future, not body. //! That timeout is not reset when bytes are handled, whether the request is active or not. //! Bodies are handled asynchronously outside of the tower stack's future and thus needs an additional timeout. //! @@ -18,7 +18,7 @@ //! use hyper::Body; //! use std::time::Duration; //! use tower::ServiceBuilder; -//! use tower_http::timeout::body::RequestBodyTimeoutLayer; +//! use tower_http::timeout::RequestBodyTimeoutLayer; //! //! async fn handle(_: Request) -> Result, std::convert::Infallible> { //! // ... diff --git a/tower-http/src/timeout/service.rs b/tower-http/src/timeout/service.rs index e03782fd..13347a34 100644 --- a/tower-http/src/timeout/service.rs +++ b/tower-http/src/timeout/service.rs @@ -20,7 +20,7 @@ //! use hyper::Body; //! use std::{convert::Infallible, time::Duration}; //! use tower::ServiceBuilder; -//! use tower_http::timeout::service::TimeoutLayer; +//! use tower_http::timeout::TimeoutLayer; //! //! async fn handle(_: Request) -> Result, Infallible> { //! // ... @@ -244,7 +244,6 @@ impl Layer for ResponseBodyTimeoutLayer { } } - /// Applies a [`TimeoutBody`] to the response body. #[derive(Clone)] pub struct ResponseBodyTimeout { @@ -314,4 +313,3 @@ where Poll::Ready(Ok(res.map(|body| TimeoutBody::new(timeout, body)))) } } - From 28dea10de635bd50ddfff608b9d34577930a0d43 Mon Sep 17 00:00:00 2001 From: 82marbag <69267416+82marbag@users.noreply.github.com> Date: Mon, 31 Oct 2022 10:20:03 +0100 Subject: [PATCH 15/16] Merge imports Signed-off-by: Daniele Ahmed --- tower-http/src/timeout/mod.rs | 10 ++-------- 1 file changed, 2 insertions(+), 8 deletions(-) diff --git a/tower-http/src/timeout/mod.rs b/tower-http/src/timeout/mod.rs index 3cf6a528..2ce3599a 100644 --- a/tower-http/src/timeout/mod.rs +++ b/tower-http/src/timeout/mod.rs @@ -3,11 +3,5 @@ mod body; mod service; -pub use body::TimeoutBody; -pub use body::TimeoutError; -pub use service::RequestBodyTimeout; -pub use service::RequestBodyTimeoutLayer; -pub use service::ResponseBodyTimeout; -pub use service::ResponseBodyTimeoutLayer; -pub use service::Timeout; -pub use service::TimeoutLayer; +pub use body::{TimeoutBody, TimeoutError}; +pub use service::{RequestBodyTimeout, RequestBodyTimeoutLayer, ResponseBodyTimeout, ResponseBodyTimeoutLayer, Timeout, TimeoutLayer}; From b6a0b31132cb1358c580a321843bd773f232d2e0 Mon Sep 17 00:00:00 2001 From: 82marbag <69267416+82marbag@users.noreply.github.com> Date: Mon, 31 Oct 2022 16:02:15 +0100 Subject: [PATCH 16/16] Update style of imports Signed-off-by: Daniele Ahmed --- tower-http/src/timeout/mod.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/tower-http/src/timeout/mod.rs b/tower-http/src/timeout/mod.rs index 2ce3599a..4cbe476f 100644 --- a/tower-http/src/timeout/mod.rs +++ b/tower-http/src/timeout/mod.rs @@ -4,4 +4,7 @@ mod body; mod service; pub use body::{TimeoutBody, TimeoutError}; -pub use service::{RequestBodyTimeout, RequestBodyTimeoutLayer, ResponseBodyTimeout, ResponseBodyTimeoutLayer, Timeout, TimeoutLayer}; +pub use service::{ + RequestBodyTimeout, RequestBodyTimeoutLayer, ResponseBodyTimeout, ResponseBodyTimeoutLayer, + Timeout, TimeoutLayer, +};