diff --git a/tokio/src/net/tcp/split.rs b/tokio/src/net/tcp/split.rs index 6e927f05855..9a257f8b8ad 100644 --- a/tokio/src/net/tcp/split.rs +++ b/tokio/src/net/tcp/split.rs @@ -81,7 +81,7 @@ impl ReadHalf<'_> { /// /// [`TcpStream::poll_peek`]: TcpStream::poll_peek pub fn poll_peek(&mut self, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll> { - self.0.poll_peek2(cx, buf) + self.0.poll_peek(cx, buf) } /// Receives data on the socket from the remote address to which it is diff --git a/tokio/src/net/tcp/split_owned.rs b/tokio/src/net/tcp/split_owned.rs index 2f35f495ca2..4b4e26362e1 100644 --- a/tokio/src/net/tcp/split_owned.rs +++ b/tokio/src/net/tcp/split_owned.rs @@ -136,7 +136,7 @@ impl OwnedReadHalf { /// /// [`TcpStream::poll_peek`]: TcpStream::poll_peek pub fn poll_peek(&mut self, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll> { - self.inner.poll_peek2(cx, buf) + self.inner.poll_peek(cx, buf) } /// Receives data on the socket from the remote address to which it is diff --git a/tokio/src/net/tcp/stream.rs b/tokio/src/net/tcp/stream.rs index 3f9d667077d..ee24ee32e98 100644 --- a/tokio/src/net/tcp/stream.rs +++ b/tokio/src/net/tcp/stream.rs @@ -257,7 +257,7 @@ impl TcpStream { /// /// #[tokio::main] /// async fn main() -> io::Result<()> { - /// let mut stream = TcpStream::connect("127.0.0.1:8000").await?; + /// let stream = TcpStream::connect("127.0.0.1:8000").await?; /// let mut buf = [0; 10]; /// /// poll_fn(|cx| { @@ -267,15 +267,7 @@ impl TcpStream { /// Ok(()) /// } /// ``` - pub fn poll_peek(&mut self, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll> { - self.poll_peek2(cx, buf) - } - - pub(super) fn poll_peek2( - &self, - cx: &mut Context<'_>, - buf: &mut [u8], - ) -> Poll> { + pub fn poll_peek(&self, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll> { loop { let ev = ready!(self.io.poll_read_ready(cx))?; @@ -326,8 +318,10 @@ impl TcpStream { /// /// [`read`]: fn@crate::io::AsyncReadExt::read /// [`AsyncReadExt`]: trait@crate::io::AsyncReadExt - pub async fn peek(&mut self, buf: &mut [u8]) -> io::Result { - poll_fn(|cx| self.poll_peek(cx, buf)).await + pub async fn peek(&self, buf: &mut [u8]) -> io::Result { + self.io + .async_io(mio::Interest::READABLE, |io| io.peek(buf)) + .await } /// Shuts down the read, write, or both halves of this connection. diff --git a/tokio/src/net/unix/datagram/socket.rs b/tokio/src/net/unix/datagram/socket.rs index 20c6c2272ef..5dcda6c973c 100644 --- a/tokio/src/net/unix/datagram/socket.rs +++ b/tokio/src/net/unix/datagram/socket.rs @@ -314,7 +314,7 @@ impl UnixDatagram { /// let bytes = b"bytes"; /// // We use a socket pair so that they are assigned /// // each other as a peer. - /// let (mut first, mut second) = UnixDatagram::pair()?; + /// let (first, second) = UnixDatagram::pair()?; /// /// let size = first.try_send(bytes)?; /// assert_eq!(size, bytes.len()); @@ -327,7 +327,7 @@ impl UnixDatagram { /// # Ok(()) /// # } /// ``` - pub fn try_send(&mut self, buf: &[u8]) -> io::Result { + pub fn try_send(&self, buf: &[u8]) -> io::Result { self.io.get_ref().send(buf) } @@ -346,10 +346,10 @@ impl UnixDatagram { /// let tmp = tempdir().unwrap(); /// /// let server_path = tmp.path().join("server"); - /// let mut server = UnixDatagram::bind(&server_path)?; + /// let server = UnixDatagram::bind(&server_path)?; /// /// let client_path = tmp.path().join("client"); - /// let mut client = UnixDatagram::bind(&client_path)?; + /// let client = UnixDatagram::bind(&client_path)?; /// /// let size = client.try_send_to(bytes, &server_path)?; /// assert_eq!(size, bytes.len()); @@ -363,7 +363,7 @@ impl UnixDatagram { /// # Ok(()) /// # } /// ``` - pub fn try_send_to

(&mut self, buf: &[u8], target: P) -> io::Result + pub fn try_send_to

(&self, buf: &[u8], target: P) -> io::Result where P: AsRef, { @@ -413,7 +413,7 @@ impl UnixDatagram { /// let bytes = b"bytes"; /// // We use a socket pair so that they are assigned /// // each other as a peer. - /// let (mut first, mut second) = UnixDatagram::pair()?; + /// let (first, second) = UnixDatagram::pair()?; /// /// let size = first.try_send(bytes)?; /// assert_eq!(size, bytes.len()); @@ -426,7 +426,7 @@ impl UnixDatagram { /// # Ok(()) /// # } /// ``` - pub fn try_recv(&mut self, buf: &mut [u8]) -> io::Result { + pub fn try_recv(&self, buf: &mut [u8]) -> io::Result { self.io.get_ref().recv(buf) } @@ -531,10 +531,10 @@ impl UnixDatagram { /// let tmp = tempdir().unwrap(); /// /// let server_path = tmp.path().join("server"); - /// let mut server = UnixDatagram::bind(&server_path)?; + /// let server = UnixDatagram::bind(&server_path)?; /// /// let client_path = tmp.path().join("client"); - /// let mut client = UnixDatagram::bind(&client_path)?; + /// let client = UnixDatagram::bind(&client_path)?; /// /// let size = client.try_send_to(bytes, &server_path)?; /// assert_eq!(size, bytes.len()); @@ -548,7 +548,7 @@ impl UnixDatagram { /// # Ok(()) /// # } /// ``` - pub fn try_recv_from(&mut self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> { + pub fn try_recv_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> { let (n, addr) = self.io.get_ref().recv_from(buf)?; Ok((n, SocketAddr(addr))) } diff --git a/tokio/src/net/unix/incoming.rs b/tokio/src/net/unix/incoming.rs deleted file mode 100644 index af493604353..00000000000 --- a/tokio/src/net/unix/incoming.rs +++ /dev/null @@ -1,42 +0,0 @@ -use crate::net::unix::{UnixListener, UnixStream}; - -use std::io; -use std::pin::Pin; -use std::task::{Context, Poll}; - -/// Stream of listeners -#[derive(Debug)] -#[must_use = "streams do nothing unless polled"] -pub struct Incoming<'a> { - inner: &'a mut UnixListener, -} - -impl Incoming<'_> { - pub(crate) fn new(listener: &mut UnixListener) -> Incoming<'_> { - Incoming { inner: listener } - } - - /// Attempts to poll `UnixStream` by polling inner `UnixListener` to accept - /// connection. - /// - /// If `UnixListener` isn't ready yet, `Poll::Pending` is returned and - /// current task will be notified by a waker. Otherwise `Poll::Ready` with - /// `Result` containing `UnixStream` will be returned. - pub fn poll_accept( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll> { - let (socket, _) = ready!(self.inner.poll_accept(cx))?; - Poll::Ready(Ok(socket)) - } -} - -#[cfg(feature = "stream")] -impl crate::stream::Stream for Incoming<'_> { - type Item = io::Result; - - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let (socket, _) = ready!(self.inner.poll_accept(cx))?; - Poll::Ready(Some(Ok(socket))) - } -} diff --git a/tokio/src/net/unix/listener.rs b/tokio/src/net/unix/listener.rs index 5d586ec3509..fb85da9f9df 100644 --- a/tokio/src/net/unix/listener.rs +++ b/tokio/src/net/unix/listener.rs @@ -1,6 +1,5 @@ -use crate::future::poll_fn; use crate::io::PollEvented; -use crate::net::unix::{Incoming, SocketAddr, UnixStream}; +use crate::net::unix::{SocketAddr, UnixStream}; use std::convert::TryFrom; use std::fmt; @@ -99,18 +98,26 @@ impl UnixListener { } /// Accepts a new incoming connection to this listener. - pub async fn accept(&mut self) -> io::Result<(UnixStream, SocketAddr)> { - poll_fn(|cx| self.poll_accept(cx)).await + pub async fn accept(&self) -> io::Result<(UnixStream, SocketAddr)> { + let (mio, addr) = self + .io + .async_io(mio::Interest::READABLE, |sock| sock.accept()) + .await?; + + let addr = SocketAddr(addr); + let stream = UnixStream::new(mio)?; + Ok((stream, addr)) } /// Polls to accept a new incoming connection to this listener. /// /// If there is no connection to accept, `Poll::Pending` is returned and - /// the current task will be notified by a waker. - pub fn poll_accept( - &mut self, - cx: &mut Context<'_>, - ) -> Poll> { + /// the current task will be notified by a waker. + /// + /// When ready, the most recent task that called `poll_accept` is notified. + /// The caller is responsble to ensure that `poll_accept` is called from a + /// single task. Failing to do this could result in tasks hanging. + pub fn poll_accept(&self, cx: &mut Context<'_>) -> Poll> { loop { let ev = ready!(self.io.poll_read_ready(cx))?; @@ -127,57 +134,13 @@ impl UnixListener { } } } - - /// Returns a stream over the connections being received on this listener. - /// - /// Note that `UnixListener` also directly implements `Stream`. - /// - /// The returned stream will never return `None` and will also not yield the - /// peer's `SocketAddr` structure. Iterating over it is equivalent to - /// calling accept in a loop. - /// - /// # Errors - /// - /// Note that accepting a connection can lead to various errors and not all - /// of them are necessarily fatal ‒ for example having too many open file - /// descriptors or the other side closing the connection while it waits in - /// an accept queue. These would terminate the stream if not handled in any - /// way. - /// - /// # Examples - /// - /// ```no_run - /// use tokio::net::UnixListener; - /// use tokio::stream::StreamExt; - /// - /// #[tokio::main] - /// async fn main() { - /// let mut listener = UnixListener::bind("/path/to/the/socket").unwrap(); - /// let mut incoming = listener.incoming(); - /// - /// while let Some(stream) = incoming.next().await { - /// match stream { - /// Ok(stream) => { - /// println!("new client!"); - /// } - /// Err(e) => { /* connection failed */ } - /// } - /// } - /// } - /// ``` - pub fn incoming(&mut self) -> Incoming<'_> { - Incoming::new(self) - } } #[cfg(feature = "stream")] impl crate::stream::Stream for UnixListener { type Item = io::Result; - fn poll_next( - mut self: std::pin::Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll> { + fn poll_next(self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let (socket, _) = ready!(self.poll_accept(cx))?; Poll::Ready(Some(Ok(socket))) } diff --git a/tokio/src/net/unix/mod.rs b/tokio/src/net/unix/mod.rs index 21aa4fe7f2d..19ee34a54ea 100644 --- a/tokio/src/net/unix/mod.rs +++ b/tokio/src/net/unix/mod.rs @@ -2,11 +2,7 @@ pub mod datagram; -mod incoming; -pub use incoming::Incoming; - pub(crate) mod listener; -pub(crate) use listener::UnixListener; mod split; pub use split::{ReadHalf, WriteHalf}; diff --git a/tokio/tests/uds_datagram.rs b/tokio/tests/uds_datagram.rs index 18dfcca028f..ec2f6f824b4 100644 --- a/tokio/tests/uds_datagram.rs +++ b/tokio/tests/uds_datagram.rs @@ -78,7 +78,7 @@ async fn try_send_recv_never_block() -> io::Result<()> { let payload = b"PAYLOAD"; let mut count = 0; - let (mut dgram1, mut dgram2) = UnixDatagram::pair()?; + let (dgram1, dgram2) = UnixDatagram::pair()?; // Send until we hit the OS `net.unix.max_dgram_qlen`. loop { diff --git a/tokio/tests/uds_stream.rs b/tokio/tests/uds_stream.rs index 29f118a2d48..cd557e54ad7 100644 --- a/tokio/tests/uds_stream.rs +++ b/tokio/tests/uds_stream.rs @@ -15,7 +15,7 @@ async fn accept_read_write() -> std::io::Result<()> { .unwrap(); let sock_path = dir.path().join("connect.sock"); - let mut listener = UnixListener::bind(&sock_path)?; + let listener = UnixListener::bind(&sock_path)?; let accept = listener.accept(); let connect = UnixStream::connect(&sock_path); @@ -42,7 +42,7 @@ async fn shutdown() -> std::io::Result<()> { .unwrap(); let sock_path = dir.path().join("connect.sock"); - let mut listener = UnixListener::bind(&sock_path)?; + let listener = UnixListener::bind(&sock_path)?; let accept = listener.accept(); let connect = UnixStream::connect(&sock_path);