Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tokio: Add back poll_* for udp #2981

Merged
merged 5 commits into from Oct 22, 2020
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
259 changes: 258 additions & 1 deletion tokio/src/net/udp/socket.rs
@@ -1,10 +1,11 @@
use crate::io::PollEvented;
use crate::io::{PollEvented, ReadBuf};
use crate::net::{to_socket_addrs, ToSocketAddrs};

use std::convert::TryFrom;
use std::fmt;
use std::io;
use std::net::{self, Ipv4Addr, Ipv6Addr, SocketAddr};
use std::task::{Context, Poll};

cfg_net! {
/// A UDP socket
Expand Down Expand Up @@ -272,6 +273,42 @@ impl UdpSocket {
.await
}

/// Attempts to send data on the socket to the remote address to which it was previously
/// `connect`ed.
///
/// The [`connect`] method will connect this socket to a remote address. The future
/// will resolve to an error if the socket is not connected.
///
/// Note that on multiple calls to a `poll_*` method in the send direction, only the
/// `Waker` from the `Context` passed to the most recent call will be scheduled to
/// receive a wakeup.
///
/// # Return value
///
/// The function returns:
///
/// * `Poll::Pending` if the socket is not available to write
/// * `Poll::Ready(Ok(n))` `n` is the number of bytes sent
/// * `Poll::Ready(Err(e))` if an error is encountered.
///
/// # Errors
///
/// This function may encounter any standard I/O error except `WouldBlock`.
///
/// [`connect`]: method@Self::connect
pub fn poll_send(&self, cx: &mut Context<'_>, buf: &[u8]) -> Poll<io::Result<usize>> {
loop {
let ev = ready!(self.io.poll_write_ready(cx))?;

match self.io.get_ref().send(buf) {
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
self.io.clear_readiness(ev);
}
x => return Poll::Ready(x),
}
}
}

/// Try to send data on the socket to the remote address to which it is
/// connected.
///
Expand Down Expand Up @@ -304,6 +341,55 @@ impl UdpSocket {
.await
}

/// Attempts to receive a single datagram message on the socket from the remote
/// address to which it is `connect`ed.
///
/// The [`connect`] method will connect this socket to a remote address. The future
/// will resolve to an error if the socket is not connected.
leshow marked this conversation as resolved.
Show resolved Hide resolved
///
/// Note that on multiple calls to a `poll_*` method in the recv direction, only the
/// `Waker` from the `Context` passed to the most recent call will be scheduled to
/// receive a wakeup.
///
/// # Return value
///
/// The function returns:
///
/// * `Poll::Pending` if the socket is not ready to read
/// * `Poll::Ready(Ok(()))` reads data `ReadBuf` if the socket is ready
/// * `Poll::Ready(Err(e))` if an error is encountered.
///
/// # Errors
///
/// This function may encounter any standard I/O error except `WouldBlock`.
///
/// [`connect`]: method@Self::connect
pub fn poll_recv(&self, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>) -> Poll<io::Result<()>> {
loop {
let ev = ready!(self.io.poll_read_ready(cx))?;
leshow marked this conversation as resolved.
Show resolved Hide resolved

// Safety: will not read the maybe uinitialized bytes.
let b = unsafe {
&mut *(buf.unfilled_mut() as *mut [std::mem::MaybeUninit<u8>] as *mut [u8])
};
match self.io.get_ref().recv(b) {
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
self.io.clear_readiness(ev);
}
Err(e) => return Poll::Ready(Err(e)),
Ok(n) => {
// Safety: We trust `recv` to have filled up `n` bytes
// in the buffer.
unsafe {
buf.assume_init(n);
}
buf.advance(n);
return Poll::Ready(Ok(()));
}
}
}
}

/// Returns a future that sends data on the socket to the given address.
/// On success, the future will resolve to the number of bytes written.
///
Expand Down Expand Up @@ -337,6 +423,41 @@ impl UdpSocket {
}
}

/// Attempts to send data on the socket to a given address.
///
/// Note that on multiple calls to a `poll_*` method in the send direction, only the
/// `Waker` from the `Context` passed to the most recent call will be scheduled to
/// receive a wakeup.
///
/// # Return value
///
/// The function returns:
///
/// * `Poll::Pending` if the socket is not ready to write
/// * `Poll::Ready(Ok(n))` `n` is the number of bytes sent.
/// * `Poll::Ready(Err(e))` if an error is encountered.
///
/// # Errors
///
/// This function may encounter any standard I/O error except `WouldBlock`.
pub fn poll_send_to(
&self,
cx: &mut Context<'_>,
buf: &[u8],
target: &SocketAddr,
) -> Poll<io::Result<usize>> {
loop {
let ev = ready!(self.io.poll_write_ready(cx))?;

match self.io.get_ref().send_to(buf, *target) {
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
self.io.clear_readiness(ev);
}
x => return Poll::Ready(x),
}
}
}

