diff --git a/tokio/src/net/tcp/listener.rs b/tokio/src/net/tcp/listener.rs index 3f9bca038bb..5e8ec15a145 100644 --- a/tokio/src/net/tcp/listener.rs +++ b/tokio/src/net/tcp/listener.rs @@ -1,4 +1,4 @@ -use crate::io::PollEvented; +use crate::io::AsyncFd; use crate::net::tcp::TcpStream; use crate::net::{to_socket_addrs, ToSocketAddrs}; @@ -8,66 +8,64 @@ use std::io; use std::net::{self, SocketAddr}; use std::task::{Context, Poll}; -cfg_net! { - /// A TCP socket server, listening for connections. - /// - /// You can accept a new connection by using the [`accept`](`TcpListener::accept`) method. Alternatively `TcpListener` - /// implements the [`Stream`](`crate::stream::Stream`) trait, which allows you to use the listener in places that want a - /// stream. The stream will never return `None` and will also not yield the peer's `SocketAddr` structure. Iterating over - /// it is equivalent to calling accept in a loop. - /// - /// # Errors - /// - /// Note that accepting a connection can lead to various errors and not all - /// of them are necessarily fatal ‒ for example having too many open file - /// descriptors or the other side closing the connection while it waits in - /// an accept queue. These would terminate the stream if not handled in any - /// way. - /// - /// # Examples - /// - /// Using `accept`: - /// ```no_run - /// use tokio::net::TcpListener; - /// - /// use std::io; - /// - /// async fn process_socket(socket: T) { - /// # drop(socket); - /// // do work with socket here - /// } - /// - /// #[tokio::main] - /// async fn main() -> io::Result<()> { - /// let listener = TcpListener::bind("127.0.0.1:8080").await?; - /// - /// loop { - /// let (socket, _) = listener.accept().await?; - /// process_socket(socket).await; - /// } - /// } - /// ``` - /// - /// Using `impl Stream`: - /// ```no_run - /// use tokio::{net::TcpListener, stream::StreamExt}; - /// - /// #[tokio::main] - /// async fn main() { - /// let mut listener = TcpListener::bind("127.0.0.1:8080").await.unwrap(); - /// while let Some(stream) = listener.next().await { - /// match stream { - /// Ok(stream) => { - /// println!("new client!"); - /// } - /// Err(e) => { /* connection failed */ } - /// } - /// } - /// } - /// ``` - pub struct TcpListener { - io: PollEvented, - } +/// A TCP socket server, listening for connections. +/// +/// You can accept a new connection by using the [`accept`](`TcpListener::accept`) method. Alternatively `TcpListener` +/// implements the [`Stream`](`crate::stream::Stream`) trait, which allows you to use the listener in places that want a +/// stream. The stream will never return `None` and will also not yield the peer's `SocketAddr` structure. Iterating over +/// it is equivalent to calling accept in a loop. +/// +/// # Errors +/// +/// Note that accepting a connection can lead to various errors and not all +/// of them are necessarily fatal ‒ for example having too many open file +/// descriptors or the other side closing the connection while it waits in +/// an accept queue. These would terminate the stream if not handled in any +/// way. +/// +/// # Examples +/// +/// Using `accept`: +/// ```no_run +/// use tokio::net::TcpListener; +/// +/// use std::io; +/// +/// async fn process_socket(socket: T) { +/// # drop(socket); +/// // do work with socket here +/// } +/// +/// #[tokio::main] +/// async fn main() -> io::Result<()> { +/// let listener = TcpListener::bind("127.0.0.1:8080").await?; +/// +/// loop { +/// let (socket, _) = listener.accept().await?; +/// process_socket(socket).await; +/// } +/// } +/// ``` +/// +/// Using `impl Stream`: +/// ```no_run +/// use tokio::{net::TcpListener, stream::StreamExt}; +/// +/// #[tokio::main] +/// async fn main() { +/// let mut listener = TcpListener::bind("127.0.0.1:8080").await.unwrap(); +/// while let Some(stream) = listener.next().await { +/// match stream { +/// Ok(stream) => { +/// println!("new client!"); +/// } +/// Err(e) => { /* connection failed */ } +/// } +/// } +/// } +/// ``` +pub struct TcpListener { + io: AsyncFd, } impl TcpListener { @@ -162,13 +160,18 @@ impl TcpListener { /// } /// ``` pub async fn accept(&self) -> io::Result<(TcpStream, SocketAddr)> { - let (mio, addr) = self - .io - .async_io(mio::Interest::READABLE, |sock| sock.accept()) - .await?; + loop { + let mut ev = self.io.readable().await?; - let stream = TcpStream::new(mio)?; - Ok((stream, addr)) + match ev.with_io(|| self.io.inner().accept()) { + Ok((mio, addr)) => { + let stream = TcpStream::new(mio)?; + break Ok((stream, addr)); + } + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue, + Err(e) => break Err(e), + } + } } /// Polls to accept a new incoming connection to this listener. @@ -181,15 +184,15 @@ impl TcpListener { /// single task. Failing to do this could result in tasks hanging. pub fn poll_accept(&self, cx: &mut Context<'_>) -> Poll> { loop { - let ev = ready!(self.io.poll_read_ready(cx))?; + let mut ev = ready!(self.io.poll_read_ready(cx))?; - match self.io.get_ref().accept() { + match self.io.inner().accept() { Ok((io, addr)) => { let io = TcpStream::new(io)?; return Poll::Ready(Ok((io, addr))); } Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { - self.io.clear_readiness(ev); + ev.clear_ready(); } Err(e) => return Poll::Ready(Err(e)), } @@ -232,12 +235,12 @@ impl TcpListener { /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function. pub fn from_std(listener: net::TcpListener) -> io::Result { let io = mio::net::TcpListener::from_std(listener); - let io = PollEvented::new(io)?; + let io = AsyncFd::new(io)?; Ok(TcpListener { io }) } pub(crate) fn new(listener: mio::net::TcpListener) -> io::Result { - let io = PollEvented::new(listener)?; + let io = AsyncFd::new(listener)?; Ok(TcpListener { io }) } @@ -265,7 +268,7 @@ impl TcpListener { /// } /// ``` pub fn local_addr(&self) -> io::Result { - self.io.get_ref().local_addr() + self.io.inner().local_addr() } /// Gets the value of the `IP_TTL` option for this socket. @@ -292,7 +295,7 @@ impl TcpListener { /// } /// ``` pub fn ttl(&self) -> io::Result { - self.io.get_ref().ttl() + self.io.inner().ttl() } /// Sets the value for the `IP_TTL` option on this socket. @@ -317,7 +320,7 @@ impl TcpListener { /// } /// ``` pub fn set_ttl(&self, ttl: u32) -> io::Result<()> { - self.io.get_ref().set_ttl(ttl) + self.io.inner().set_ttl(ttl) } } @@ -345,7 +348,7 @@ impl TryFrom for TcpListener { impl fmt::Debug for TcpListener { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - self.io.get_ref().fmt(f) + self.io.inner().fmt(f) } } @@ -356,7 +359,7 @@ mod sys { impl AsRawFd for TcpListener { fn as_raw_fd(&self) -> RawFd { - self.io.get_ref().as_raw_fd() + self.io.inner().as_raw_fd() } } } @@ -368,7 +371,7 @@ mod sys { impl AsRawSocket for TcpListener { fn as_raw_socket(&self) -> RawSocket { - self.io.get_ref().as_raw_socket() + self.io.inner().as_raw_socket() } } } diff --git a/tokio/tests/io_driver_drop.rs b/tokio/tests/io_driver_drop.rs index 631e66e9fbe..804746e8c26 100644 --- a/tokio/tests/io_driver_drop.rs +++ b/tokio/tests/io_driver_drop.rs @@ -12,6 +12,7 @@ fn tcp_doesnt_block() { let listener = { let _enter = rt.enter(); let listener = std::net::TcpListener::bind("127.0.0.1:0").unwrap(); + listener.set_nonblocking(true).unwrap(); TcpListener::from_std(listener).unwrap() }; @@ -31,6 +32,7 @@ fn drop_wakes() { let listener = { let _enter = rt.enter(); let listener = std::net::TcpListener::bind("127.0.0.1:0").unwrap(); + listener.set_nonblocking(true).unwrap(); TcpListener::from_std(listener).unwrap() };