Skip to content

Commit

Permalink
tokio: add back poll_* for udp (#2981)
Browse files Browse the repository at this point in the history
  • Loading branch information
leshow committed Oct 22, 2020
1 parent adf822f commit 358e4f9
Show file tree
Hide file tree
Showing 2 changed files with 388 additions and 2 deletions.
259 changes: 258 additions & 1 deletion tokio/src/net/udp/socket.rs
@@ -1,10 +1,11 @@
use crate::io::PollEvented;
use crate::io::{PollEvented, ReadBuf};
use crate::net::{to_socket_addrs, ToSocketAddrs};

use std::convert::TryFrom;
use std::fmt;
use std::io;
use std::net::{self, Ipv4Addr, Ipv6Addr, SocketAddr};
use std::task::{Context, Poll};

cfg_net! {
/// A UDP socket
Expand Down Expand Up @@ -272,6 +273,42 @@ impl UdpSocket {
.await
}

/// Attempts to send data on the socket to the remote address to which it was previously
/// `connect`ed.
///
/// The [`connect`] method will connect this socket to a remote address. The future
/// will resolve to an error if the socket is not connected.
///
/// Note that on multiple calls to a `poll_*` method in the send direction, only the
/// `Waker` from the `Context` passed to the most recent call will be scheduled to
/// receive a wakeup.
///
/// # Return value
///
/// The function returns:
///
/// * `Poll::Pending` if the socket is not available to write
/// * `Poll::Ready(Ok(n))` `n` is the number of bytes sent
/// * `Poll::Ready(Err(e))` if an error is encountered.
///
/// # Errors
///
/// This function may encounter any standard I/O error except `WouldBlock`.
///
/// [`connect`]: method@Self::connect
pub fn poll_send(&self, cx: &mut Context<'_>, buf: &[u8]) -> Poll<io::Result<usize>> {
loop {
let ev = ready!(self.io.poll_write_ready(cx))?;

match self.io.get_ref().send(buf) {
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
self.io.clear_readiness(ev);
}
x => return Poll::Ready(x),
}
}
}

/// Try to send data on the socket to the remote address to which it is
/// connected.
///
Expand Down Expand Up @@ -304,6 +341,55 @@ impl UdpSocket {
.await
}

/// Attempts to receive a single datagram message on the socket from the remote
/// address to which it is `connect`ed.
///
/// The [`connect`] method will connect this socket to a remote address. This method
/// resolves to an error if the socket is not connected.
///
/// Note that on multiple calls to a `poll_*` method in the recv direction, only the
/// `Waker` from the `Context` passed to the most recent call will be scheduled to
/// receive a wakeup.
///
/// # Return value
///
/// The function returns:
///
/// * `Poll::Pending` if the socket is not ready to read
/// * `Poll::Ready(Ok(()))` reads data `ReadBuf` if the socket is ready
/// * `Poll::Ready(Err(e))` if an error is encountered.
///
/// # Errors
///
/// This function may encounter any standard I/O error except `WouldBlock`.
///
/// [`connect`]: method@Self::connect
pub fn poll_recv(&self, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>) -> Poll<io::Result<()>> {
loop {
let ev = ready!(self.io.poll_read_ready(cx))?;

// Safety: will not read the maybe uinitialized bytes.
let b = unsafe {
&mut *(buf.unfilled_mut() as *mut [std::mem::MaybeUninit<u8>] as *mut [u8])
};
match self.io.get_ref().recv(b) {
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
self.io.clear_readiness(ev);
}
Err(e) => return Poll::Ready(Err(e)),
Ok(n) => {
// Safety: We trust `recv` to have filled up `n` bytes
// in the buffer.
unsafe {
buf.assume_init(n);
}
buf.advance(n);
return Poll::Ready(Ok(()));
}
}
}
}

/// Returns a future that sends data on the socket to the given address.
/// On success, the future will resolve to the number of bytes written.
///
Expand Down Expand Up @@ -337,6 +423,41 @@ impl UdpSocket {
}
}

/// Attempts to send data on the socket to a given address.
///
/// Note that on multiple calls to a `poll_*` method in the send direction, only the
/// `Waker` from the `Context` passed to the most recent call will be scheduled to
/// receive a wakeup.
///
/// # Return value
///
/// The function returns:
///
/// * `Poll::Pending` if the socket is not ready to write
/// * `Poll::Ready(Ok(n))` `n` is the number of bytes sent.
/// * `Poll::Ready(Err(e))` if an error is encountered.
///
/// # Errors
///
/// This function may encounter any standard I/O error except `WouldBlock`.
pub fn poll_send_to(
&self,
cx: &mut Context<'_>,
buf: &[u8],
target: &SocketAddr,
) -> Poll<io::Result<usize>> {
loop {
let ev = ready!(self.io.poll_write_ready(cx))?;

match self.io.get_ref().send_to(buf, *target) {
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
self.io.clear_readiness(ev);
}
x => return Poll::Ready(x),
}
}
}

