From 430580411cf51c4408b9e85b51e14d1462e5b035 Mon Sep 17 00:00:00 2001 From: Cecile Tonglet Date: Thu, 18 Nov 2021 10:20:37 +0100 Subject: [PATCH 01/16] Add generic trait to combine UnixListener and TcpListener --- tokio-util/src/lib.rs | 1 + tokio-util/src/net/mod.rs | 4 ++ tokio-util/src/net/unix/mod.rs | 108 +++++++++++++++++++++++++++++++++ 3 files changed, 113 insertions(+) create mode 100644 tokio-util/src/net/mod.rs create mode 100644 tokio-util/src/net/unix/mod.rs diff --git a/tokio-util/src/lib.rs b/tokio-util/src/lib.rs index 0b3e5962343..c1970cfd108 100644 --- a/tokio-util/src/lib.rs +++ b/tokio-util/src/lib.rs @@ -31,6 +31,7 @@ cfg_codec! { cfg_net! { pub mod udp; + pub mod net; } cfg_compat! { diff --git a/tokio-util/src/net/mod.rs b/tokio-util/src/net/mod.rs new file mode 100644 index 00000000000..0c07837ac9d --- /dev/null +++ b/tokio-util/src/net/mod.rs @@ -0,0 +1,4 @@ +//! TCP/UDP/Unix helpers for tokio. + +#[cfg(unix)] +pub mod unix; diff --git a/tokio-util/src/net/unix/mod.rs b/tokio-util/src/net/unix/mod.rs new file mode 100644 index 00000000000..57f0d47886f --- /dev/null +++ b/tokio-util/src/net/unix/mod.rs @@ -0,0 +1,108 @@ +//! Unix domain socket helpers. + +use std::future::Future; +use std::io::Result; +use std::pin::Pin; + +/// Listening address. +#[derive(Debug)] +pub enum ListenAddr { + /// Socket address. + SocketAddr(std::net::SocketAddr), + /// Unix socket. + UnixSocket(tokio::net::unix::SocketAddr), +} + +impl From for ListenAddr { + fn from(addr: std::net::SocketAddr) -> Self { + Self::SocketAddr(addr) + } +} + +impl From for ListenAddr { + fn from(addr: tokio::net::unix::SocketAddr) -> Self { + Self::UnixSocket(addr) + } +} + +/// A trait for a listener: `TcpListener` and `UnixListener`. +pub trait Listener: Send + Unpin { + /// Accepts a new incoming connection from this listener. + fn accept<'a>( + &'a self, + ) -> Pin< + Box< + dyn Future< + Output = Result<(Box, ListenAddr)>, + > + Send + + 'a, + >, + >; + + /// Returns the local address that this listener is bound to. + fn local_addr(&self) -> Result; +} + +impl Listener for tokio::net::TcpListener { + fn accept<'a>( + &'a self, + ) -> Pin< + Box< + dyn Future< + Output = Result<(Box, ListenAddr)>, + > + Send + + 'a, + >, + > { + let accept = self.accept(); + Box::pin(async move { + let (stream, addr) = accept.await?; + Ok((Box::new(stream) as Box<_>, addr.into())) + }) + } + + fn local_addr(&self) -> Result { + self.local_addr().map(Into::into) + } +} + +impl Listener for tokio::net::UnixListener { + fn accept<'a>( + &'a self, + ) -> Pin< + Box< + dyn Future< + Output = Result<(Box, ListenAddr)>, + > + Send + + 'a, + >, + > { + let accept = self.accept(); + Box::pin(async { + let (stream, addr) = accept.await?; + Ok((Box::new(stream) as Box<_>, addr.into())) + }) + } + + fn local_addr(&self) -> Result { + self.local_addr().map(Into::into) + } +} + +/// A trait that combines `tokio::io::AsyncRead` and `tokio::io::AsyncWrite`. +pub trait AsyncReadWrite: tokio::io::AsyncRead + tokio::io::AsyncWrite { + /// Sets the value of the `TCP_NODELAY` option on this socket if this is a TCP socket. + fn set_nodelay(&self, nodelay: bool) -> Result<()>; +} + +impl AsyncReadWrite for tokio::net::TcpStream { + fn set_nodelay(&self, nodelay: bool) -> Result<()> { + self.set_nodelay(nodelay) + } +} + +impl AsyncReadWrite for tokio::net::UnixStream { + fn set_nodelay(&self, _nodelay: bool) -> Result<()> { + Ok(()) + } +} From 69b037934bbe2176a0cdf1b061033511fea1a911 Mon Sep 17 00:00:00 2001 From: Cecile Tonglet Date: Thu, 18 Nov 2021 10:33:43 +0100 Subject: [PATCH 02/16] Remove unnecessary move --- tokio-util/src/net/unix/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tokio-util/src/net/unix/mod.rs b/tokio-util/src/net/unix/mod.rs index 57f0d47886f..c9d29376735 100644 --- a/tokio-util/src/net/unix/mod.rs +++ b/tokio-util/src/net/unix/mod.rs @@ -55,7 +55,7 @@ impl Listener for tokio::net::TcpListener { >, > { let accept = self.accept(); - Box::pin(async move { + Box::pin(async { let (stream, addr) = accept.await?; Ok((Box::new(stream) as Box<_>, addr.into())) }) From cd1ddf918f152c08d8d78ce358e4303523927e61 Mon Sep 17 00:00:00 2001 From: Cecile Tonglet Date: Fri, 7 Jan 2022 13:43:01 +0100 Subject: [PATCH 03/16] Replace Box by associated type and remove unused trait --- tokio-util/src/net/unix/mod.rs | 56 ++++++++-------------------------- 1 file changed, 12 insertions(+), 44 deletions(-) diff --git a/tokio-util/src/net/unix/mod.rs b/tokio-util/src/net/unix/mod.rs index c9d29376735..69dc2d0f9d7 100644 --- a/tokio-util/src/net/unix/mod.rs +++ b/tokio-util/src/net/unix/mod.rs @@ -27,37 +27,28 @@ impl From for ListenAddr { /// A trait for a listener: `TcpListener` and `UnixListener`. pub trait Listener: Send + Unpin { + /// The stream's type of this listener. + type Io: tokio::io::AsyncRead + tokio::io::AsyncWrite; + /// Accepts a new incoming connection from this listener. fn accept<'a>( &'a self, - ) -> Pin< - Box< - dyn Future< - Output = Result<(Box, ListenAddr)>, - > + Send - + 'a, - >, - >; + ) -> Pin> + Send + 'a>>; /// Returns the local address that this listener is bound to. fn local_addr(&self) -> Result; } impl Listener for tokio::net::TcpListener { + type Io = tokio::net::TcpStream; + fn accept<'a>( &'a self, - ) -> Pin< - Box< - dyn Future< - Output = Result<(Box, ListenAddr)>, - > + Send - + 'a, - >, - > { + ) -> Pin> + Send + 'a>> { let accept = self.accept(); Box::pin(async { let (stream, addr) = accept.await?; - Ok((Box::new(stream) as Box<_>, addr.into())) + Ok((stream, addr.into())) }) } @@ -67,20 +58,15 @@ impl Listener for tokio::net::TcpListener { } impl Listener for tokio::net::UnixListener { + type Io = tokio::net::UnixStream; + fn accept<'a>( &'a self, - ) -> Pin< - Box< - dyn Future< - Output = Result<(Box, ListenAddr)>, - > + Send - + 'a, - >, - > { + ) -> Pin> + Send + 'a>> { let accept = self.accept(); Box::pin(async { let (stream, addr) = accept.await?; - Ok((Box::new(stream) as Box<_>, addr.into())) + Ok((stream, addr.into())) }) } @@ -88,21 +74,3 @@ impl Listener for tokio::net::UnixListener { self.local_addr().map(Into::into) } } - -/// A trait that combines `tokio::io::AsyncRead` and `tokio::io::AsyncWrite`. -pub trait AsyncReadWrite: tokio::io::AsyncRead + tokio::io::AsyncWrite { - /// Sets the value of the `TCP_NODELAY` option on this socket if this is a TCP socket. - fn set_nodelay(&self, nodelay: bool) -> Result<()>; -} - -impl AsyncReadWrite for tokio::net::TcpStream { - fn set_nodelay(&self, nodelay: bool) -> Result<()> { - self.set_nodelay(nodelay) - } -} - -impl AsyncReadWrite for tokio::net::UnixStream { - fn set_nodelay(&self, _nodelay: bool) -> Result<()> { - Ok(()) - } -} From 9bda024b78071bcee3736a146030eb7f24d26e0c Mon Sep 17 00:00:00 2001 From: Cecile Tonglet Date: Wed, 12 Jan 2022 12:53:47 +0100 Subject: [PATCH 04/16] Remove ListenAddr enum & replace with type alias --- tokio-util/src/net/unix/mod.rs | 37 +++++++++------------------------- 1 file changed, 10 insertions(+), 27 deletions(-) diff --git a/tokio-util/src/net/unix/mod.rs b/tokio-util/src/net/unix/mod.rs index 69dc2d0f9d7..9e41904cdef 100644 --- a/tokio-util/src/net/unix/mod.rs +++ b/tokio-util/src/net/unix/mod.rs @@ -4,47 +4,29 @@ use std::future::Future; use std::io::Result; use std::pin::Pin; -/// Listening address. -#[derive(Debug)] -pub enum ListenAddr { - /// Socket address. - SocketAddr(std::net::SocketAddr), - /// Unix socket. - UnixSocket(tokio::net::unix::SocketAddr), -} - -impl From for ListenAddr { - fn from(addr: std::net::SocketAddr) -> Self { - Self::SocketAddr(addr) - } -} - -impl From for ListenAddr { - fn from(addr: tokio::net::unix::SocketAddr) -> Self { - Self::UnixSocket(addr) - } -} - /// A trait for a listener: `TcpListener` and `UnixListener`. pub trait Listener: Send + Unpin { /// The stream's type of this listener. type Io: tokio::io::AsyncRead + tokio::io::AsyncWrite; + /// The socket address type of this listener. + type Addr; /// Accepts a new incoming connection from this listener. fn accept<'a>( &'a self, - ) -> Pin> + Send + 'a>>; + ) -> Pin> + Send + 'a>>; /// Returns the local address that this listener is bound to. - fn local_addr(&self) -> Result; + fn local_addr(&self) -> Result; } impl Listener for tokio::net::TcpListener { type Io = tokio::net::TcpStream; + type Addr = std::net::SocketAddr; fn accept<'a>( &'a self, - ) -> Pin> + Send + 'a>> { + ) -> Pin> + Send + 'a>> { let accept = self.accept(); Box::pin(async { let (stream, addr) = accept.await?; @@ -52,17 +34,18 @@ impl Listener for tokio::net::TcpListener { }) } - fn local_addr(&self) -> Result { + fn local_addr(&self) -> Result { self.local_addr().map(Into::into) } } impl Listener for tokio::net::UnixListener { type Io = tokio::net::UnixStream; + type Addr = tokio::net::unix::SocketAddr; fn accept<'a>( &'a self, - ) -> Pin> + Send + 'a>> { + ) -> Pin> + Send + 'a>> { let accept = self.accept(); Box::pin(async { let (stream, addr) = accept.await?; @@ -70,7 +53,7 @@ impl Listener for tokio::net::UnixListener { }) } - fn local_addr(&self) -> Result { + fn local_addr(&self) -> Result { self.local_addr().map(Into::into) } } From d0dba7a76bc84f03b635b10eb166dd3eefe50be0 Mon Sep 17 00:00:00 2001 From: Cecile Tonglet Date: Wed, 12 Jan 2022 15:54:10 +0100 Subject: [PATCH 05/16] Impl Listener for Either --- tokio-util/src/net/unix/mod.rs | 44 ++++++++++++++++++++++++++++++++++ 1 file changed, 44 insertions(+) diff --git a/tokio-util/src/net/unix/mod.rs b/tokio-util/src/net/unix/mod.rs index 9e41904cdef..1ecbdeb198f 100644 --- a/tokio-util/src/net/unix/mod.rs +++ b/tokio-util/src/net/unix/mod.rs @@ -1,5 +1,6 @@ //! Unix domain socket helpers. +use crate::either::Either; use std::future::Future; use std::io::Result; use std::pin::Pin; @@ -57,3 +58,46 @@ impl Listener for tokio::net::UnixListener { self.local_addr().map(Into::into) } } + +impl Listener for Either +where + L: Listener, + R: Listener, +{ + type Io = Either<::Io, ::Io>; + type Addr = Either<::Addr, ::Addr>; + + fn accept<'a>( + &'a self, + ) -> Pin> + Send + 'a>> { + match self { + Either::Left(listener) => { + let fut = listener.accept(); + Box::pin(async move { + let (stream, addr) = fut.await?; + Ok((Either::Left(stream), Either::Left(addr))) + }) + } + Either::Right(listener) => { + let fut = listener.accept(); + Box::pin(async move { + let (stream, addr) = fut.await?; + Ok((Either::Right(stream), Either::Right(addr))) + }) + } + } + } + + fn local_addr(&self) -> Result { + match self { + Either::Left(listener) => { + let addr = listener.local_addr()?; + Ok(Either::Left(addr)) + } + Either::Right(listener) => { + let addr = listener.local_addr()?; + Ok(Either::Right(addr)) + } + } + } +} From c2ec467a515e7d106b815fabd2771df837593205 Mon Sep 17 00:00:00 2001 From: Cecile Tonglet Date: Thu, 13 Jan 2022 12:37:33 +0100 Subject: [PATCH 06/16] Add poll_accept & struct that impl Future --- tokio-util/src/net/unix/mod.rs | 83 +++++++++++++++++++--------------- 1 file changed, 47 insertions(+), 36 deletions(-) diff --git a/tokio-util/src/net/unix/mod.rs b/tokio-util/src/net/unix/mod.rs index 1ecbdeb198f..5a720e4149c 100644 --- a/tokio-util/src/net/unix/mod.rs +++ b/tokio-util/src/net/unix/mod.rs @@ -4,6 +4,7 @@ use crate::either::Either; use std::future::Future; use std::io::Result; use std::pin::Pin; +use std::task::{Context, Poll}; /// A trait for a listener: `TcpListener` and `UnixListener`. pub trait Listener: Send + Unpin { @@ -12,10 +13,16 @@ pub trait Listener: Send + Unpin { /// The socket address type of this listener. type Addr; + /// Polls to accept a new incoming connection to this listener. + fn poll_accept(&self, cx: &mut Context<'_>) -> Poll>; + /// Accepts a new incoming connection from this listener. - fn accept<'a>( - &'a self, - ) -> Pin> + Send + 'a>>; + fn accept<'a>(&'a mut self) -> ListenerAcceptFut<'a, Self> + where + Self: Sized + Unpin, + { + ListenerAcceptFut::new(self) + } /// Returns the local address that this listener is bound to. fn local_addr(&self) -> Result; @@ -25,14 +32,8 @@ impl Listener for tokio::net::TcpListener { type Io = tokio::net::TcpStream; type Addr = std::net::SocketAddr; - fn accept<'a>( - &'a self, - ) -> Pin> + Send + 'a>> { - let accept = self.accept(); - Box::pin(async { - let (stream, addr) = accept.await?; - Ok((stream, addr.into())) - }) + fn poll_accept(&self, cx: &mut Context<'_>) -> Poll> { + Self::poll_accept(self, cx) } fn local_addr(&self) -> Result { @@ -44,14 +45,8 @@ impl Listener for tokio::net::UnixListener { type Io = tokio::net::UnixStream; type Addr = tokio::net::unix::SocketAddr; - fn accept<'a>( - &'a self, - ) -> Pin> + Send + 'a>> { - let accept = self.accept(); - Box::pin(async { - let (stream, addr) = accept.await?; - Ok((stream, addr.into())) - }) + fn poll_accept(&self, cx: &mut Context<'_>) -> Poll> { + Self::poll_accept(self, cx) } fn local_addr(&self) -> Result { @@ -67,24 +62,14 @@ where type Io = Either<::Io, ::Io>; type Addr = Either<::Addr, ::Addr>; - fn accept<'a>( - &'a self, - ) -> Pin> + Send + 'a>> { + fn poll_accept(&self, cx: &mut Context<'_>) -> Poll> { match self { - Either::Left(listener) => { - let fut = listener.accept(); - Box::pin(async move { - let (stream, addr) = fut.await?; - Ok((Either::Left(stream), Either::Left(addr))) - }) - } - Either::Right(listener) => { - let fut = listener.accept(); - Box::pin(async move { - let (stream, addr) = fut.await?; - Ok((Either::Right(stream), Either::Right(addr))) - }) - } + Either::Left(listener) => listener + .poll_accept(cx) + .map(|res| res.map(|(stream, addr)| (Either::Left(stream), Either::Left(addr)))), + Either::Right(listener) => listener + .poll_accept(cx) + .map(|res| res.map(|(stream, addr)| (Either::Right(stream), Either::Right(addr)))), } } @@ -101,3 +86,29 @@ where } } } + +/// Future for accepting a new connection from a listener. +#[derive(Debug)] +pub struct ListenerAcceptFut<'a, L> { + listener: &'a L, +} + +impl<'a, L> ListenerAcceptFut<'a, L> +where + L: Listener, +{ + fn new(listener: &'a L) -> Self { + Self { listener } + } +} + +impl<'a, L> Future for ListenerAcceptFut<'a, L> +where + L: Listener, +{ + type Output = Result<(L::Io, L::Addr)>; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + self.listener.poll_accept(cx) + } +} From 8afdc3e57d4fe8ccbbbab468681bef56f7f82b0a Mon Sep 17 00:00:00 2001 From: Cecile Tonglet Date: Thu, 13 Jan 2022 12:42:49 +0100 Subject: [PATCH 07/16] Move things to the right place --- tokio-util/src/net/mod.rs | 100 +++++++++++++++++++++++++++++++++ tokio-util/src/net/unix/mod.rs | 98 +------------------------------- 2 files changed, 101 insertions(+), 97 deletions(-) diff --git a/tokio-util/src/net/mod.rs b/tokio-util/src/net/mod.rs index 0c07837ac9d..ca93461890b 100644 --- a/tokio-util/src/net/mod.rs +++ b/tokio-util/src/net/mod.rs @@ -1,4 +1,104 @@ //! TCP/UDP/Unix helpers for tokio. +use crate::either::Either; +use std::future::Future; +use std::io::Result; +use std::pin::Pin; +use std::task::{Context, Poll}; + #[cfg(unix)] pub mod unix; + +/// A trait for a listener: `TcpListener` and `UnixListener`. +pub trait Listener: Send + Unpin { + /// The stream's type of this listener. + type Io: tokio::io::AsyncRead + tokio::io::AsyncWrite; + /// The socket address type of this listener. + type Addr; + + /// Polls to accept a new incoming connection to this listener. + fn poll_accept(&self, cx: &mut Context<'_>) -> Poll>; + + /// Accepts a new incoming connection from this listener. + fn accept<'a>(&'a self) -> ListenerAcceptFut<'a, Self> + where + Self: Sized + Unpin, + { + ListenerAcceptFut::new(self) + } + + /// Returns the local address that this listener is bound to. + fn local_addr(&self) -> Result; +} + +impl Listener for tokio::net::TcpListener { + type Io = tokio::net::TcpStream; + type Addr = std::net::SocketAddr; + + fn poll_accept(&self, cx: &mut Context<'_>) -> Poll> { + Self::poll_accept(self, cx) + } + + fn local_addr(&self) -> Result { + self.local_addr().map(Into::into) + } +} + +/// Future for accepting a new connection from a listener. +#[derive(Debug)] +pub struct ListenerAcceptFut<'a, L> { + listener: &'a L, +} + +impl<'a, L> ListenerAcceptFut<'a, L> +where + L: Listener, +{ + fn new(listener: &'a L) -> Self { + Self { listener } + } +} + +impl<'a, L> Future for ListenerAcceptFut<'a, L> +where + L: Listener, +{ + type Output = Result<(L::Io, L::Addr)>; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + self.listener.poll_accept(cx) + } +} + +impl Listener for Either +where + L: Listener, + R: Listener, +{ + type Io = Either<::Io, ::Io>; + type Addr = Either<::Addr, ::Addr>; + + fn poll_accept(&self, cx: &mut Context<'_>) -> Poll> { + match self { + Either::Left(listener) => listener + .poll_accept(cx) + .map(|res| res.map(|(stream, addr)| (Either::Left(stream), Either::Left(addr)))), + Either::Right(listener) => listener + .poll_accept(cx) + .map(|res| res.map(|(stream, addr)| (Either::Right(stream), Either::Right(addr)))), + } + } + + fn local_addr(&self) -> Result { + match self { + Either::Left(listener) => { + let addr = listener.local_addr()?; + Ok(Either::Left(addr)) + } + Either::Right(listener) => { + let addr = listener.local_addr()?; + Ok(Either::Right(addr)) + } + } + } +} diff --git a/tokio-util/src/net/unix/mod.rs b/tokio-util/src/net/unix/mod.rs index 5a720e4149c..42c4fe27827 100644 --- a/tokio-util/src/net/unix/mod.rs +++ b/tokio-util/src/net/unix/mod.rs @@ -1,46 +1,9 @@ //! Unix domain socket helpers. -use crate::either::Either; -use std::future::Future; +use super::Listener; use std::io::Result; -use std::pin::Pin; use std::task::{Context, Poll}; -/// A trait for a listener: `TcpListener` and `UnixListener`. -pub trait Listener: Send + Unpin { - /// The stream's type of this listener. - type Io: tokio::io::AsyncRead + tokio::io::AsyncWrite; - /// The socket address type of this listener. - type Addr; - - /// Polls to accept a new incoming connection to this listener. - fn poll_accept(&self, cx: &mut Context<'_>) -> Poll>; - - /// Accepts a new incoming connection from this listener. - fn accept<'a>(&'a mut self) -> ListenerAcceptFut<'a, Self> - where - Self: Sized + Unpin, - { - ListenerAcceptFut::new(self) - } - - /// Returns the local address that this listener is bound to. - fn local_addr(&self) -> Result; -} - -impl Listener for tokio::net::TcpListener { - type Io = tokio::net::TcpStream; - type Addr = std::net::SocketAddr; - - fn poll_accept(&self, cx: &mut Context<'_>) -> Poll> { - Self::poll_accept(self, cx) - } - - fn local_addr(&self) -> Result { - self.local_addr().map(Into::into) - } -} - impl Listener for tokio::net::UnixListener { type Io = tokio::net::UnixStream; type Addr = tokio::net::unix::SocketAddr; @@ -53,62 +16,3 @@ impl Listener for tokio::net::UnixListener { self.local_addr().map(Into::into) } } - -impl Listener for Either -where - L: Listener, - R: Listener, -{ - type Io = Either<::Io, ::Io>; - type Addr = Either<::Addr, ::Addr>; - - fn poll_accept(&self, cx: &mut Context<'_>) -> Poll> { - match self { - Either::Left(listener) => listener - .poll_accept(cx) - .map(|res| res.map(|(stream, addr)| (Either::Left(stream), Either::Left(addr)))), - Either::Right(listener) => listener - .poll_accept(cx) - .map(|res| res.map(|(stream, addr)| (Either::Right(stream), Either::Right(addr)))), - } - } - - fn local_addr(&self) -> Result { - match self { - Either::Left(listener) => { - let addr = listener.local_addr()?; - Ok(Either::Left(addr)) - } - Either::Right(listener) => { - let addr = listener.local_addr()?; - Ok(Either::Right(addr)) - } - } - } -} - -/// Future for accepting a new connection from a listener. -#[derive(Debug)] -pub struct ListenerAcceptFut<'a, L> { - listener: &'a L, -} - -impl<'a, L> ListenerAcceptFut<'a, L> -where - L: Listener, -{ - fn new(listener: &'a L) -> Self { - Self { listener } - } -} - -impl<'a, L> Future for ListenerAcceptFut<'a, L> -where - L: Listener, -{ - type Output = Result<(L::Io, L::Addr)>; - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - self.listener.poll_accept(cx) - } -} From 75dcc76c88bf5fc1b0857c4c470ee75fd3d1d0cd Mon Sep 17 00:00:00 2001 From: Cecile Tonglet Date: Thu, 13 Jan 2022 17:25:04 +0100 Subject: [PATCH 08/16] Clippy fixes --- tokio-util/src/net/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tokio-util/src/net/mod.rs b/tokio-util/src/net/mod.rs index ca93461890b..ef08b8d58a1 100644 --- a/tokio-util/src/net/mod.rs +++ b/tokio-util/src/net/mod.rs @@ -20,7 +20,7 @@ pub trait Listener: Send + Unpin { fn poll_accept(&self, cx: &mut Context<'_>) -> Poll>; /// Accepts a new incoming connection from this listener. - fn accept<'a>(&'a self) -> ListenerAcceptFut<'a, Self> + fn accept(&self) -> ListenerAcceptFut<'_, Self> where Self: Sized + Unpin, { From 4a46da1a2123683a266ea2918f4afbe86a976118 Mon Sep 17 00:00:00 2001 From: Cecile Tonglet Date: Fri, 14 Jan 2022 12:11:01 +0100 Subject: [PATCH 09/16] Update tokio-util/src/net/mod.rs Co-authored-by: Alice Ryhl --- tokio-util/src/net/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tokio-util/src/net/mod.rs b/tokio-util/src/net/mod.rs index ef08b8d58a1..a523e7a43b5 100644 --- a/tokio-util/src/net/mod.rs +++ b/tokio-util/src/net/mod.rs @@ -10,7 +10,7 @@ use std::task::{Context, Poll}; pub mod unix; /// A trait for a listener: `TcpListener` and `UnixListener`. -pub trait Listener: Send + Unpin { +pub trait Listener { /// The stream's type of this listener. type Io: tokio::io::AsyncRead + tokio::io::AsyncWrite; /// The socket address type of this listener. From fe231ec671e442542ee656078c39e1a7af171857 Mon Sep 17 00:00:00 2001 From: Cecile Tonglet Date: Fri, 14 Jan 2022 12:13:17 +0100 Subject: [PATCH 10/16] Use &mut self instead of &self for accept() and poll_accept() --- tokio-util/src/net/mod.rs | 14 +++++++------- tokio-util/src/net/unix/mod.rs | 2 +- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/tokio-util/src/net/mod.rs b/tokio-util/src/net/mod.rs index a523e7a43b5..1b7688d8a12 100644 --- a/tokio-util/src/net/mod.rs +++ b/tokio-util/src/net/mod.rs @@ -17,10 +17,10 @@ pub trait Listener { type Addr; /// Polls to accept a new incoming connection to this listener. - fn poll_accept(&self, cx: &mut Context<'_>) -> Poll>; + fn poll_accept(&mut self, cx: &mut Context<'_>) -> Poll>; /// Accepts a new incoming connection from this listener. - fn accept(&self) -> ListenerAcceptFut<'_, Self> + fn accept(&mut self) -> ListenerAcceptFut<'_, Self> where Self: Sized + Unpin, { @@ -35,7 +35,7 @@ impl Listener for tokio::net::TcpListener { type Io = tokio::net::TcpStream; type Addr = std::net::SocketAddr; - fn poll_accept(&self, cx: &mut Context<'_>) -> Poll> { + fn poll_accept(&mut self, cx: &mut Context<'_>) -> Poll> { Self::poll_accept(self, cx) } @@ -47,14 +47,14 @@ impl Listener for tokio::net::TcpListener { /// Future for accepting a new connection from a listener. #[derive(Debug)] pub struct ListenerAcceptFut<'a, L> { - listener: &'a L, + listener: &'a mut L, } impl<'a, L> ListenerAcceptFut<'a, L> where L: Listener, { - fn new(listener: &'a L) -> Self { + fn new(listener: &'a mut L) -> Self { Self { listener } } } @@ -65,7 +65,7 @@ where { type Output = Result<(L::Io, L::Addr)>; - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { self.listener.poll_accept(cx) } } @@ -78,7 +78,7 @@ where type Io = Either<::Io, ::Io>; type Addr = Either<::Addr, ::Addr>; - fn poll_accept(&self, cx: &mut Context<'_>) -> Poll> { + fn poll_accept(&mut self, cx: &mut Context<'_>) -> Poll> { match self { Either::Left(listener) => listener .poll_accept(cx) diff --git a/tokio-util/src/net/unix/mod.rs b/tokio-util/src/net/unix/mod.rs index 42c4fe27827..0b522c90a34 100644 --- a/tokio-util/src/net/unix/mod.rs +++ b/tokio-util/src/net/unix/mod.rs @@ -8,7 +8,7 @@ impl Listener for tokio::net::UnixListener { type Io = tokio::net::UnixStream; type Addr = tokio::net::unix::SocketAddr; - fn poll_accept(&self, cx: &mut Context<'_>) -> Poll> { + fn poll_accept(&mut self, cx: &mut Context<'_>) -> Poll> { Self::poll_accept(self, cx) } From a538f0bdc6124628e3649a61f2dc6a519847e2f2 Mon Sep 17 00:00:00 2001 From: Cecile Tonglet Date: Fri, 14 Jan 2022 18:48:46 +0100 Subject: [PATCH 11/16] Remove Unpin from accept() --- tokio-util/src/net/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tokio-util/src/net/mod.rs b/tokio-util/src/net/mod.rs index 1b7688d8a12..3143173b6f0 100644 --- a/tokio-util/src/net/mod.rs +++ b/tokio-util/src/net/mod.rs @@ -22,7 +22,7 @@ pub trait Listener { /// Accepts a new incoming connection from this listener. fn accept(&mut self) -> ListenerAcceptFut<'_, Self> where - Self: Sized + Unpin, + Self: Sized, { ListenerAcceptFut::new(self) } From e194407f064705a5ac0520bf6a6f709cd5d7240f Mon Sep 17 00:00:00 2001 From: Cecile Tonglet Date: Mon, 24 Jan 2022 15:22:41 +0100 Subject: [PATCH 12/16] Update tokio-util/src/net/mod.rs Co-authored-by: Alice Ryhl --- tokio-util/src/net/mod.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/tokio-util/src/net/mod.rs b/tokio-util/src/net/mod.rs index 3143173b6f0..0a87423c38b 100644 --- a/tokio-util/src/net/mod.rs +++ b/tokio-util/src/net/mod.rs @@ -46,6 +46,7 @@ impl Listener for tokio::net::TcpListener { /// Future for accepting a new connection from a listener. #[derive(Debug)] +#[must_use = "futures do nothing unless you `.await` or poll them"] pub struct ListenerAcceptFut<'a, L> { listener: &'a mut L, } From b639d98601b57fbc23979977f039799ec3b7035b Mon Sep 17 00:00:00 2001 From: Cecile Tonglet Date: Tue, 25 Jan 2022 10:18:46 +0100 Subject: [PATCH 13/16] Replace Listener impl on Either with custom methods --- tokio-util/src/net/mod.rs | 54 +++++++++++++++++++++++++++++++++------ 1 file changed, 46 insertions(+), 8 deletions(-) diff --git a/tokio-util/src/net/mod.rs b/tokio-util/src/net/mod.rs index 0a87423c38b..750fb306450 100644 --- a/tokio-util/src/net/mod.rs +++ b/tokio-util/src/net/mod.rs @@ -71,26 +71,36 @@ where } } -impl Listener for Either +impl Either where L: Listener, R: Listener, { - type Io = Either<::Io, ::Io>; - type Addr = Either<::Addr, ::Addr>; - - fn poll_accept(&mut self, cx: &mut Context<'_>) -> Poll> { + /// Polls to accept a new incoming connection to this listener. + pub fn poll_accept( + &mut self, + cx: &mut Context<'_>, + ) -> Poll>> { match self { Either::Left(listener) => listener .poll_accept(cx) - .map(|res| res.map(|(stream, addr)| (Either::Left(stream), Either::Left(addr)))), + .map(|res| res.map(|(stream, addr)| Either::Left((stream, addr)))), Either::Right(listener) => listener .poll_accept(cx) - .map(|res| res.map(|(stream, addr)| (Either::Right(stream), Either::Right(addr)))), + .map(|res| res.map(|(stream, addr)| Either::Right((stream, addr)))), } } - fn local_addr(&self) -> Result { + /// Accepts a new incoming connection from this listener. + pub fn accept(&mut self) -> EitherListenerAcceptFut<'_, L, R> + where + Self: Sized, + { + EitherListenerAcceptFut::new(self) + } + + /// Returns the local address that this listener is bound to. + pub fn local_addr(&self) -> Result> { match self { Either::Left(listener) => { let addr = listener.local_addr()?; @@ -103,3 +113,31 @@ where } } } + +/// Future for accepting a new connection from a listener. +#[derive(Debug)] +pub struct EitherListenerAcceptFut<'a, L, R> { + listener: &'a mut Either, +} + +impl<'a, L, R> EitherListenerAcceptFut<'a, L, R> +where + L: Listener, + R: Listener, +{ + fn new(listener: &'a mut Either) -> Self { + Self { listener } + } +} + +impl<'a, L, R> Future for EitherListenerAcceptFut<'a, L, R> +where + L: Listener, + R: Listener, +{ + type Output = Result>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + self.listener.poll_accept(cx) + } +} From 27912b7a9cc6eead67767172a26a06832b3dbbf2 Mon Sep 17 00:00:00 2001 From: Cecile Tonglet Date: Tue, 25 Jan 2022 10:23:06 +0100 Subject: [PATCH 14/16] Remove unnecessary new() functions --- tokio-util/src/net/mod.rs | 23 ++--------------------- 1 file changed, 2 insertions(+), 21 deletions(-) diff --git a/tokio-util/src/net/mod.rs b/tokio-util/src/net/mod.rs index 750fb306450..906017ee91e 100644 --- a/tokio-util/src/net/mod.rs +++ b/tokio-util/src/net/mod.rs @@ -24,7 +24,7 @@ pub trait Listener { where Self: Sized, { - ListenerAcceptFut::new(self) + ListenerAcceptFut { listener: self } } /// Returns the local address that this listener is bound to. @@ -51,15 +51,6 @@ pub struct ListenerAcceptFut<'a, L> { listener: &'a mut L, } -impl<'a, L> ListenerAcceptFut<'a, L> -where - L: Listener, -{ - fn new(listener: &'a mut L) -> Self { - Self { listener } - } -} - impl<'a, L> Future for ListenerAcceptFut<'a, L> where L: Listener, @@ -96,7 +87,7 @@ where where Self: Sized, { - EitherListenerAcceptFut::new(self) + EitherListenerAcceptFut { listener: self } } /// Returns the local address that this listener is bound to. @@ -120,16 +111,6 @@ pub struct EitherListenerAcceptFut<'a, L, R> { listener: &'a mut Either, } -impl<'a, L, R> EitherListenerAcceptFut<'a, L, R> -where - L: Listener, - R: Listener, -{ - fn new(listener: &'a mut Either) -> Self { - Self { listener } - } -} - impl<'a, L, R> Future for EitherListenerAcceptFut<'a, L, R> where L: Listener, From 316f015ef0f03312ab59f09bb966d9e8d7f26123 Mon Sep 17 00:00:00 2001 From: Cecile Tonglet Date: Tue, 25 Jan 2022 13:32:58 +0100 Subject: [PATCH 15/16] Remove EitherListenerAcceptFut --- tokio-util/src/net/mod.rs | 46 ++++++++++----------------------------- 1 file changed, 11 insertions(+), 35 deletions(-) diff --git a/tokio-util/src/net/mod.rs b/tokio-util/src/net/mod.rs index 906017ee91e..18d59c61e6b 100644 --- a/tokio-util/src/net/mod.rs +++ b/tokio-util/src/net/mod.rs @@ -67,27 +67,21 @@ where L: Listener, R: Listener, { - /// Polls to accept a new incoming connection to this listener. - pub fn poll_accept( - &mut self, - cx: &mut Context<'_>, - ) -> Poll>> { - match self { - Either::Left(listener) => listener - .poll_accept(cx) - .map(|res| res.map(|(stream, addr)| Either::Left((stream, addr)))), - Either::Right(listener) => listener - .poll_accept(cx) - .map(|res| res.map(|(stream, addr)| Either::Right((stream, addr)))), - } - } - /// Accepts a new incoming connection from this listener. - pub fn accept(&mut self) -> EitherListenerAcceptFut<'_, L, R> + pub async fn accept(&mut self) -> Result> where Self: Sized, { - EitherListenerAcceptFut { listener: self } + match self { + Either::Left(listener) => { + let (stream, addr) = listener.accept().await?; + Ok(Either::Left((stream, addr))) + } + Either::Right(listener) => { + let (stream, addr) = listener.accept().await?; + Ok(Either::Right((stream, addr))) + } + } } /// Returns the local address that this listener is bound to. @@ -104,21 +98,3 @@ where } } } - -/// Future for accepting a new connection from a listener. -#[derive(Debug)] -pub struct EitherListenerAcceptFut<'a, L, R> { - listener: &'a mut Either, -} - -impl<'a, L, R> Future for EitherListenerAcceptFut<'a, L, R> -where - L: Listener, - R: Listener, -{ - type Output = Result>; - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - self.listener.poll_accept(cx) - } -} From 61bd0fa56abeb889630997bc0796f456548642f4 Mon Sep 17 00:00:00 2001 From: Cecile Tonglet Date: Tue, 25 Jan 2022 13:45:22 +0100 Subject: [PATCH 16/16] Removed Sized --- tokio-util/src/net/mod.rs | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/tokio-util/src/net/mod.rs b/tokio-util/src/net/mod.rs index 18d59c61e6b..4817e10d0f3 100644 --- a/tokio-util/src/net/mod.rs +++ b/tokio-util/src/net/mod.rs @@ -68,10 +68,7 @@ where R: Listener, { /// Accepts a new incoming connection from this listener. - pub async fn accept(&mut self) -> Result> - where - Self: Sized, - { + pub async fn accept(&mut self) -> Result> { match self { Either::Left(listener) => { let (stream, addr) = listener.accept().await?;