Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

net: switch socket methods to &self #2934

Merged
merged 3 commits into from Oct 9, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion tokio/src/net/tcp/split.rs
Expand Up @@ -81,7 +81,7 @@ impl ReadHalf<'_> {
///
/// [`TcpStream::poll_peek`]: TcpStream::poll_peek
pub fn poll_peek(&mut self, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<io::Result<usize>> {
self.0.poll_peek2(cx, buf)
self.0.poll_peek(cx, buf)
}

/// Receives data on the socket from the remote address to which it is
Expand Down
2 changes: 1 addition & 1 deletion tokio/src/net/tcp/split_owned.rs
Expand Up @@ -136,7 +136,7 @@ impl OwnedReadHalf {
///
/// [`TcpStream::poll_peek`]: TcpStream::poll_peek
pub fn poll_peek(&mut self, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<io::Result<usize>> {
self.inner.poll_peek2(cx, buf)
self.inner.poll_peek(cx, buf)
}

/// Receives data on the socket from the remote address to which it is
Expand Down
18 changes: 6 additions & 12 deletions tokio/src/net/tcp/stream.rs
Expand Up @@ -257,7 +257,7 @@ impl TcpStream {
///
/// #[tokio::main]
/// async fn main() -> io::Result<()> {
/// let mut stream = TcpStream::connect("127.0.0.1:8000").await?;
/// let stream = TcpStream::connect("127.0.0.1:8000").await?;
/// let mut buf = [0; 10];
///
/// poll_fn(|cx| {
Expand All @@ -267,15 +267,7 @@ impl TcpStream {
/// Ok(())
/// }
/// ```
pub fn poll_peek(&mut self, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<io::Result<usize>> {
self.poll_peek2(cx, buf)
}

pub(super) fn poll_peek2(
&self,
cx: &mut Context<'_>,
buf: &mut [u8],
) -> Poll<io::Result<usize>> {
pub fn poll_peek(&self, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<io::Result<usize>> {
loop {
let ev = ready!(self.io.poll_read_ready(cx))?;

Expand Down Expand Up @@ -326,8 +318,10 @@ impl TcpStream {
///
/// [`read`]: fn@crate::io::AsyncReadExt::read
/// [`AsyncReadExt`]: trait@crate::io::AsyncReadExt
pub async fn peek(&mut self, buf: &mut [u8]) -> io::Result<usize> {
poll_fn(|cx| self.poll_peek(cx, buf)).await
pub async fn peek(&self, buf: &mut [u8]) -> io::Result<usize> {
self.io
.async_io(mio::Interest::READABLE, |io| io.peek(buf))
.await
}

/// Shuts down the read, write, or both halves of this connection.
Expand Down
20 changes: 10 additions & 10 deletions tokio/src/net/unix/datagram/socket.rs
Expand Up @@ -314,7 +314,7 @@ impl UnixDatagram {
/// let bytes = b"bytes";
/// // We use a socket pair so that they are assigned
/// // each other as a peer.
/// let (mut first, mut second) = UnixDatagram::pair()?;
/// let (first, second) = UnixDatagram::pair()?;
///
/// let size = first.try_send(bytes)?;
/// assert_eq!(size, bytes.len());
Expand All @@ -327,7 +327,7 @@ impl UnixDatagram {
/// # Ok(())
/// # }
/// ```
pub fn try_send(&mut self, buf: &[u8]) -> io::Result<usize> {
pub fn try_send(&self, buf: &[u8]) -> io::Result<usize> {
self.io.get_ref().send(buf)
}

Expand All @@ -346,10 +346,10 @@ impl UnixDatagram {
/// let tmp = tempdir().unwrap();
///
/// let server_path = tmp.path().join("server");
/// let mut server = UnixDatagram::bind(&server_path)?;
/// let server = UnixDatagram::bind(&server_path)?;
///
/// let client_path = tmp.path().join("client");
/// let mut client = UnixDatagram::bind(&client_path)?;
/// let client = UnixDatagram::bind(&client_path)?;
///
/// let size = client.try_send_to(bytes, &server_path)?;
/// assert_eq!(size, bytes.len());
Expand All @@ -363,7 +363,7 @@ impl UnixDatagram {
/// # Ok(())
/// # }
/// ```
pub fn try_send_to<P>(&mut self, buf: &[u8], target: P) -> io::Result<usize>
pub fn try_send_to<P>(&self, buf: &[u8], target: P) -> io::Result<usize>
where
P: AsRef<Path>,
{
Expand Down Expand Up @@ -413,7 +413,7 @@ impl UnixDatagram {
/// let bytes = b"bytes";
/// // We use a socket pair so that they are assigned
/// // each other as a peer.
/// let (mut first, mut second) = UnixDatagram::pair()?;
/// let (first, second) = UnixDatagram::pair()?;
///
/// let size = first.try_send(bytes)?;
/// assert_eq!(size, bytes.len());
Expand All @@ -426,7 +426,7 @@ impl UnixDatagram {
/// # Ok(())
/// # }
/// ```
pub fn try_recv(&mut self, buf: &mut [u8]) -> io::Result<usize> {
pub fn try_recv(&self, buf: &mut [u8]) -> io::Result<usize> {
self.io.get_ref().recv(buf)
}

Expand Down Expand Up @@ -531,10 +531,10 @@ impl UnixDatagram {
/// let tmp = tempdir().unwrap();
///
/// let server_path = tmp.path().join("server");
/// let mut server = UnixDatagram::bind(&server_path)?;
/// let server = UnixDatagram::bind(&server_path)?;
///
/// let client_path = tmp.path().join("client");
/// let mut client = UnixDatagram::bind(&client_path)?;
/// let client = UnixDatagram::bind(&client_path)?;
///
/// let size = client.try_send_to(bytes, &server_path)?;
/// assert_eq!(size, bytes.len());
Expand All @@ -548,7 +548,7 @@ impl UnixDatagram {
/// # Ok(())
/// # }
/// ```
pub fn try_recv_from(&mut self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> {
pub fn try_recv_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> {
let (n, addr) = self.io.get_ref().recv_from(buf)?;
Ok((n, SocketAddr(addr)))
}
Expand Down
42 changes: 0 additions & 42 deletions tokio/src/net/unix/incoming.rs

This file was deleted.

71 changes: 17 additions & 54 deletions tokio/src/net/unix/listener.rs
@@ -1,6 +1,5 @@
use crate::future::poll_fn;
use crate::io::PollEvented;
use crate::net::unix::{Incoming, SocketAddr, UnixStream};
use crate::net::unix::{SocketAddr, UnixStream};

use std::convert::TryFrom;
use std::fmt;
Expand Down Expand Up @@ -99,18 +98,26 @@ impl UnixListener {
}

/// Accepts a new incoming connection to this listener.
pub async fn accept(&mut self) -> io::Result<(UnixStream, SocketAddr)> {
poll_fn(|cx| self.poll_accept(cx)).await
pub async fn accept(&self) -> io::Result<(UnixStream, SocketAddr)> {
let (mio, addr) = self
.io
.async_io(mio::Interest::READABLE, |sock| sock.accept())
.await?;

let addr = SocketAddr(addr);
let stream = UnixStream::new(mio)?;
Ok((stream, addr))
}

/// Polls to accept a new incoming connection to this listener.
///
/// If there is no connection to accept, `Poll::Pending` is returned and
/// the current task will be notified by a waker.
pub fn poll_accept(
&mut self,
cx: &mut Context<'_>,
) -> Poll<io::Result<(UnixStream, SocketAddr)>> {
/// the current task will be notified by a waker.
///
/// When ready, the most recent task that called `poll_accept` is notified.
/// The caller is responsble to ensure that `poll_accept` is called from a
/// single task. Failing to do this could result in tasks hanging.
pub fn poll_accept(&self, cx: &mut Context<'_>) -> Poll<io::Result<(UnixStream, SocketAddr)>> {
loop {
let ev = ready!(self.io.poll_read_ready(cx))?;

Expand All @@ -127,57 +134,13 @@ impl UnixListener {
}
}
}

/// Returns a stream over the connections being received on this listener.
///
/// Note that `UnixListener` also directly implements `Stream`.
///
/// The returned stream will never return `None` and will also not yield the
/// peer's `SocketAddr` structure. Iterating over it is equivalent to
/// calling accept in a loop.
///
/// # Errors
///
/// Note that accepting a connection can lead to various errors and not all
/// of them are necessarily fatal ‒ for example having too many open file
/// descriptors or the other side closing the connection while it waits in
/// an accept queue. These would terminate the stream if not handled in any
/// way.
///
/// # Examples
///
/// ```no_run
/// use tokio::net::UnixListener;
/// use tokio::stream::StreamExt;
///
/// #[tokio::main]
/// async fn main() {
/// let mut listener = UnixListener::bind("/path/to/the/socket").unwrap();
/// let mut incoming = listener.incoming();
///
/// while let Some(stream) = incoming.next().await {
/// match stream {
/// Ok(stream) => {
/// println!("new client!");
/// }
/// Err(e) => { /* connection failed */ }
/// }
/// }
/// }
/// ```
pub fn incoming(&mut self) -> Incoming<'_> {
Incoming::new(self)
}
}

#[cfg(feature = "stream")]
impl crate::stream::Stream for UnixListener {
type Item = io::Result<UnixStream>;

fn poll_next(
mut self: std::pin::Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
fn poll_next(self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let (socket, _) = ready!(self.poll_accept(cx))?;
Poll::Ready(Some(Ok(socket)))
}
Expand Down
4 changes: 0 additions & 4 deletions tokio/src/net/unix/mod.rs
Expand Up @@ -2,11 +2,7 @@

pub mod datagram;

mod incoming;
pub use incoming::Incoming;

pub(crate) mod listener;
pub(crate) use listener::UnixListener;

mod split;
pub use split::{ReadHalf, WriteHalf};
Expand Down
2 changes: 1 addition & 1 deletion tokio/tests/uds_datagram.rs
Expand Up @@ -78,7 +78,7 @@ async fn try_send_recv_never_block() -> io::Result<()> {
let payload = b"PAYLOAD";
let mut count = 0;

let (mut dgram1, mut dgram2) = UnixDatagram::pair()?;
let (dgram1, dgram2) = UnixDatagram::pair()?;

// Send until we hit the OS `net.unix.max_dgram_qlen`.
loop {
Expand Down
4 changes: 2 additions & 2 deletions tokio/tests/uds_stream.rs
Expand Up @@ -15,7 +15,7 @@ async fn accept_read_write() -> std::io::Result<()> {
.unwrap();
let sock_path = dir.path().join("connect.sock");

let mut listener = UnixListener::bind(&sock_path)?;
let listener = UnixListener::bind(&sock_path)?;

let accept = listener.accept();
let connect = UnixStream::connect(&sock_path);
Expand All @@ -42,7 +42,7 @@ async fn shutdown() -> std::io::Result<()> {
.unwrap();
let sock_path = dir.path().join("connect.sock");

let mut listener = UnixListener::bind(&sock_path)?;
let listener = UnixListener::bind(&sock_path)?;

let accept = listener.accept();
let connect = UnixStream::connect(&sock_path);
Expand Down