Skip to content

Commit

Permalink
net: switch socket methods to &self (#2934)
Browse files Browse the repository at this point in the history
Switches various socket methods from &mut self to &self. This uses the intrusive
waker infrastructure to handle multiple waiters.

Refs: #2928
  • Loading branch information
carllerche committed Oct 9, 2020
1 parent 41ac1ae commit ee59734
Show file tree
Hide file tree
Showing 9 changed files with 38 additions and 127 deletions.
2 changes: 1 addition & 1 deletion tokio/src/net/tcp/split.rs
Expand Up @@ -81,7 +81,7 @@ impl ReadHalf<'_> {
///
/// [`TcpStream::poll_peek`]: TcpStream::poll_peek
pub fn poll_peek(&mut self, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<io::Result<usize>> {
self.0.poll_peek2(cx, buf)
self.0.poll_peek(cx, buf)
}

/// Receives data on the socket from the remote address to which it is
Expand Down
2 changes: 1 addition & 1 deletion tokio/src/net/tcp/split_owned.rs
Expand Up @@ -136,7 +136,7 @@ impl OwnedReadHalf {
///
/// [`TcpStream::poll_peek`]: TcpStream::poll_peek
pub fn poll_peek(&mut self, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<io::Result<usize>> {
self.inner.poll_peek2(cx, buf)
self.inner.poll_peek(cx, buf)
}

/// Receives data on the socket from the remote address to which it is
Expand Down
18 changes: 6 additions & 12 deletions tokio/src/net/tcp/stream.rs
Expand Up @@ -257,7 +257,7 @@ impl TcpStream {
///
/// #[tokio::main]
/// async fn main() -> io::Result<()> {
/// let mut stream = TcpStream::connect("127.0.0.1:8000").await?;
/// let stream = TcpStream::connect("127.0.0.1:8000").await?;
/// let mut buf = [0; 10];
///
/// poll_fn(|cx| {
Expand All @@ -267,15 +267,7 @@ impl TcpStream {
/// Ok(())
/// }
/// ```
pub fn poll_peek(&mut self, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<io::Result<usize>> {
self.poll_peek2(cx, buf)
}

pub(super) fn poll_peek2(
&self,
cx: &mut Context<'_>,
buf: &mut [u8],
) -> Poll<io::Result<usize>> {
pub fn poll_peek(&self, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<io::Result<usize>> {
loop {
let ev = ready!(self.io.poll_read_ready(cx))?;

Expand Down Expand Up @@ -326,8 +318,10 @@ impl TcpStream {
///
/// [`read`]: fn@crate::io::AsyncReadExt::read
/// [`AsyncReadExt`]: trait@crate::io::AsyncReadExt
pub async fn peek(&mut self, buf: &mut [u8]) -> io::Result<usize> {
poll_fn(|cx| self.poll_peek(cx, buf)).await
pub async fn peek(&self, buf: &mut [u8]) -> io::Result<usize> {
self.io
.async_io(mio::Interest::READABLE, |io| io.peek(buf))
.await
}

/// Shuts down the read, write, or both halves of this connection.
Expand Down
20 changes: 10 additions & 10 deletions tokio/src/net/unix/datagram/socket.rs
Expand Up @@ -314,7 +314,7 @@ impl UnixDatagram {
/// let bytes = b"bytes";
/// // We use a socket pair so that they are assigned
/// // each other as a peer.
/// let (mut first, mut second) = UnixDatagram::pair()?;
/// let (first, second) = UnixDatagram::pair()?;
///
/// let size = first.try_send(bytes)?;
/// assert_eq!(size, bytes.len());
Expand All @@ -327,7 +327,7 @@ impl UnixDatagram {
/// # Ok(())
/// # }
/// ```
pub fn try_send(&mut self, buf: &[u8]) -> io::Result<usize> {
pub fn try_send(&self, buf: &[u8]) -> io::Result<usize> {
self.io.get_ref().send(buf)
}

Expand All @@ -346,10 +346,10 @@ impl UnixDatagram {
/// let tmp = tempdir().unwrap();
///
/// let server_path = tmp.path().join("server");
/// let mut server = UnixDatagram::bind(&server_path)?;
/// let server = UnixDatagram::bind(&server_path)?;
///
/// let client_path = tmp.path().join("client");
/// let mut client = UnixDatagram::bind(&client_path)?;
/// let client = UnixDatagram::bind(&client_path)?;
///
/// let size = client.try_send_to(bytes, &server_path)?;
/// assert_eq!(size, bytes.len());
Expand All @@ -363,7 +363,7 @@ impl UnixDatagram {
/// # Ok(())
/// # }
/// ```
pub fn try_send_to<P>(&mut self, buf: &[u8], target: P) -> io::Result<usize>
pub fn try_send_to<P>(&self, buf: &[u8], target: P) -> io::Result<usize>
where
P: AsRef<Path>,
{
Expand Down Expand Up @@ -413,7 +413,7 @@ impl UnixDatagram {
/// let bytes = b"bytes";
/// // We use a socket pair so that they are assigned
/// // each other as a peer.
/// let (mut first, mut second) = UnixDatagram::pair()?;
/// let (first, second) = UnixDatagram::pair()?;
///
/// let size = first.try_send(bytes)?;
/// assert_eq!(size, bytes.len());
Expand All @@ -426,7 +426,7 @@ impl UnixDatagram {
/// # Ok(())
/// # }
/// ```
pub fn try_recv(&mut self, buf: &mut [u8]) -> io::Result<usize> {
pub fn try_recv(&self, buf: &mut [u8]) -> io::Result<usize> {
self.io.get_ref().recv(buf)
}

Expand Down Expand Up @@ -531,10 +531,10 @@ impl UnixDatagram {
/// let tmp = tempdir().unwrap();
///
/// let server_path = tmp.path().join("server");
/// let mut server = UnixDatagram::bind(&server_path)?;
/// let server = UnixDatagram::bind(&server_path)?;
///
/// let client_path = tmp.path().join("client");
/// let mut client = UnixDatagram::bind(&client_path)?;
/// let client = UnixDatagram::bind(&client_path)?;
///
/// let size = client.try_send_to(bytes, &server_path)?;
/// assert_eq!(size, bytes.len());
Expand All @@ -548,7 +548,7 @@ impl UnixDatagram {
/// # Ok(())
/// # }
/// ```
pub fn try_recv_from(&mut self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> {
pub fn try_recv_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> {
let (n, addr) = self.io.get_ref().recv_from(buf)?;
Ok((n, SocketAddr(addr)))
}
Expand Down
42 changes: 0 additions & 42 deletions tokio/src/net/unix/incoming.rs

This file was deleted.

71 changes: 17 additions & 54 deletions tokio/src/net/unix/listener.rs
@@ -1,6 +1,5 @@
use crate::future::poll_fn;
use crate::io::PollEvented;
use crate::net::unix::{Incoming, SocketAddr, UnixStream};
use crate::net::unix::{SocketAddr, UnixStream};

use std::convert::TryFrom;
use std::fmt;
Expand Down Expand Up @@ -99,18 +98,26 @@ impl UnixListener {
}

/// Accepts a new incoming connection to this listener.
pub async fn accept(&mut self) -> io::Result<(UnixStream, SocketAddr)> {
poll_fn(|cx| self.poll_accept(cx)).await
pub async fn accept(&self) -> io::Result<(UnixStream, SocketAddr)> {
let (mio, addr) = self
.io
.async_io(mio::Interest::READABLE, |sock| sock.accept())
.await?;

let addr = SocketAddr(addr);
let stream = UnixStream::new(mio)?;
Ok((stream, addr))
}

/// Polls to accept a new incoming connection to this listener.
///
/// If there is no connection to accept, `Poll::Pending` is returned and
/// the current task will be notified by a waker.
pub fn poll_accept(
&mut self,
cx: &mut Context<'_>,
) -> Poll<io::Result<(UnixStream, SocketAddr)>> {
/// the current task will be notified by a waker.
///
/// When ready, the most recent task that called `poll_accept` is notified.
/// The caller is responsble to ensure that `poll_accept` is called from a
/// single task. Failing to do this could result in tasks hanging.
pub fn poll_accept(&self, cx: &mut Context<'_>) -> Poll<io::Result<(UnixStream, SocketAddr)>> {
loop {
let ev = ready!(self.io.poll_read_ready(cx))?;

Expand All @@ -127,57 +134,13 @@ impl UnixListener {
}
}
}

/// Returns a stream over the connections being received on this listener.
///
/// Note that `UnixListener` also directly implements `Stream`.
///
/// The returned stream will never return `None` and will also not yield the
/// peer's `SocketAddr` structure. Iterating over it is equivalent to
/// calling accept in a loop.
///
/// # Errors
///
/// Note that accepting a connection can lead to various errors and not all
/// of them are necessarily fatal ‒ for example having too many open file
/// descriptors or the other side closing the connection while it waits in
/// an accept queue. These would terminate the stream if not handled in any
/// way.
///
/// # Examples
///
/// ```no_run
/// use tokio::net::UnixListener;
/// use tokio::stream::StreamExt;
///
/// #[tokio::main]
/// async fn main() {
/// let mut listener = UnixListener::bind("/path/to/the/socket").unwrap();
/// let mut incoming = listener.incoming();
///
/// while let Some(stream) = incoming.next().await {
/// match stream {
/// Ok(stream) => {
/// println!("new client!");
/// }
/// Err(e) => { /* connection failed */ }
/// }
/// }
/// }
/// ```
pub fn incoming(&mut self) -> Incoming<'_> {
Incoming::new(self)
}
}

#[cfg(feature = "stream")]
impl crate::stream::Stream for UnixListener {
type Item = io::Result<UnixStream>;

fn poll_next(
mut self: std::pin::Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
fn poll_next(self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let (socket, _) = ready!(self.poll_accept(cx))?;
Poll::Ready(Some(Ok(socket)))
}
Expand Down
4 changes: 0 additions & 4 deletions tokio/src/net/unix/mod.rs
Expand Up @@ -2,11 +2,7 @@

pub mod datagram;

mod incoming;
pub use incoming::Incoming;

pub(crate) mod listener;
pub(crate) use listener::UnixListener;

mod split;
pub use split::{ReadHalf, WriteHalf};
Expand Down
2 changes: 1 addition & 1 deletion tokio/tests/uds_datagram.rs
Expand Up @@ -78,7 +78,7 @@ async fn try_send_recv_never_block() -> io::Result<()> {
let payload = b"PAYLOAD";
let mut count = 0;

let (mut dgram1, mut dgram2) = UnixDatagram::pair()?;
let (dgram1, dgram2) = UnixDatagram::pair()?;

// Send until we hit the OS `net.unix.max_dgram_qlen`.
loop {
Expand Down
4 changes: 2 additions & 2 deletions tokio/tests/uds_stream.rs
Expand Up @@ -15,7 +15,7 @@ async fn accept_read_write() -> std::io::Result<()> {
.unwrap();
let sock_path = dir.path().join("connect.sock");

let mut listener = UnixListener::bind(&sock_path)?;
let listener = UnixListener::bind(&sock_path)?;

let accept = listener.accept();
let connect = UnixStream::connect(&sock_path);
Expand All @@ -42,7 +42,7 @@ async fn shutdown() -> std::io::Result<()> {
.unwrap();
let sock_path = dir.path().join("connect.sock");

let mut listener = UnixListener::bind(&sock_path)?;
let listener = UnixListener::bind(&sock_path)?;

let accept = listener.accept();
let connect = UnixStream::connect(&sock_path);
Expand Down

0 comments on commit ee59734

Please sign in to comment.