How to declare type S for into_service ? #857

Closed
takkuumi opened this issue Dec 3, 2021 · 5 comments

Comments

@takkuumi

takkuumi commented Dec 3, 2021

let grpc_service: S = transport::Server::builder()
    .add_service(service1)
    .add_service(service2)
    .add_service(service3)
    .into_service();

How do I declare the type S?

I tried tonic::transport::server::RouterService<A>, but I cannot work out what the generic type A should be when multiple services are added.

@davidpdrsn
Member

This is currently not very easy since tonic's router uses quite a few generics internally. #830 will make it easier though.

What is your use case?
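To illustrate why a router built from several services is hard to name, here is a minimal sketch using only the standard library. `Router`, `Or`, `Svc1`, `Svc2`, and `Svc3` are hypothetical stand-ins invented for this example and are not tonic's real types; the only point is that each `add_service` call wraps the previous type in one more layer of generics.

```rust
use std::any::type_name;

// Hypothetical stand-ins for three generated gRPC services.
// These are NOT tonic's real types.
struct Svc1;
struct Svc2;
struct Svc3;

/// A router that records, in its type, every service added so far.
struct Router<Routes>(Routes);

/// Pairs a newly added service with the routes that came before it.
#[allow(dead_code)]
struct Or<Head, Tail>(Head, Tail);

impl<Routes> Router<Routes> {
    /// Adding a service wraps the previous routes in one more generic layer.
    fn add_service<S>(self, svc: S) -> Router<Or<S, Routes>> {
        Router(Or(svc, self.0))
    }
}

/// Returns the compiler's name for the type of `value`.
fn type_name_of<T>(_value: &T) -> &'static str {
    type_name::<T>()
}

fn main() {
    let router = Router(Svc1).add_service(Svc2).add_service(Svc3);
    // An explicit annotation has to spell out every layer.
    let explicit: Router<Or<Svc3, Or<Svc2, Svc1>>> = router;
    println!("{}", type_name_of(&explicit));
}
```

With three services the annotation is already unwieldy, and it changes whenever a service is added or removed.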

@takkuumi
Author

takkuumi commented Dec 3, 2021

@davidpdrsn I have been using hyper, tower, tower_http, and tonic, and now I want to add axum to my project. I want to make two crates, one for the HTTP API and another for RPC, and then combine them. Example code:

use std::{convert::Infallible, net::SocketAddr, pin::Pin};

use bootlib::hybrid;
use futures::Future;
use hyper::Server;
use tonic::transport::{self, server::RouterService};

pub type ResponseResult = bootlib::Result<hyper::Body>;
pub type ResponseFuture = Pin<Box<dyn Future<Output = ResponseResult> + Send + 'static>>;

#[tokio::main]
async fn main() {
  let addr = SocketAddr::from(([0, 0, 0, 0], 8999));

  let axum_make_service: axum::routing::IntoMakeService<axum::Router> = httpapi::make_svc();

  let grpc_service = rpc::make_svc();

  let hybrid_make_service = hybrid(axum_make_service, grpc_service);

  let server = Server::bind(&addr).serve(hybrid_make_service);

  if let Err(e) = server.await {
    eprintln!("server error: {}", e);
  }
}

The hybrid.rs code:

use std::{future::Future, pin::Pin, task::Poll};

use hyper::{body::HttpBody, Body, HeaderMap, Request, Response};
use pin_project::pin_project;
use tower::Service;

use super::GenericError;

pub fn hybrid<MakeWeb, Grpc>(make_web: MakeWeb, grpc: Grpc) -> HybridMakeService<MakeWeb, Grpc> {
  HybridMakeService { make_web, grpc }
}

fn map_option_err<T, U: Into<GenericError>>(
  err: Option<Result<T, U>>,
) -> Option<Result<T, GenericError>> {
  err.map(|e| e.map_err(Into::into))
}

pub struct HybridMakeService<MakeWeb, Grpc> {
  make_web: MakeWeb,
  grpc:     Grpc,
}

impl<ConnInfo, MakeWeb, Grpc> Service<ConnInfo> for HybridMakeService<MakeWeb, Grpc>
where
  MakeWeb: Service<ConnInfo>,
  Grpc: Clone,
{
  type Error = MakeWeb::Error;
  type Future = HybridMakeServiceFuture<MakeWeb::Future, Grpc>;
  type Response = HybridService<MakeWeb::Response, Grpc>;

  fn poll_ready(
    &mut self,
    cx: &mut std::task::Context,
  ) -> std::task::Poll<Result<(), Self::Error>> {
    self.make_web.poll_ready(cx)
  }

  fn call(&mut self, conn_info: ConnInfo) -> Self::Future {
    HybridMakeServiceFuture {
      web_future: self.make_web.call(conn_info),
      grpc:       Some(self.grpc.clone()),
    }
  }
}

#[pin_project]
pub struct HybridMakeServiceFuture<WebFuture, Grpc> {
  #[pin]
  web_future: WebFuture,
  grpc:       Option<Grpc>,
}

impl<WebFuture, Web, WebError, Grpc> Future for HybridMakeServiceFuture<WebFuture, Grpc>
where
  WebFuture: Future<Output = Result<Web, WebError>>,
{
  type Output = Result<HybridService<Web, Grpc>, WebError>;

  fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context) -> Poll<Self::Output> {
    let this = self.project();
    match this.web_future.poll(cx) {
      Poll::Pending => Poll::Pending,
      Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
      Poll::Ready(Ok(web)) => Poll::Ready(Ok(HybridService {
        web,
        grpc: this.grpc.take().expect("Cannot poll twice!"),
      })),
    }
  }
}

pub struct HybridService<Web, Grpc> {
  web:  Web,
  grpc: Grpc,
}

impl<Web, Grpc, WebBody, GrpcBody> Service<Request<Body>> for HybridService<Web, Grpc>
where
  Web: Service<Request<Body>, Response = Response<WebBody>>,
  Grpc: Service<Request<Body>, Response = Response<GrpcBody>>,
  Web::Error: Into<GenericError>,
  Grpc::Error: Into<GenericError>,
{
  type Error = GenericError;
  type Future = HybridFuture<Web::Future, Grpc::Future>;
  type Response = Response<HybridBody<WebBody, GrpcBody>>;

  fn poll_ready(
    &mut self,
    cx: &mut std::task::Context<'_>,
  ) -> std::task::Poll<Result<(), Self::Error>> {
    match self.web.poll_ready(cx) {
      Poll::Ready(Ok(())) => match self.grpc.poll_ready(cx) {
        Poll::Ready(Ok(())) => Poll::Ready(Ok(())),
        Poll::Ready(Err(e)) => Poll::Ready(Err(e.into())),
        Poll::Pending => Poll::Pending,
      },
      Poll::Ready(Err(e)) => Poll::Ready(Err(e.into())),
      Poll::Pending => Poll::Pending,
    }
  }

  fn call(&mut self, req: Request<Body>) -> Self::Future {
    if req.headers().get("content-type").map(|x| x.as_bytes()) == Some(b"application/grpc") {
      HybridFuture::Grpc(self.grpc.call(req))
    } else {
      HybridFuture::Web(self.web.call(req))
    }
  }
}

#[pin_project(project = HybridBodyProj)]
pub enum HybridBody<WebBody, GrpcBody> {
  Web(#[pin] WebBody),
  Grpc(#[pin] GrpcBody),
}

impl<WebBody, GrpcBody> HttpBody for HybridBody<WebBody, GrpcBody>
where
  WebBody: HttpBody + Send + Unpin,
  GrpcBody: HttpBody<Data = WebBody::Data> + Send + Unpin,
  WebBody::Error: std::error::Error + Send + Sync + 'static,
  GrpcBody::Error: std::error::Error + Send + Sync + 'static,
{
  type Data = WebBody::Data;
  type Error = GenericError;

  fn is_end_stream(&self) -> bool {
    match self {
      HybridBody::Web(b) => b.is_end_stream(),
      HybridBody::Grpc(b) => b.is_end_stream(),
    }
  }

  fn poll_data(
    self: Pin<&mut Self>,
    cx: &mut std::task::Context,
  ) -> Poll<Option<Result<Self::Data, Self::Error>>> {
    match self.project() {
      HybridBodyProj::Web(b) => b.poll_data(cx).map(map_option_err),
      HybridBodyProj::Grpc(b) => b.poll_data(cx).map(map_option_err),
    }
  }

  fn poll_trailers(
    self: Pin<&mut Self>,
    cx: &mut std::task::Context,
  ) -> Poll<Result<Option<HeaderMap>, Self::Error>> {
    match self.project() {
      HybridBodyProj::Web(b) => b.poll_trailers(cx).map_err(Into::into),
      HybridBodyProj::Grpc(b) => b.poll_trailers(cx).map_err(Into::into),
    }
  }
}

#[pin_project(project = HybridFutureProj)]
pub enum HybridFuture<WebFuture, GrpcFuture> {
  Web(#[pin] WebFuture),
  Grpc(#[pin] GrpcFuture),
}

impl<WebFuture, GrpcFuture, WebBody, GrpcBody, WebError, GrpcError> Future
  for HybridFuture<WebFuture, GrpcFuture>
where
  WebFuture: Future<Output = Result<Response<WebBody>, WebError>>,
  GrpcFuture: Future<Output = Result<Response<GrpcBody>, GrpcError>>,
  WebError: Into<GenericError>,
  GrpcError: Into<GenericError>,
{
  type Output = Result<Response<HybridBody<WebBody, GrpcBody>>, GenericError>;

  fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context) -> Poll<Self::Output> {
    match self.project() {
      HybridFutureProj::Web(a) => match a.poll(cx) {
        Poll::Ready(Ok(res)) => Poll::Ready(Ok(res.map(HybridBody::Web))),
        Poll::Ready(Err(e)) => Poll::Ready(Err(e.into())),
        Poll::Pending => Poll::Pending,
      },
      HybridFutureProj::Grpc(b) => match b.poll(cx) {
        Poll::Ready(Ok(res)) => Poll::Ready(Ok(res.map(HybridBody::Grpc))),
        Poll::Ready(Err(e)) => Poll::Ready(Err(e.into())),
        Poll::Pending => Poll::Pending,
      },
    }
  }
}
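The routing decision in `HybridService::call` above compares the `content-type` header to `application/grpc` exactly. The gRPC over HTTP/2 protocol also allows a codec suffix such as `application/grpc+proto`, so a slightly more tolerant check may be useful. The sketch below is an assumption about how that could look; `is_grpc` is a hypothetical helper, not part of the posted code or of tonic. It deliberately leaves `application/grpc-web` on the web side, which may or may not suit a given setup.

```rust
/// Returns true when a `content-type` header value names plain gRPC.
///
/// Accepts `application/grpc` exactly, or followed by a `+codec` suffix
/// such as `application/grpc+proto`. Anything else, including
/// `application/grpc-web`, is treated as a non-gRPC request.
fn is_grpc(content_type: Option<&[u8]>) -> bool {
    const GRPC: &[u8] = b"application/grpc";
    match content_type {
        // Exact match, as in the original check.
        Some(value) if value == GRPC => true,
        // Prefix match only when the next byte starts a codec suffix.
        Some(value) => value.starts_with(GRPC) && value.get(GRPC.len()) == Some(&b'+'),
        // No content-type header: route to the web service.
        None => false,
    }
}

fn main() {
    let samples: [Option<&[u8]>; 5] = [
        Some(b"application/grpc"),
        Some(b"application/grpc+proto"),
        Some(b"application/grpc-web"),
        Some(b"application/json"),
        None,
    ];
    for sample in samples {
        println!("{:?} -> {}", sample.map(String::from_utf8_lossy), is_grpc(sample));
    }
}
```

In the posted code, the header lookup `req.headers().get("content-type").map(|x| x.as_bytes())` already produces an `Option<&[u8]>`, so its result could be passed to such a helper directly.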

@davidpdrsn
Member

I think using BoxCloneService from tower would make sense for this. It allows you to erase the type of a service while maintaining the Clone impl, which axum (and possibly tonic) requires.
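To show what "erasing the type while maintaining the Clone impl" means, here is a standard-library-only sketch of the idea. It is not tower's implementation: `Handler`, `CloneHandler`, `BoxCloneHandler`, `Greeter`, and `Counter` are hypothetical names made up for this example, and `Handler` is a simplified synchronous stand-in for `tower::Service`. With tower itself, the equivalent step would be wrapping the service with `BoxCloneService::new(service)`, which needs the service to be `Clone + Send + 'static` with a `Send` future.

```rust
/// Simplified, synchronous stand-in for a service (hypothetical).
trait Handler {
    fn call(&mut self, req: &str) -> String;
}

/// Object-safe helper that lets a boxed handler clone itself.
trait CloneHandler: Handler {
    fn clone_box(&self) -> Box<dyn CloneHandler + Send>;
}

impl<T> CloneHandler for T
where
    T: Handler + Clone + Send + 'static,
{
    fn clone_box(&self) -> Box<dyn CloneHandler + Send> {
        Box::new(self.clone())
    }
}

/// Type-erased handler: a single nameable type that is still `Clone`.
struct BoxCloneHandler(Box<dyn CloneHandler + Send>);

impl BoxCloneHandler {
    fn new<T>(inner: T) -> Self
    where
        T: Handler + Clone + Send + 'static,
    {
        BoxCloneHandler(Box::new(inner))
    }
}

impl Clone for BoxCloneHandler {
    fn clone(&self) -> Self {
        BoxCloneHandler(self.0.clone_box())
    }
}

impl Handler for BoxCloneHandler {
    fn call(&mut self, req: &str) -> String {
        self.0.call(req)
    }
}

#[derive(Clone)]
struct Greeter {
    greeting: String,
}

impl Handler for Greeter {
    fn call(&mut self, req: &str) -> String {
        format!("{}, {}", self.greeting, req)
    }
}

#[derive(Clone)]
struct Counter {
    hits: u32,
}

impl Handler for Counter {
    fn call(&mut self, req: &str) -> String {
        self.hits += 1;
        format!("{} #{}", req, self.hits)
    }
}

fn main() {
    // Two different concrete types now share one declared type.
    let mut services: Vec<BoxCloneHandler> = vec![
        BoxCloneHandler::new(Greeter { greeting: "hello".to_string() }),
        BoxCloneHandler::new(Counter { hits: 0 }),
    ];
    // Cloning still works after erasure, and the clone owns its own state.
    let mut copy = services[1].clone();
    println!("{}", services[0].call("world"));
    println!("{}", services[1].call("ping"));
    println!("{}", copy.call("ping"));
}
```

The trade-off is one heap allocation and dynamic dispatch per call in exchange for a type that can be written down in a `let` binding or a function signature.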

@takkuumi
Author

takkuumi commented Dec 6, 2021

@davidpdrsn It works. Why not erase the service type in the return type of into_service itself?

@davidpdrsn
Member

Not all users need it, but it will be done anyway as part of the move to axum.
