This repository has been archived by the owner on Apr 16, 2020. It is now read-only.

How to use with the futures 0.3 compatibility layer? #165

Open
archseer opened this issue May 3, 2019 · 0 comments

I've been experimenting with #157, trying to use async fn for the RPC implementation bodies. I've run into issues because trait methods can't return impl Future or be written as async fn, so the async block has to be converted into a nameable future type:

    fn subscribe(&mut self, request: Request<SubscribeRequest>) -> SubscribeFuture {
        use std::thread;
        async move {
            println!("Subscribe = {:?}", request);

            let (tx, rx) = mpsc::channel(4);

            let state = self.state.clone();

            thread::spawn(move || {
                let mut tx = tx.wait();

                tx.send(Event {
                    event_id: "abc".to_owned(),
                })
                .unwrap();
            });

            let rx = rx.map_err(|_| unimplemented!());
            let rx: Self::SubscribeStream = Box::new(rx);
            Ok(Response::new(rx))
        }.boxed().compat()
    }

I saw there was a FutureObj to sort of fill the gap, but I couldn't get it to work either:

    fn subscribe(&mut self, request: Request<SubscribeRequest>) -> futures::future::FutureObj<Result<Response<Self::SubscribeStream>, tower_grpc::Status>> {
        use std::thread;
        let fut = async move {
            println!("Subscribe = {:?}", request);

            let (tx, rx) = mpsc::channel(4);

            let state = self.state.clone();

            thread::spawn(move || {
                let mut tx = tx.wait();

                tx.send(Event {
                    event_id: "abc".to_owned(),
                })
                .unwrap();
            });

            let rx = rx.map_err(|_| unimplemented!());
            let rx: Self::SubscribeStream = Box::new(rx);
            Ok(Response::new(rx))
        };
        futures::future::FutureObj::new(Box::new(fut))
    }
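
For reference, here is a minimal, std-only sketch of the general pattern both attempts are reaching for: the trait method returns a boxed, type-erased future instead of `impl Future`. It does not use tower-grpc or the futures 0.1/0.3 compat layer, and every name in it (`Subscriber`, `Server`, `BoxFuture`, `block_on`, the simplified `SubscribeRequest` and `Event`) is a hypothetical stand-in rather than the real generated API. The one detail that likely matters for the snippets above is that the shared state is cloned before the async block, so the future owns its data and does not borrow `self`.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Hypothetical stand-ins for the generated request/response types.
#[derive(Debug)]
pub struct SubscribeRequest {
    pub topic: String,
}

#[derive(Debug, PartialEq)]
pub struct Event {
    pub event_id: String,
}

// A boxed, type-erased future: a common workaround when a trait method
// cannot name the anonymous type of an `async` block.
pub type BoxFuture<T> = Pin<Box<dyn Future<Output = T> + Send + 'static>>;

pub trait Subscriber {
    fn subscribe(&mut self, request: SubscribeRequest) -> BoxFuture<Result<Event, String>>;
}

pub struct Server {
    pub state: Arc<String>,
}

impl Subscriber for Server {
    fn subscribe(&mut self, request: SubscribeRequest) -> BoxFuture<Result<Event, String>> {
        // Clone the shared state *before* the async block so the future
        // owns everything it uses and can satisfy the `'static` bound.
        let state = Arc::clone(&self.state);
        Box::pin(async move {
            let result: Result<Event, String> = Ok(Event {
                event_id: format!("{}:{}", state, request.topic),
            });
            result
        })
    }
}

// Waker that does nothing; enough for a busy-polling demo executor.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

// Minimal executor for the sketch: polls the future until it completes.
pub fn block_on<T>(mut fut: BoxFuture<T>) -> T {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(value) = fut.as_mut().poll(&mut cx) {
            return value;
        }
        std::thread::yield_now();
    }
}
```

Calling `block_on(server.subscribe(request))` on a `Server` drives the boxed future to completion. In the tower-grpc setting the boxed future would presumably still have to go through the compat layer (as the `.boxed().compat()` call in the first snippet attempts), which this sketch leaves out.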