/// Try to send data on the socket to the given address, but if the send is blocked
/// this will return right away.
///
Expand Down Expand Up @@ -403,6 +524,142 @@ impl UdpSocket {
.await
}

/// Attempts to receive a single datagram on the socket.
///
/// Note that on multiple calls to a `poll_*` method in the recv direction, only the
/// `Waker` from the `Context` passed to the most recent call will be scheduled to
/// receive a wakeup.
///
/// # Return value
///
/// The function returns:
///
/// * `Poll::Pending` if the socket is not ready to read
/// * `Poll::Ready(Ok(addr))` reads data from `addr` into `ReadBuf` if the socket is ready
/// * `Poll::Ready(Err(e))` if an error is encountered.
///
/// # Errors
///
/// This function may encounter any standard I/O error except `WouldBlock`.
pub fn poll_recv_from(
&self,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<io::Result<SocketAddr>> {
loop {
let ev = ready!(self.io.poll_read_ready(cx))?;

// Safety: will not read the maybe uinitialized bytes.
let b = unsafe {
&mut *(buf.unfilled_mut() as *mut [std::mem::MaybeUninit<u8>] as *mut [u8])
};
match self.io.get_ref().recv_from(b) {
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
self.io.clear_readiness(ev);
}
Err(e) => return Poll::Ready(Err(e)),
Ok((n, addr)) => {
// Safety: We trust `recv` to have filled up `n` bytes
// in the buffer.
unsafe {
buf.assume_init(n);
}
buf.advance(n);
return Poll::Ready(Ok(addr));
}
}
}
}

/// Receives data from the socket, without removing it from the input queue.
/// On success, returns the number of bytes read and the address from whence
/// the data came.
///
/// # Notes
///
/// On Windows, if the data is larger than the buffer specified, the buffer
/// is filled with the first part of the data, and peek_from returns the error
/// WSAEMSGSIZE(10040). The excess data is lost.
/// Make sure to always use a sufficiently large buffer to hold the
/// maximum UDP packet size, which can be up to 65536 bytes in size.
///
/// # Examples
///
/// ```no_run
/// use tokio::net::UdpSocket;
/// # use std::{io, net::SocketAddr};
///
/// # #[tokio::main]
/// # async fn main() -> io::Result<()> {
/// let sock = UdpSocket::bind("0.0.0.0:8080".parse::<SocketAddr>().unwrap()).await?;
/// let mut buf = [0u8; 32];
/// let (len, addr) = sock.peek_from(&mut buf).await?;
/// println!("peeked {:?} bytes from {:?}", len, addr);
/// # Ok(())
/// # }
/// ```
pub async fn peek_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> {
self.io
.async_io(mio::Interest::READABLE, |sock| sock.peek_from(buf))
.await
}

/// Receives data from the socket, without removing it from the input queue.
/// On success, returns the number of bytes read.
///
/// # Notes
///
/// Note that on multiple calls to a `poll_*` method in the recv direction, only the
/// `Waker` from the `Context` passed to the most recent call will be scheduled to
/// receive a wakeup
///
/// On Windows, if the data is larger than the buffer specified, the buffer
/// is filled with the first part of the data, and peek returns the error
/// WSAEMSGSIZE(10040). The excess data is lost.
/// Make sure to always use a sufficiently large buffer to hold the
/// maximum UDP packet size, which can be up to 65536 bytes in size.
///
/// # Return value
///
/// The function returns:
///
/// * `Poll::Pending` if the socket is not ready to read
/// * `Poll::Ready(Ok(addr))` reads data from `addr` into `ReadBuf` if the socket is ready
/// * `Poll::Ready(Err(e))` if an error is encountered.
///
/// # Errors
///
/// This function may encounter any standard I/O error except `WouldBlock`.
pub fn poll_peek_from(
&self,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<io::Result<SocketAddr>> {
loop {
let ev = ready!(self.io.poll_read_ready(cx))?;

// Safety: will not read the maybe uinitialized bytes.
let b = unsafe {
&mut *(buf.unfilled_mut() as *mut [std::mem::MaybeUninit<u8>] as *mut [u8])
};
match self.io.get_ref().peek_from(b) {
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
self.io.clear_readiness(ev);
}
Err(e) => return Poll::Ready(Err(e)),
Ok((n, addr)) => {
// Safety: We trust `recv` to have filled up `n` bytes
// in the buffer.
unsafe {
buf.assume_init(n);
}
buf.advance(n);
return Poll::Ready(Ok(addr));
}
}
}
}

/// Gets the value of the `SO_BROADCAST` option for this socket.
///
/// For more information about this option, see [`set_broadcast`].
Expand Down