diff --git a/examples/src/uds/server.rs b/examples/src/uds/server.rs index 8ffe4fa0a..b4ca31002 100644 --- a/examples/src/uds/server.rs +++ b/examples/src/uds/server.rs @@ -1,9 +1,12 @@ #![cfg_attr(not(unix), allow(unused_imports))] -use futures::TryFutureExt; use std::path::Path; #[cfg(unix)] use tokio::net::UnixListener; +#[cfg(unix)] +use tokio_stream::wrappers::UnixListenerStream; +#[cfg(unix)] +use tonic::transport::server::UdsConnectInfo; use tonic::{transport::Server, Request, Response, Status}; pub mod hello_world { @@ -26,7 +29,7 @@ impl Greeter for MyGreeter { ) -> Result, Status> { #[cfg(unix)] { - let conn_info = request.extensions().get::().unwrap(); + let conn_info = request.extensions().get::().unwrap(); println!("Got a request {:?} with info {:?}", request, conn_info); } @@ -46,89 +49,17 @@ async fn main() -> Result<(), Box> { let greeter = MyGreeter::default(); - let incoming = { - let uds = UnixListener::bind(path)?; - - async_stream::stream! { - loop { - let item = uds.accept().map_ok(|(st, _)| unix::UnixStream(st)).await; - - yield item; - } - } - }; + let uds = UnixListener::bind(path)?; + let uds_stream = UnixListenerStream::new(uds); Server::builder() .add_service(GreeterServer::new(greeter)) - .serve_with_incoming(incoming) + .serve_with_incoming(uds_stream) .await?; Ok(()) } -#[cfg(unix)] -mod unix { - use std::{ - pin::Pin, - sync::Arc, - task::{Context, Poll}, - }; - - use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; - use tonic::transport::server::Connected; - - #[derive(Debug)] - pub struct UnixStream(pub tokio::net::UnixStream); - - impl Connected for UnixStream { - type ConnectInfo = UdsConnectInfo; - - fn connect_info(&self) -> Self::ConnectInfo { - UdsConnectInfo { - peer_addr: self.0.peer_addr().ok().map(Arc::new), - peer_cred: self.0.peer_cred().ok(), - } - } - } - - #[derive(Clone, Debug)] - pub struct UdsConnectInfo { - pub peer_addr: Option>, - pub peer_cred: Option, - } - - impl AsyncRead for UnixStream { - fn poll_read( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &mut ReadBuf<'_>, - ) -> Poll> { - Pin::new(&mut self.0).poll_read(cx, buf) - } - } - - impl AsyncWrite for UnixStream { - fn poll_write( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &[u8], - ) -> Poll> { - Pin::new(&mut self.0).poll_write(cx, buf) - } - - fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - Pin::new(&mut self.0).poll_flush(cx) - } - - fn poll_shutdown( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll> { - Pin::new(&mut self.0).poll_shutdown(cx) - } - } -} - #[cfg(not(unix))] fn main() { panic!("The `uds` example only works on unix systems!");