Skip to content

Commit

Permalink
Allow Adapter to be .boxed() (#540)
Browse files Browse the repository at this point in the history
Allowing `Adapter` to be [`.boxed()`](https://docs.rs/tower/0.4.13/tower/trait.ServiceExt.html#method.boxed) enables easier composition of `tower::Service`s that may include additional middleware prior to `Adapter` converting the `LambdaEvent` to a `Request`.
  • Loading branch information
dcormier committed Oct 11, 2022
1 parent bab0235 commit 1e16502
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 10 deletions.
62 changes: 59 additions & 3 deletions lambda-http/src/lib.rs
Expand Up @@ -140,7 +140,7 @@ pub struct Adapter<'a, R, S> {
impl<'a, R, S, E> From<S> for Adapter<'a, R, S>
where
S: Service<Request, Response = R, Error = E>,
S::Future: 'a,
S::Future: Send + 'a,
R: IntoResponse,
{
fn from(service: S) -> Self {
Expand All @@ -154,7 +154,7 @@ where
impl<'a, R, S, E> Service<LambdaEvent<LambdaRequest>> for Adapter<'a, R, S>
where
S: Service<Request, Response = R, Error = E>,
S::Future: 'a,
S::Future: Send + 'a,
R: IntoResponse,
{
type Response = LambdaResponse;
Expand Down Expand Up @@ -182,9 +182,65 @@ where
pub async fn run<'a, R, S, E>(handler: S) -> Result<(), Error>
where
S: Service<Request, Response = R, Error = E>,
S::Future: 'a,
S::Future: Send + 'a,
R: IntoResponse,
E: std::fmt::Debug + std::fmt::Display,
{
lambda_runtime::run(Adapter::from(handler)).await
}

#[cfg(test)]
mod test_adapter {
use std::task::{Context, Poll};

use crate::{
http::{Response, StatusCode},
lambda_runtime::LambdaEvent,
request::LambdaRequest,
response::LambdaResponse,
tower::{util::BoxService, Service, ServiceBuilder, ServiceExt},
Adapter, Body, Request,
};

// A middleware that logs requests before forwarding them to another service
struct LogService<S> {
inner: S,
}

impl<S> Service<LambdaEvent<LambdaRequest>> for LogService<S>
where
S: Service<LambdaEvent<LambdaRequest>>,
{
type Response = S::Response;
type Error = S::Error;
type Future = S::Future;

fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.inner.poll_ready(cx)
}

fn call(&mut self, event: LambdaEvent<LambdaRequest>) -> Self::Future {
// Log the request
println!("Lambda event: {:#?}", event);

self.inner.call(event)
}
}

/// This tests that `Adapter` can be used in a `tower::Service` where the user
/// may require additional middleware between `lambda_runtime::run` and where
/// the `LambdaEvent` is converted into a `Request`.
#[test]
fn adapter_is_boxable() {
let _service: BoxService<LambdaEvent<LambdaRequest>, LambdaResponse, http::Error> = ServiceBuilder::new()
.layer_fn(|service| {
// This could be any middleware that logs, inspects, or manipulates
// the `LambdaEvent` before it's converted to a `Request` by `Adapter`.

LogService { inner: service }
})
.layer_fn(Adapter::from)
.service_fn(|_event: Request| async move { Response::builder().status(StatusCode::OK).body(Body::Empty) })
.boxed();
}
}
2 changes: 1 addition & 1 deletion lambda-http/src/request.rs
Expand Up @@ -61,7 +61,7 @@ impl LambdaRequest {
}

/// RequestFuture type
pub type RequestFuture<'a, R, E> = Pin<Box<dyn Future<Output = Result<R, E>> + 'a>>;
pub type RequestFuture<'a, R, E> = Pin<Box<dyn Future<Output = Result<R, E>> + Send + 'a>>;

/// Represents the origin from which the lambda was requested from.
#[doc(hidden)]
Expand Down
15 changes: 9 additions & 6 deletions lambda-http/src/response.rs
Expand Up @@ -129,7 +129,7 @@ pub trait IntoResponse {

impl<B> IntoResponse for Response<B>
where
B: ConvertBody + 'static,
B: ConvertBody + Send + 'static,
{
fn into_response(self) -> ResponseFuture {
let (parts, body) = self.into_parts();
Expand Down Expand Up @@ -180,15 +180,16 @@ impl IntoResponse for serde_json::Value {
}
}

pub type ResponseFuture = Pin<Box<dyn Future<Output = Response<Body>>>>;
pub type ResponseFuture = Pin<Box<dyn Future<Output = Response<Body>> + Send>>;

pub trait ConvertBody {
fn convert(self, parts: HeaderMap) -> BodyFuture;
}

impl<B> ConvertBody for B
where
B: HttpBody + Unpin + 'static,
B: HttpBody + Unpin + Send + 'static,
B::Data: Send,
B::Error: fmt::Debug,
{
fn convert(self, headers: HeaderMap) -> BodyFuture {
Expand Down Expand Up @@ -227,15 +228,17 @@ where

fn convert_to_binary<B>(body: B) -> BodyFuture
where
B: HttpBody + Unpin + 'static,
B: HttpBody + Unpin + Send + 'static,
B::Data: Send,
B::Error: fmt::Debug,
{
Box::pin(async move { Body::from(to_bytes(body).await.expect("unable to read bytes from body").to_vec()) })
}

fn convert_to_text<B>(body: B, content_type: &str) -> BodyFuture
where
B: HttpBody + Unpin + 'static,
B: HttpBody + Unpin + Send + 'static,
B::Data: Send,
B::Error: fmt::Debug,
{
let mime_type = content_type.parse::<Mime>();
Expand All @@ -260,7 +263,7 @@ where
})
}

pub type BodyFuture = Pin<Box<dyn Future<Output = Body>>>;
pub type BodyFuture = Pin<Box<dyn Future<Output = Body> + Send>>;

#[cfg(test)]
mod tests {
Expand Down

0 comments on commit 1e16502

Please sign in to comment.