Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

net: use &self with TcpListener::accept #2919

Merged
merged 9 commits into from Oct 8, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion examples/chat.rs
Expand Up @@ -77,7 +77,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
// Bind a TCP listener to the socket address.
//
// Note that this is the Tokio TcpListener, which is fully async.
let mut listener = TcpListener::bind(&addr).await?;
let listener = TcpListener::bind(&addr).await?;

tracing::info!("server running on {}", addr);

Expand Down
2 changes: 1 addition & 1 deletion examples/echo.rs
Expand Up @@ -39,7 +39,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
// Next up we create a TCP listener which will listen for incoming
// connections. This TCP listener is bound to the address we determined
// above and must be associated with an event loop.
let mut listener = TcpListener::bind(&addr).await?;
let listener = TcpListener::bind(&addr).await?;
println!("Listening on: {}", addr);

loop {
Expand Down
2 changes: 1 addition & 1 deletion examples/print_each_packet.rs
Expand Up @@ -74,7 +74,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
// above and must be associated with an event loop, so we pass in a handle
// to our event loop. After the socket's created we inform that we're ready
// to go and start accepting connections.
let mut listener = TcpListener::bind(&addr).await?;
let listener = TcpListener::bind(&addr).await?;
println!("Listening on: {}", addr);

loop {
Expand Down
2 changes: 1 addition & 1 deletion examples/proxy.rs
Expand Up @@ -43,7 +43,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
println!("Listening on: {}", listen_addr);
println!("Proxying to: {}", server_addr);

let mut listener = TcpListener::bind(listen_addr).await?;
let listener = TcpListener::bind(listen_addr).await?;

while let Ok((inbound, _)) = listener.accept().await {
let transfer = transfer(inbound, server_addr.clone()).map(|r| {
Expand Down
2 changes: 1 addition & 1 deletion examples/tinydb.rs
Expand Up @@ -89,7 +89,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
.nth(1)
.unwrap_or_else(|| "127.0.0.1:8080".to_string());

let mut listener = TcpListener::bind(&addr).await?;
let listener = TcpListener::bind(&addr).await?;
println!("Listening on: {}", addr);

// Create the shared state of this server that will be shared amongst all
Expand Down
8 changes: 3 additions & 5 deletions examples/tinyhttp.rs
Expand Up @@ -30,19 +30,17 @@ async fn main() -> Result<(), Box<dyn Error>> {
let addr = env::args()
.nth(1)
.unwrap_or_else(|| "127.0.0.1:8080".to_string());
let mut server = TcpListener::bind(&addr).await?;
let mut incoming = server.incoming();
let server = TcpListener::bind(&addr).await?;
println!("Listening on: {}", addr);

while let Some(Ok(stream)) = incoming.next().await {
loop {
let (stream, _) = server.accept().await?;
tokio::spawn(async move {
if let Err(e) = process(stream).await {
println!("failed to process connection; error = {}", e);
}
});
}

Ok(())
}

async fn process(stream: TcpStream) -> Result<(), Box<dyn Error>> {
Expand Down
67 changes: 56 additions & 11 deletions tokio/src/io/driver/scheduled_io.rs
Expand Up @@ -32,7 +32,7 @@ cfg_io_readiness! {

#[derive(Debug, Default)]
struct Waiters {
#[cfg(any(feature = "udp", feature = "uds"))]
#[cfg(any(feature = "tcp", feature = "udp", feature = "uds"))]
/// List of all current waiters
list: WaitList,

Expand Down Expand Up @@ -186,33 +186,78 @@ impl ScheduledIo {
}
}

/// Notifies all pending waiters that have registered interest in `ready`.
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The changes here solve a potential deadlock during shutdown. Waking a task must be done from outside of the lock or it could result in a deadlock due to an attempt to re-enter the lock.

///
/// There may be many waiters to notify. Waking the pending task **must** be
/// done from outside of the lock otherwise there is a potential for a
/// deadlock.
///
/// A stack array of wakers is created and filled with wakers to notify, the
/// lock is released, and the wakers are notified. Because there may be more
/// than 32 wakers to notify, if the stack array fills up, the lock is
/// released, the array is cleared, and the iteration continues.
pub(super) fn wake(&self, ready: Ready) {
const NUM_WAKERS: usize = 32;

let mut wakers: [Option<Waker>; NUM_WAKERS] = Default::default();
let mut curr = 0;

let mut waiters = self.waiters.lock();

// check for AsyncRead slot
if ready.is_readable() {
if let Some(waker) = waiters.reader.take() {
waker.wake();
wakers[curr] = Some(waker);
curr += 1;
}
}

// check for AsyncWrite slot
if ready.is_writable() {
if let Some(waker) = waiters.writer.take() {
waker.wake();
wakers[curr] = Some(waker);
curr += 1;
}
}

#[cfg(any(feature = "udp", feature = "uds"))]
{
// check list of waiters
for waiter in waiters.list.drain_filter(|w| ready.satisfies(w.interest)) {
let waiter = unsafe { &mut *waiter.as_ptr() };
if let Some(waker) = waiter.waker.take() {
waiter.is_ready = true;
waker.wake();
#[cfg(any(feature = "tcp", feature = "udp", feature = "uds"))]
'outer: loop {
let mut iter = waiters.list.drain_filter(|w| ready.satisfies(w.interest));

while curr < NUM_WAKERS {
match iter.next() {
Some(waiter) => {
let waiter = unsafe { &mut *waiter.as_ptr() };

if let Some(waker) = waiter.waker.take() {
waiter.is_ready = true;
wakers[curr] = Some(waker);
curr += 1;
}
}
None => {
break 'outer;
}
}
}

drop(waiters);

for waker in wakers.iter_mut().take(curr) {
waker.take().unwrap().wake();
}

curr = 0;

// Acquire the lock again.
waiters = self.waiters.lock();
}

// Release the lock before notifying
drop(waiters);

for waker in wakers.iter_mut().take(curr) {
waker.take().unwrap().wake();
}
}

Expand Down
15 changes: 13 additions & 2 deletions tokio/src/io/registration.rs
Expand Up @@ -132,8 +132,19 @@ impl Registration {
cfg_io_readiness! {
impl Registration {
pub(super) async fn readiness(&self, interest: mio::Interest) -> io::Result<ReadyEvent> {
// TODO: does this need to return a `Result`?
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It turns out that it does need to return a result :)

This implementation is not great but I believe the shutdown logic needs to be revisited post 0.3.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tracked my thoughts in #2924

Ok(self.shared.readiness(interest).await)
use std::future::Future;
use std::pin::Pin;

let fut = self.shared.readiness(interest);
pin!(fut);

crate::future::poll_fn(|cx| {
if self.handle.inner().is_none() {
return Poll::Ready(Err(io::Error::new(io::ErrorKind::Other, "reactor gone")));
}

Pin::new(&mut fut).poll(cx).map(Ok)
}).await
}
}
}
2 changes: 1 addition & 1 deletion tokio/src/lib.rs
Expand Up @@ -306,7 +306,7 @@
//!
//! #[tokio::main]
//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
//! let mut listener = TcpListener::bind("127.0.0.1:8080").await?;
//! let listener = TcpListener::bind("127.0.0.1:8080").await?;
//!
//! loop {
//! let (mut socket, _) = listener.accept().await?;
Expand Down
2 changes: 1 addition & 1 deletion tokio/src/macros/cfg.rs
Expand Up @@ -176,7 +176,7 @@ macro_rules! cfg_not_io_driver {
macro_rules! cfg_io_readiness {
($($item:item)*) => {
$(
#[cfg(any(feature = "udp", feature = "uds"))]
#[cfg(any(feature = "udp", feature = "uds", feature = "tcp"))]
$item
)*
}
Expand Down
42 changes: 0 additions & 42 deletions tokio/src/net/tcp/incoming.rs

This file was deleted.

75 changes: 19 additions & 56 deletions tokio/src/net/tcp/listener.rs
@@ -1,6 +1,5 @@
use crate::future::poll_fn;
use crate::io::PollEvented;
use crate::net::tcp::{Incoming, TcpStream};
use crate::net::tcp::TcpStream;
use crate::net::{to_socket_addrs, ToSocketAddrs};

use std::convert::TryFrom;
Expand Down Expand Up @@ -40,7 +39,7 @@ cfg_tcp! {
///
/// #[tokio::main]
/// async fn main() -> io::Result<()> {
/// let mut listener = TcpListener::bind("127.0.0.1:8080").await?;
/// let listener = TcpListener::bind("127.0.0.1:8080").await?;
///
/// loop {
/// let (socket, _) = listener.accept().await?;
Expand Down Expand Up @@ -171,7 +170,7 @@ impl TcpListener {
///
/// #[tokio::main]
/// async fn main() -> io::Result<()> {
/// let mut listener = TcpListener::bind("127.0.0.1:8080").await?;
/// let listener = TcpListener::bind("127.0.0.1:8080").await?;
///
/// match listener.accept().await {
/// Ok((_socket, addr)) => println!("new client: {:?}", addr),
Expand All @@ -181,18 +180,25 @@ impl TcpListener {
/// Ok(())
/// }
/// ```
pub async fn accept(&mut self) -> io::Result<(TcpStream, SocketAddr)> {
poll_fn(|cx| self.poll_accept(cx)).await
pub async fn accept(&self) -> io::Result<(TcpStream, SocketAddr)> {
let (mio, addr) = self
.io
.async_io(mio::Interest::READABLE, |sock| sock.accept())
.await?;

let stream = TcpStream::new(mio)?;
Ok((stream, addr))
}

/// Polls to accept a new incoming connection to this listener.
///
/// If there is no connection to accept, `Poll::Pending` is returned and
/// the current task will be notified by a waker.
pub fn poll_accept(
&mut self,
cx: &mut Context<'_>,
) -> Poll<io::Result<(TcpStream, SocketAddr)>> {
/// If there is no connection to accept, `Poll::Pending` is returned and the
/// current task will be notified by a waker.
///
/// When ready, the most recent task that called `poll_accept` is notified.
/// The caller is responsble to ensure that `poll_accept` is called from a
/// single task. Failing to do this could result in tasks hanging.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should eventually include language that poll_accept should only be used for 1 task.

pub fn poll_accept(&self, cx: &mut Context<'_>) -> Poll<io::Result<(TcpStream, SocketAddr)>> {
loop {
let ev = ready!(self.io.poll_read_ready(cx))?;

Expand Down Expand Up @@ -293,46 +299,6 @@ impl TcpListener {
self.io.get_ref().local_addr()
}

/// Returns a stream over the connections being received on this listener.
///
/// Note that `TcpListener` also directly implements `Stream`.
///
/// The returned stream will never return `None` and will also not yield the
/// peer's `SocketAddr` structure. Iterating over it is equivalent to
/// calling accept in a loop.
///
/// # Errors
///
/// Note that accepting a connection can lead to various errors and not all
/// of them are necessarily fatal ‒ for example having too many open file
/// descriptors or the other side closing the connection while it waits in
/// an accept queue. These would terminate the stream if not handled in any
/// way.
///
/// # Examples
///
/// ```no_run
/// use tokio::{net::TcpListener, stream::StreamExt};
///
/// #[tokio::main]
/// async fn main() {
/// let mut listener = TcpListener::bind("127.0.0.1:8080").await.unwrap();
/// let mut incoming = listener.incoming();
///
/// while let Some(stream) = incoming.next().await {
/// match stream {
/// Ok(stream) => {
/// println!("new client!");
/// }
/// Err(e) => { /* connection failed */ }
/// }
/// }
/// }
/// ```
pub fn incoming(&mut self) -> Incoming<'_> {
Incoming::new(self)
}

/// Gets the value of the `IP_TTL` option for this socket.
///
/// For more information about this option, see [`set_ttl`].
Expand Down Expand Up @@ -390,10 +356,7 @@ impl TcpListener {
impl crate::stream::Stream for TcpListener {
type Item = io::Result<TcpStream>;

fn poll_next(
mut self: std::pin::Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
fn poll_next(self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let (socket, _) = ready!(self.poll_accept(cx))?;
Poll::Ready(Some(Ok(socket)))
}
Expand Down
4 changes: 0 additions & 4 deletions tokio/src/net/tcp/mod.rs
@@ -1,10 +1,6 @@
//! TCP utility types

pub(crate) mod listener;
pub(crate) use listener::TcpListener;

mod incoming;
pub use incoming::Incoming;

pub(crate) mod socket;

Expand Down
4 changes: 2 additions & 2 deletions tokio/src/net/tcp/stream.rs
Expand Up @@ -22,8 +22,8 @@ cfg_tcp! {
/// traits. Examples import these traits through [the prelude].
///
/// [`connect`]: method@TcpStream::connect
/// [accepting]: method@super::TcpListener::accept
/// [listener]: struct@super::TcpListener
/// [accepting]: method@crate::net::TcpListener::accept
/// [listener]: struct@crate::net::TcpListener
/// [`AsyncReadExt`]: trait@crate::io::AsyncReadExt
/// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt
/// [the prelude]: crate::prelude
Expand Down