/// Try to send data on the socket to the given address, but if the send is blocked
/// this will return right away.
///
Expand Down Expand Up @@ -403,6 +524,142 @@ impl UdpSocket {
.await
}

/// Attempts to receive a single datagram on the socket.
///
/// Note that on multiple calls to a `poll_*` method in the recv direction, only the
/// `Waker` from the `Context` passed to the most recent call will be scheduled to
/// receive a wakeup.
///
/// # Return value
///
/// The function returns:
///
/// * `Poll::Pending` if the socket is not ready to read
/// * `Poll::Ready(Ok(addr))` reads data from `addr` into `ReadBuf` if the socket is ready
/// * `Poll::Ready(Err(e))` if an error is encountered.
///
/// # Errors
///
/// This function may encounter any standard I/O error except `WouldBlock`.
pub fn poll_recv_from(
&self,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<io::Result<SocketAddr>> {
loop {
let ev = ready!(self.io.poll_read_ready(cx))?;

// Safety: will not read the maybe uinitialized bytes.
let b = unsafe {
&mut *(buf.unfilled_mut() as *mut [std::mem::MaybeUninit<u8>] as *mut [u8])
};
match self.io.get_ref().recv_from(b) {
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
self.io.clear_readiness(ev);
}
Err(e) => return Poll::Ready(Err(e)),
Ok((n, addr)) => {
// Safety: We trust `recv` to have filled up `n` bytes
// in the buffer.
unsafe {
buf.assume_init(n);
}
buf.advance(n);
return Poll::Ready(Ok(addr));
}
}
}
}

/// Receives data from the socket, without removing it from the input queue.
/// On success, returns the number of bytes read and the address from whence
/// the data came.
///
/// # Notes
///
/// On Windows, if the data is larger than the buffer specified, the buffer
/// is filled with the first part of the data, and peek_from returns the error
/// WSAEMSGSIZE(10040). The excess data is lost.
/// Make sure to always use a sufficiently large buffer to hold the
/// maximum UDP packet size, which can be up to 65536 bytes in size.
///
/// # Examples
///
/// ```no_run
/// use tokio::net::UdpSocket;
/// # use std::{io, net::SocketAddr};
///
/// # #[tokio::main]
/// # async fn main() -> io::Result<()> {
/// let sock = UdpSocket::bind("0.0.0.0:8080".parse::<SocketAddr>().unwrap()).await?;
/// let mut buf = [0u8; 32];
/// let (len, addr) = sock.peek_from(&mut buf).await?;
/// println!("peeked {:?} bytes from {:?}", len, addr);
/// # Ok(())
/// # }
/// ```
pub async fn peek_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> {
self.io
.async_io(mio::Interest::READABLE, |sock| sock.peek_from(buf))
.await
}

/// Receives data from the socket, without removing it from the input queue.
/// On success, returns the number of bytes read.
///
/// # Notes
///
/// Note that on multiple calls to a `poll_*` method in the recv direction, only the
/// `Waker` from the `Context` passed to the most recent call will be scheduled to
/// receive a wakeup
///
/// On Windows, if the data is larger than the buffer specified, the buffer
/// is filled with the first part of the data, and peek returns the error
/// WSAEMSGSIZE(10040). The excess data is lost.
/// Make sure to always use a sufficiently large buffer to hold the
/// maximum UDP packet size, which can be up to 65536 bytes in size.
///
/// # Return value
///
/// The function returns:
///
/// * `Poll::Pending` if the socket is not ready to read
/// * `Poll::Ready(Ok(addr))` reads data from `addr` into `ReadBuf` if the socket is ready
/// * `Poll::Ready(Err(e))` if an error is encountered.
///
/// # Errors
///
/// This function may encounter any standard I/O error except `WouldBlock`.
pub fn poll_peek_from(
&self,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<io::Result<SocketAddr>> {
loop {
let ev = ready!(self.io.poll_read_ready(cx))?;

// Safety: will not read the maybe uinitialized bytes.
let b = unsafe {
&mut *(buf.unfilled_mut() as *mut [std::mem::MaybeUninit<u8>] as *mut [u8])
};
match self.io.get_ref().peek_from(b) {
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
self.io.clear_readiness(ev);
}
Err(e) => return Poll::Ready(Err(e)),
Ok((n, addr)) => {
// Safety: We trust `recv` to have filled up `n` bytes
// in the buffer.
unsafe {
buf.assume_init(n);
}
buf.advance(n);
return Poll::Ready(Ok(addr));
}
}
}
}

/// Gets the value of the `SO_BROADCAST` option for this socket.
///
/// For more information about this option, see [`set_broadcast`].
Expand Down

0 comments on commit 358e4f9

Please sign in to comment.