Skip to content

Commit

Permalink
tcp: port TcpListener to AsyncFd
Browse files Browse the repository at this point in the history
This is a proof-of-concept to help verify that we can remove poll_evented in
favor of AsyncFd.
  • Loading branch information
Bryan Donlan committed Oct 20, 2020
1 parent 799e763 commit 68b353a
Show file tree
Hide file tree
Showing 2 changed files with 83 additions and 78 deletions.
159 changes: 81 additions & 78 deletions tokio/src/net/tcp/listener.rs
@@ -1,4 +1,4 @@
use crate::io::PollEvented;
use crate::io::AsyncFd;
use crate::net::tcp::TcpStream;
use crate::net::{to_socket_addrs, ToSocketAddrs};

Expand All @@ -8,66 +8,64 @@ use std::io;
use std::net::{self, SocketAddr};
use std::task::{Context, Poll};

cfg_net! {
/// A TCP socket server, listening for connections.
///
/// You can accept a new connection by using the [`accept`](`TcpListener::accept`) method. Alternatively `TcpListener`
/// implements the [`Stream`](`crate::stream::Stream`) trait, which allows you to use the listener in places that want a
/// stream. The stream will never return `None` and will also not yield the peer's `SocketAddr` structure. Iterating over
/// it is equivalent to calling accept in a loop.
///
/// # Errors
///
/// Note that accepting a connection can lead to various errors and not all
/// of them are necessarily fatal ‒ for example having too many open file
/// descriptors or the other side closing the connection while it waits in
/// an accept queue. These would terminate the stream if not handled in any
/// way.
///
/// # Examples
///
/// Using `accept`:
/// ```no_run
/// use tokio::net::TcpListener;
///
/// use std::io;
///
/// async fn process_socket<T>(socket: T) {
/// # drop(socket);
/// // do work with socket here
/// }
///
/// #[tokio::main]
/// async fn main() -> io::Result<()> {
/// let listener = TcpListener::bind("127.0.0.1:8080").await?;
///
/// loop {
/// let (socket, _) = listener.accept().await?;
/// process_socket(socket).await;
/// }
/// }
/// ```
///
/// Using `impl Stream`:
/// ```no_run
/// use tokio::{net::TcpListener, stream::StreamExt};
///
/// #[tokio::main]
/// async fn main() {
/// let mut listener = TcpListener::bind("127.0.0.1:8080").await.unwrap();
/// while let Some(stream) = listener.next().await {
/// match stream {
/// Ok(stream) => {
/// println!("new client!");
/// }
/// Err(e) => { /* connection failed */ }
/// }
/// }
/// }
/// ```
pub struct TcpListener {
io: PollEvented<mio::net::TcpListener>,
}
/// A TCP socket server, listening for connections.
///
/// You can accept a new connection by using the [`accept`](`TcpListener::accept`) method. Alternatively `TcpListener`
/// implements the [`Stream`](`crate::stream::Stream`) trait, which allows you to use the listener in places that want a
/// stream. The stream will never return `None` and will also not yield the peer's `SocketAddr` structure. Iterating over
/// it is equivalent to calling accept in a loop.
///
/// # Errors
///
/// Note that accepting a connection can lead to various errors and not all
/// of them are necessarily fatal ‒ for example having too many open file
/// descriptors or the other side closing the connection while it waits in
/// an accept queue. These would terminate the stream if not handled in any
/// way.
///
/// # Examples
///
/// Using `accept`:
/// ```no_run
/// use tokio::net::TcpListener;
///
/// use std::io;
///
/// async fn process_socket<T>(socket: T) {
/// # drop(socket);
/// // do work with socket here
/// }
///
/// #[tokio::main]
/// async fn main() -> io::Result<()> {
/// let listener = TcpListener::bind("127.0.0.1:8080").await?;
///
/// loop {
/// let (socket, _) = listener.accept().await?;
/// process_socket(socket).await;
/// }
/// }
/// ```
///
/// Using `impl Stream`:
/// ```no_run
/// use tokio::{net::TcpListener, stream::StreamExt};
///
/// #[tokio::main]
/// async fn main() {
/// let mut listener = TcpListener::bind("127.0.0.1:8080").await.unwrap();
/// while let Some(stream) = listener.next().await {
/// match stream {
/// Ok(stream) => {
/// println!("new client!");
/// }
/// Err(e) => { /* connection failed */ }
/// }
/// }
/// }
/// ```
pub struct TcpListener {
io: AsyncFd<mio::net::TcpListener>,
}

impl TcpListener {
Expand Down Expand Up @@ -162,13 +160,18 @@ impl TcpListener {
/// }
/// ```
pub async fn accept(&self) -> io::Result<(TcpStream, SocketAddr)> {
let (mio, addr) = self
.io
.async_io(mio::Interest::READABLE, |sock| sock.accept())
.await?;
loop {
let mut ev = self.io.readable().await?;

let stream = TcpStream::new(mio)?;
Ok((stream, addr))
match ev.with_io(|| self.io.inner().accept()) {
Ok((mio, addr)) => {
let stream = TcpStream::new(mio)?;
break Ok((stream, addr));
}
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue,
Err(e) => break Err(e),
}
}
}

/// Polls to accept a new incoming connection to this listener.
Expand All @@ -181,15 +184,15 @@ impl TcpListener {
/// single task. Failing to do this could result in tasks hanging.
pub fn poll_accept(&self, cx: &mut Context<'_>) -> Poll<io::Result<(TcpStream, SocketAddr)>> {
loop {
let ev = ready!(self.io.poll_read_ready(cx))?;
let mut ev = ready!(self.io.poll_read_ready(cx))?;

match self.io.get_ref().accept() {
match self.io.inner().accept() {
Ok((io, addr)) => {
let io = TcpStream::new(io)?;
return Poll::Ready(Ok((io, addr)));
}
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
self.io.clear_readiness(ev);
ev.clear_ready();
}
Err(e) => return Poll::Ready(Err(e)),
}
Expand Down Expand Up @@ -232,12 +235,12 @@ impl TcpListener {
/// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
pub fn from_std(listener: net::TcpListener) -> io::Result<TcpListener> {
let io = mio::net::TcpListener::from_std(listener);
let io = PollEvented::new(io)?;
let io = AsyncFd::new(io)?;
Ok(TcpListener { io })
}

pub(crate) fn new(listener: mio::net::TcpListener) -> io::Result<TcpListener> {
let io = PollEvented::new(listener)?;
let io = AsyncFd::new(listener)?;
Ok(TcpListener { io })
}

Expand Down Expand Up @@ -265,7 +268,7 @@ impl TcpListener {
/// }
/// ```
pub fn local_addr(&self) -> io::Result<SocketAddr> {
self.io.get_ref().local_addr()
self.io.inner().local_addr()
}

/// Gets the value of the `IP_TTL` option for this socket.
Expand All @@ -292,7 +295,7 @@ impl TcpListener {
/// }
/// ```
pub fn ttl(&self) -> io::Result<u32> {
self.io.get_ref().ttl()
self.io.inner().ttl()
}

/// Sets the value for the `IP_TTL` option on this socket.
Expand All @@ -317,7 +320,7 @@ impl TcpListener {
/// }
/// ```
pub fn set_ttl(&self, ttl: u32) -> io::Result<()> {
self.io.get_ref().set_ttl(ttl)
self.io.inner().set_ttl(ttl)
}
}

Expand Down Expand Up @@ -345,7 +348,7 @@ impl TryFrom<net::TcpListener> for TcpListener {

impl fmt::Debug for TcpListener {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
self.io.get_ref().fmt(f)
self.io.inner().fmt(f)
}
}

Expand All @@ -356,7 +359,7 @@ mod sys {

impl AsRawFd for TcpListener {
fn as_raw_fd(&self) -> RawFd {
self.io.get_ref().as_raw_fd()
self.io.inner().as_raw_fd()
}
}
}
Expand All @@ -368,7 +371,7 @@ mod sys {

impl AsRawSocket for TcpListener {
fn as_raw_socket(&self) -> RawSocket {
self.io.get_ref().as_raw_socket()
self.io.inner().as_raw_socket()
}
}
}
2 changes: 2 additions & 0 deletions tokio/tests/io_driver_drop.rs
Expand Up @@ -12,6 +12,7 @@ fn tcp_doesnt_block() {
let listener = {
let _enter = rt.enter();
let listener = std::net::TcpListener::bind("127.0.0.1:0").unwrap();
listener.set_nonblocking(true).unwrap();
TcpListener::from_std(listener).unwrap()
};

Expand All @@ -31,6 +32,7 @@ fn drop_wakes() {
let listener = {
let _enter = rt.enter();
let listener = std::net::TcpListener::bind("127.0.0.1:0").unwrap();
listener.set_nonblocking(true).unwrap();
TcpListener::from_std(listener).unwrap()
};

Expand Down

0 comments on commit 68b353a

Please sign in to comment.