Consider making Streaming<T> Sync #81

Closed
NeoLegends opened this issue Oct 21, 2019 · 8 comments · Fixed by #84

Comments

@NeoLegends

Feature Request

Crates

Main Crate

Motivation

I'm trying to expose a unidirectional tonic gRPC stream as SSE using warp. I'm using the warp branch from seanmonstar/warp#265 so that I can use it with async/await. Making warp compatible with async/await requires every response handed to warp (and in turn to hyper) to be Sync. However, the stream returned from a streaming gRPC response is not Sync.

Proposal

Rustc complains that the decoder specifically is the part that is not Sync:

decoder: Box<dyn Decoder<Item = T, Error = Status> + Send + 'static>,

As far as I can see, decoder is not accessible from the outside, and thus it should be safe to mark the whole Streaming struct as Sync. I would be happy to file a PR for that.
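
The reasoning above can be sketched in isolation. The following is a minimal, hypothetical model and not tonic's actual code: the `Decoder` trait, `CountingDecoder`, and this `Streaming<T>` are stand-ins invented for illustration. It assumes the only way to reach the boxed decoder is through `&mut self`, which is the property the proposal relies on.

```rust
use std::cell::Cell;

// Hypothetical stand-in for a decoder trait; not tonic's real definition.
trait Decoder {
    type Item;
    fn decode(&mut self, bytes: &[u8]) -> Option<Self::Item>;
}

// A decoder that is Send but not Sync, because Cell<T> is never Sync.
struct CountingDecoder {
    calls: Cell<usize>,
}

impl CountingDecoder {
    fn new() -> Self {
        CountingDecoder { calls: Cell::new(0) }
    }
}

impl Decoder for CountingDecoder {
    type Item = usize;

    fn decode(&mut self, bytes: &[u8]) -> Option<usize> {
        self.calls.set(self.calls.get() + 1);
        Some(bytes.len())
    }
}

// Hypothetical stand-in for Streaming<T>: the boxed decoder is Send only.
struct Streaming<T> {
    decoder: Box<dyn Decoder<Item = T> + Send + 'static>,
}

impl<T> Streaming<T> {
    fn new(decoder: impl Decoder<Item = T> + Send + 'static) -> Self {
        Streaming { decoder: Box::new(decoder) }
    }

    // The only access path to the decoder requires exclusive access.
    fn next_message(&mut self, bytes: &[u8]) -> Option<T> {
        self.decoder.decode(bytes)
    }
}

// SAFETY (for this sketch only): no `&self` method touches `decoder`, so a
// shared `&Streaming<T>` cannot be used to reach the non-Sync field.
unsafe impl<T> Sync for Streaming<T> {}

// Compile-time check that a value is Sync.
fn assert_sync<S: Sync>(_: &S) {}
```

The `unsafe impl` is only as sound as the invariant in its comment: adding any `&self` method that reads `decoder` later would silently break it.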

Alternatives

I don't see any, because AFAIK the Sync bound stems from a layer deep down inside hyper, and I guess it's unfeasible to change that somehow to remove it.

NeoLegends added a commit to NeoLegends/tonic that referenced this issue Oct 21, 2019
Safe because there are no `&self` methods on `Streaming<T>` that access any of its non `Sync` members.

Fixes hyperium#81
@NeoLegends
Author

NeoLegends commented Oct 21, 2019

In case you accept this, I have already filed PR #82, which applies the necessary change. Feel free to close it otherwise.

@LucioFranco
Member

@NeoLegends sorry for the delay, this should be done! Let me know if you run into any issues. I plan to get a new release out soon.

@parasyte

parasyte commented Nov 2, 2019

@LucioFranco This new trait bound broke my code because hyper::client::ResponseFuture is not Sync.

error[E0277]: `(dyn core::future::future::Future<Output = std::result::Result<http::response::Response<hyper::body::body::Body>, hyper::error::Error>> + std::marker::Send + 'static)` cannot be shared between threads safely
   --> src/lib.rs:153:26
    |
153 |         Ok(Response::new(Box::pin(outbound)
    |                          ^^^^^^^^^^^^^^^^^^ `(dyn core::future::future::Future<Output = std::result::Result<http::response::Response<hyper::body::body::Body>, hyper::error::Error>> + std::marker::Send + 'static)` cannot be shared between threads safely

Still looking into a way to resolve this... Does hyper need to be patched to add the trait bound to ResponseFuture? Or can I wrap this future in something to make it Sync, like futures::lock::Mutex?
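
One shape the "wrap this future in something" idea could take is a wrapper that never hands out a shared reference to the inner future. The sketch below is an assumption-laden illustration, not an API from hyper, tonic, or futures: `SyncFuture`, `poll_once`, and `require_send_sync` are hypothetical names, and the boxed `dyn Future + Send` field mirrors the type named in the error above.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Hypothetical wrapper around a boxed future that is Send but not Sync.
pub struct SyncFuture<T> {
    inner: Pin<Box<dyn Future<Output = T> + Send + 'static>>,
}

impl<T> SyncFuture<T> {
    pub fn new(fut: impl Future<Output = T> + Send + 'static) -> Self {
        SyncFuture { inner: Box::pin(fut) }
    }
}

// SAFETY (for this sketch only): the field is private and no `&self` method
// exists, so `&SyncFuture<T>` gives no access to the inner future. Polling
// needs `Pin<&mut Self>`, which is exclusive access.
unsafe impl<T> Sync for SyncFuture<T> {}

impl<T> Future for SyncFuture<T> {
    type Output = T;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> {
        // SyncFuture is Unpin (it only holds a Pin<Box<..>>), so the field
        // can be reached through DerefMut.
        self.inner.as_mut().poll(cx)
    }
}

// A waker that does nothing, so a future can be polled without a runtime.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

// Polls an Unpin future exactly once.
pub fn poll_once<F: Future + Unpin>(fut: &mut F) -> Poll<F::Output> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    Pin::new(fut).poll(&mut cx)
}

// Compile-time check that a value is both Send and Sync.
pub fn require_send_sync<V: Send + Sync>(value: V) -> V {
    value
}
```

Passing `SyncFuture::new(async { 7u32 })` through `require_send_sync` compiles, and the wrapped future still polls to completion. Whether this is preferable to changing the bound upstream is a separate question that the sketch does not answer.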

cc @seanmonstar Maybe you can help out here?

@LucioFranco
Member

@parasyte maybe I'm missing something but what is the reason you are returning a future as the response type?

@parasyte

parasyte commented Nov 2, 2019

@LucioFranco Sorry for the confusion, I'm not returning a Future as a response type at all. A full example of the issue would help; I'll get that to you soon.

The short description is that I'm calling the get method on a hyper::client::Client in an async_stream::try_stream! {} loop; the return type is a gRPC streaming response, e.g. Pin<Box<dyn Stream<Item = Result<Message, Status>> + Send + Sync + 'static>>

Awaiting this non-Sync ResponseFuture from the hyper client (and adding the necessary Sync bound to my streaming response type) is what causes the compile error.
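
The mechanism can be reproduced without hyper. A value that is still alive across an `.await` is stored inside the enclosing future, so that future is Sync only if the value is. The sketch below uses `Cell<u8>` as a stand-in for any Send-but-not-Sync value; `tick`, `require_sync`, and `require_send` are hypothetical helpers introduced for illustration.

```rust
use std::cell::Cell;
use std::future::Future;

// Hypothetical await point that does nothing.
async fn tick() {}

// The Cell is dropped before the await, so it is not stored in the future.
async fn cell_before_await() -> u8 {
    let n = {
        let c = Cell::new(1u8);
        c.get()
    };
    tick().await;
    n
}

// The Cell is alive across the await, so it becomes part of the future's
// state. Cell<u8> is Send but not Sync, and so is the resulting future.
async fn cell_across_await() -> u8 {
    let c = Cell::new(1u8);
    tick().await;
    c.get()
}

// Compile-time checks on the futures themselves.
fn require_sync<F: Future + Sync>(f: F) -> F {
    f
}

fn require_send<F: Future + Send>(f: F) -> F {
    f
}

fn demo() {
    // Accepted: nothing non-Sync is held across the await.
    let _ok = require_sync(cell_before_await());

    // Accepted: the future is still Send.
    let _send_only = require_send(cell_across_await());

    // Rejected by the compiler if uncommented, because the future holding
    // the Cell is not Sync:
    // let _err = require_sync(cell_across_await());
}
```

The same reasoning applies when the value held across the await is another future, such as one wrapping a `Box<dyn Future + Send>`.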

@LucioFranco
Member

@parasyte you're right... I'm not sure how to move forward on this one. @seanmonstar any ideas?

@seanmonstar
Member

Hrm, is the problem that hyper::client::ResponseFuture has a Box<dyn Future + Send> inside it, and not Box<dyn Future + Send + Sync>?

Oh wait, is this because it technically needs a Sync because of await being a jerk?

@parasyte

parasyte commented Nov 5, 2019

@seanmonstar More info in #117. Yes, I think it has to do with await seeing the Sync trait bound and expecting everything to be Sync.
