This repository has been archived by the owner on Mar 13, 2021. It is now read-only.

Small, self contained example #3

Open
benschulz opened this issue Apr 18, 2020 · 1 comment

Comments

@benschulz

I was looking into tower recently and wanted to play with it a little, but getting started was a bit tough. A small, self-contained example would have helped a lot; everything I could find is built on top of hyper.

I ended up building the small example project below, based on tokio-tower. Ordinarily I would file a PR, but a) tower is in a bit of flux right now and b) I don't think the example is good enough yet. I think three things need improvement:

  • Demonstrate how to properly use ServiceBuilder and the various layers it can add.
  • Use MakeService on the server side. That's what the documentation says it's for, but I couldn't figure out how.
  • General cleanup. My Rust game isn't yet at a level where I should write examples for other people.
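
For the first point, here is a rough, dependency-free sketch of what layering means conceptually. The `Service` and `Layer` traits below are simplified, synchronous stand-ins written only for illustration; they are not tower's actual traits (the real `Service` is asynchronous and also has `poll_ready`). `Double`, `Counted` and `CountLayer` are hypothetical names.

```rust
// Simplified, synchronous stand-in for tower's `Service` trait
// (assumption: the real trait returns a future and has `poll_ready`).
trait Service<Req> {
    type Response;
    fn call(&mut self, req: Req) -> Self::Response;
}

// Stand-in for the layer concept: wraps one service in another.
trait Layer<S> {
    type Service;
    fn layer(&self, inner: S) -> Self::Service;
}

// Innermost service: doubles its input.
struct Double;

impl Service<f64> for Double {
    type Response = f64;

    fn call(&mut self, req: f64) -> f64 {
        req * 2.0
    }
}

// Middleware that counts the requests passing through it
// before delegating to the wrapped service.
struct Counted<S> {
    inner: S,
    calls: usize,
}

impl<S: Service<f64>> Service<f64> for Counted<S> {
    type Response = S::Response;

    fn call(&mut self, req: f64) -> S::Response {
        self.calls += 1;
        self.inner.call(req)
    }
}

// The layer that produces `Counted` wrappers.
struct CountLayer;

impl<S> Layer<S> for CountLayer {
    type Service = Counted<S>;

    fn layer(&self, inner: S) -> Counted<S> {
        Counted { inner, calls: 0 }
    }
}

fn main() {
    // Wrap the inner service, then call through the wrapper.
    let mut svc = CountLayer.layer(Double);
    println!("{}", svc.call(21.0)); // prints "42"
    println!("calls = {}", svc.calls); // prints "calls = 1"
}
```

With tower itself, I would expect `ServiceBuilder` to stack such wrappers through its `layer` method. The actual example project follows.
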
use std::future::Future;
use std::pin::Pin;

use futures::future::{self, BoxFuture, FutureExt};
use serde::{Deserialize, Serialize};
use tokio::net::TcpListener;
use tokio_tower::multiplex;
use tower::Service;

#[tokio::main]
async fn main() -> Result<(), ClientError> {
    let addr = "127.0.0.1:4444".parse().unwrap();

    tokio::spawn(run_server(addr).map(|_| ()));

    // Give the server task a chance to start. A single yield is best-effort
    // and does not guarantee that the listener is bound yet.
    tokio::task::yield_now().await;

    let mut client = setup_client(addr);

    let exprs = [
        Expr::add(187.0, 578.0),
        Expr::div(Expr::add(123.0, 321.0), 2.0),
        Expr::sub(Expr::mul(2.0, 25.0), 8.0),
    ];

    for expr in exprs.iter() {
        futures::future::poll_fn(|cx| client.poll_ready(cx)).await?;
        let res = client.call(Tagged::blank(expr.clone())).await?;

        println!("{} = {}", expr, res.content);
    }

    Ok(())
}

//
// Client
//

fn setup_client(
    addr: std::net::SocketAddrV4,
) -> impl Service<
    Tagged<Expr>,
    Response = Tagged<f64>,
    Error = ClientError,
    Future = impl Future<Output = Result<Tagged<f64>, ClientError>>,
> {
    tower::reconnect::Reconnect::new::<
        <ConnectToServer as Service<std::net::SocketAddrV4>>::Response,
        Tagged<Expr>,
    >(ConnectToServer, addr)
}

type ClientError = Box<dyn std::error::Error + Send + Sync>;

struct ConnectToServer;

impl Service<std::net::SocketAddrV4> for ConnectToServer {
    type Response = CalcClient;
    type Error = ClientError;
    type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;

    fn poll_ready(
        &mut self,
        _cx: &mut std::task::Context,
    ) -> std::task::Poll<Result<(), Self::Error>> {
        Ok(()).into()
    }

    fn call(&mut self, req: std::net::SocketAddrV4) -> Self::Future {
        Box::pin(async move {
            let tcp_stream = tokio::net::TcpStream::connect(req).await?;

            let transport = multiplex::MultiplexTransport::new(
                async_bincode::AsyncBincodeStream::from(tcp_stream).for_async(),
                Tagger::new(),
            );

            let client = multiplex::Client::new(transport);

            Ok(CalcClient {
                delegate: Box::new(client),
            })
        })
    }
}

struct CalcClient {
    delegate: Box<
        dyn Service<
            Tagged<Expr>,
            Response = Tagged<f64>,
            Error = ClientError,
            Future = Pin<Box<dyn Future<Output = Result<Tagged<f64>, ClientError>> + Send>>,
        >,
    >,
}

impl Service<Tagged<Expr>> for CalcClient {
    type Response = Tagged<f64>;
    type Error = ClientError;
    type Future = Pin<Box<dyn Future<Output = Result<Tagged<f64>, ClientError>> + Send>>;

    fn poll_ready(
        &mut self,
        cx: &mut std::task::Context,
    ) -> std::task::Poll<Result<(), Self::Error>> {
        self.delegate.poll_ready(cx)
    }

    fn call(&mut self, req: Tagged<Expr>) -> Self::Future {
        self.delegate.call(req)
    }
}

impl tokio_tower::multiplex::TagStore<Tagged<Expr>, Tagged<f64>> for Tagger {
    type Tag = u32;

    fn assign_tag(mut self: std::pin::Pin<&mut Self>, req: &mut Tagged<Expr>) -> Self::Tag {
        req.tag = self.slab.insert(()) as u32;
        req.tag
    }

    fn finish_tag(mut self: std::pin::Pin<&mut Self>, resp: &Tagged<f64>) -> Self::Tag {
        self.slab.remove(resp.tag as usize);
        resp.tag
    }
}

//
// Server
//

async fn run_server(addr: std::net::SocketAddrV4) -> Result<(), Box<dyn std::error::Error>> {
    let mut listener = TcpListener::bind(addr).await?;

    loop {
        let (tcp_stream, _) = listener.accept().await?;

        let server = multiplex::Server::new(
            async_bincode::AsyncBincodeStream::from(tcp_stream).for_async(),
            CalcServer,
        );

        let server = tower::ServiceBuilder::new().service(server);

        tokio::spawn(server);
    }
}

#[derive(Clone)]
struct CalcServer;

impl Service<Tagged<Expr>> for CalcServer {
    type Response = Tagged<f64>;
    type Error = ();
    type Future = futures::future::Ready<Result<Self::Response, Self::Error>>;

    fn poll_ready(
        &mut self,
        _cx: &mut std::task::Context,
    ) -> std::task::Poll<Result<(), Self::Error>> {
        Ok(()).into()
    }

    fn call(&mut self, req: Tagged<Expr>) -> Self::Future {
        future::ready(Ok(Tagged {
            tag: req.tag,
            content: eval(req.content),
        }))
    }
}

fn eval(expr: Expr) -> f64 {
    match expr {
        Expr::Add(a, b) => eval(*a) + eval(*b),
        Expr::Div(a, b) => eval(*a) / eval(*b),
        Expr::Mul(a, b) => eval(*a) * eval(*b),
        Expr::Sub(a, b) => eval(*a) - eval(*b),
        Expr::Val(x) => x,
    }
}

//
// Shared
//

#[derive(Clone, Deserialize, Serialize)]
enum Expr {
    Add(Box<Expr>, Box<Expr>),
    Div(Box<Expr>, Box<Expr>),
    Mul(Box<Expr>, Box<Expr>),
    Sub(Box<Expr>, Box<Expr>),
    Val(f64),
}

impl From<f64> for Expr {
    fn from(x: f64) -> Expr {
        Expr::Val(x)
    }
}

impl Expr {
    fn add<A: Into<Expr>, B: Into<Expr>>(a: A, b: B) -> Expr {
        Expr::Add(Box::new(a.into()), Box::new(b.into()))
    }

    fn div<A: Into<Expr>, B: Into<Expr>>(a: A, b: B) -> Expr {
        Expr::Div(Box::new(a.into()), Box::new(b.into()))
    }

    fn mul<A: Into<Expr>, B: Into<Expr>>(a: A, b: B) -> Expr {
        Expr::Mul(Box::new(a.into()), Box::new(b.into()))
    }

    fn sub<A: Into<Expr>, B: Into<Expr>>(a: A, b: B) -> Expr {
        Expr::Sub(Box::new(a.into()), Box::new(b.into()))
    }
}

impl std::fmt::Display for Expr {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Expr::Add(a, b) => write!(f, "({} + {})", a, b),
            Expr::Div(a, b) => write!(f, "({} ÷ {})", a, b),
            Expr::Mul(a, b) => write!(f, "({} * {})", a, b),
            Expr::Sub(a, b) => write!(f, "({} - {})", a, b),
            Expr::Val(x) => write!(f, "{}", x),
        }
    }
}

struct Tagger {
    slab: slab::Slab<()>,
}

impl Tagger {
    pub fn new() -> Self {
        Self {
            slab: slab::Slab::new(),
        }
    }
}

#[derive(Debug, Deserialize, Serialize)]
struct Tagged<T> {
    tag: u32,
    content: T,
}

impl<T> Tagged<T> {
    fn blank(content: T) -> Self {
        Self { tag: 0, content }
    }
}
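
To sanity-check the calculator logic without any networking, the expression type can be exercised on its own. This is a trimmed sketch that reuses `Expr`, its constructors and its `Display` impl from the project above; the one deliberate difference is that `eval` takes `&Expr` here instead of an owned value, which is an assumption made to avoid cloning.

```rust
use std::fmt;

enum Expr {
    Add(Box<Expr>, Box<Expr>),
    Div(Box<Expr>, Box<Expr>),
    Mul(Box<Expr>, Box<Expr>),
    Sub(Box<Expr>, Box<Expr>),
    Val(f64),
}

impl From<f64> for Expr {
    fn from(x: f64) -> Expr {
        Expr::Val(x)
    }
}

impl Expr {
    fn add<A: Into<Expr>, B: Into<Expr>>(a: A, b: B) -> Expr {
        Expr::Add(Box::new(a.into()), Box::new(b.into()))
    }

    fn div<A: Into<Expr>, B: Into<Expr>>(a: A, b: B) -> Expr {
        Expr::Div(Box::new(a.into()), Box::new(b.into()))
    }

    fn mul<A: Into<Expr>, B: Into<Expr>>(a: A, b: B) -> Expr {
        Expr::Mul(Box::new(a.into()), Box::new(b.into()))
    }

    fn sub<A: Into<Expr>, B: Into<Expr>>(a: A, b: B) -> Expr {
        Expr::Sub(Box::new(a.into()), Box::new(b.into()))
    }
}

impl fmt::Display for Expr {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Expr::Add(a, b) => write!(f, "({} + {})", a, b),
            Expr::Div(a, b) => write!(f, "({} ÷ {})", a, b),
            Expr::Mul(a, b) => write!(f, "({} * {})", a, b),
            Expr::Sub(a, b) => write!(f, "({} - {})", a, b),
            Expr::Val(x) => write!(f, "{}", x),
        }
    }
}

// Borrowing variant of `eval` (assumption: the project's version takes `Expr` by value).
fn eval(expr: &Expr) -> f64 {
    match expr {
        Expr::Add(a, b) => eval(a) + eval(b),
        Expr::Div(a, b) => eval(a) / eval(b),
        Expr::Mul(a, b) => eval(a) * eval(b),
        Expr::Sub(a, b) => eval(a) - eval(b),
        Expr::Val(x) => *x,
    }
}

fn main() {
    // The same three expressions the client sends to the server.
    let exprs = [
        Expr::add(187.0, 578.0),
        Expr::div(Expr::add(123.0, 321.0), 2.0),
        Expr::sub(Expr::mul(2.0, 25.0), 8.0),
    ];

    for expr in exprs.iter() {
        println!("{} = {}", expr, eval(expr));
    }
    // prints:
    // (187 + 578) = 765
    // ((123 + 321) ÷ 2) = 222
    // ((2 * 25) - 8) = 42
}
```

If the client and server are wired up correctly, the networked program should print the same three lines.
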
@LucioFranco
Member

From a quick glance, this example looks nice! We do need more examples. Hopefully we can find the time once our spring cleaning work is done and we ship 0.4.
