Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for sendmmsg/recvmmsg #494

Open
wants to merge 8 commits into
base: master
Choose a base branch
from
208 changes: 208 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -727,3 +727,211 @@ impl<'name, 'bufs, 'control> fmt::Debug for MsgHdrMut<'name, 'bufs, 'control> {
"MsgHdrMut".fmt(fmt)
}
}

/// Configuration of a `sendmmsg(2)` system call.
///
/// This wraps `mmsghdr` on Unix. Also see [`MmsgHdrMut`] for the variant used
/// by `recvmmsg(2)`.
#[cfg(all(
unix,
not(any(
target_os = "redox",
target_os = "solaris",
target_os = "hurd",
target_os = "vita",
target_os = "illumos",
target_vendor = "apple"
))
))]
pub struct MmsgHdr<'addr, 'bufs, 'control> {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What about matching mmsghdr here? For example

pub struct MmsgHdr<'addr, 'bufs, 'control> {
    msg: MsgHdr,
    len: libc::c_uint,
}

This way the caller can decide if they want to use a Vec, a slice an array or something else.

inner: Vec<sys::mmsghdr>,
#[allow(clippy::type_complexity)]
_lifetimes: PhantomData<(&'addr SockAddr, &'bufs IoSlice<'bufs>, &'control [u8])>,
}

#[cfg(all(
unix,
not(any(
target_os = "redox",
target_os = "solaris",
target_os = "hurd",
target_os = "vita",
target_os = "illumos",
target_vendor = "apple"
))
))]
impl<'addr, 'bufs, 'control> MmsgHdr<'addr, 'bufs, 'control> {
/// Create a new `MmsgHdr` with all empty/zero fields.
#[allow(clippy::new_without_default)]
pub fn new(len: usize) -> MmsgHdr<'addr, 'bufs, 'control> {
// SAFETY: all zero is valid for `mmsghdr`
MmsgHdr {
inner: vec![unsafe { mem::zeroed() }; len],
_lifetimes: PhantomData,
}
}

/// Set the addresses (name) of the message.
///
/// Corresponds to setting `msg_name` and `msg_namelen`.
pub fn with_addrs(mut self, addrs: &'addr [SockAddr]) -> Self {
for (msg, addr) in self.inner.iter_mut().zip(addrs) {
sys::set_msghdr_name(&mut msg.msg_hdr, addr);
}
self
}

/// Set the buffer(s) of the message.
///
/// Corresponds to setting `msg_iov` and `msg_iovlen` on Unix.
pub fn with_buffers(mut self, bufs: &'bufs [IoSlice<'_>]) -> Self {
for (msg, buf) in self.inner.iter_mut().zip(bufs) {
sys::set_msghdr_iov(&mut msg.msg_hdr, buf as *const _ as *mut libc::iovec, 1);
}
self
}

/// Set the control buffer of the messages.
///
/// Corresponds to setting `msg_control` and `msg_controllen` on Unix.
pub fn with_control(mut self, bufs: &'control [&'control [u8]]) -> Self {
for (msg, buf) in self.inner.iter_mut().zip(bufs) {
sys::set_msghdr_control(&mut msg.msg_hdr, buf.as_ptr() as *mut _, buf.len())
}
self
}

/// Set the flags on the messages
///
/// Corresponds to setting `msg_flags` on Unix.
pub fn with_flags(mut self, flags: &[sys::c_int]) -> Self {
for (msg, flags) in self.inner.iter_mut().zip(flags) {
sys::set_msghdr_flags(&mut msg.msg_hdr, *flags);
}
self
}
}

#[cfg(all(
unix,
not(any(
target_os = "redox",
target_os = "solaris",
target_os = "hurd",
target_os = "vita",
target_os = "illumos",
target_vendor = "apple"
))
))]
impl<'name, 'bufs, 'control> fmt::Debug for MmsgHdr<'name, 'bufs, 'control> {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
"MmsgHdr".fmt(fmt)
}
}

/// Configuration of a `recvmmsg(2)` system call
///
/// This wraps `mmsghdr` on Unix. Also see [`MmsgHdr`] for the variant used by
/// `sendmmsg(2)`.
#[cfg(all(
unix,
not(any(
target_os = "redox",
target_os = "solaris",
target_os = "hurd",
target_os = "vita",
target_os = "illumos",
target_vendor = "apple"
))
))]
pub struct MmsgHdrMut<'addr, 'bufs, 'control> {
inner: Vec<sys::mmsghdr>,
#[allow(clippy::type_complexity)]
_lifetimes: PhantomData<(
&'addr mut SockAddr,
&'bufs mut MaybeUninitSlice<'bufs>,
&'control mut [u8],
)>,
}

#[cfg(all(
unix,
not(any(
target_os = "redox",
target_os = "solaris",
target_os = "hurd",
target_os = "vita",
target_os = "illumos",
target_vendor = "apple"
))
))]
impl<'addr, 'bufs, 'control> MmsgHdrMut<'addr, 'bufs, 'control> {
/// Create a new `MmsgHdrMut` with all empty/zero fields.
#[allow(clippy::new_without_default)]
pub fn new(len: usize) -> MmsgHdrMut<'addr, 'bufs, 'control> {
// SAFETY: all zero is valid for `msghdr` and `WSAMSG`.
MmsgHdrMut {
inner: vec![unsafe { mem::zeroed() }; len],
_lifetimes: PhantomData,
}
}

/// Set the mutable address (name) of the message.
///
/// Corresponds to setting `msg_name` and `msg_namelen` on Unix and `name`
/// and `namelen` on Windows.
pub fn with_addrs(mut self, addrs: &'addr mut [SockAddr]) -> Self {
for (msg, addr) in self.inner.iter_mut().zip(addrs) {
sys::set_msghdr_name(&mut msg.msg_hdr, addr);
}
self
}

/// Set the mutable buffer(s) of the message.
///
/// Corresponds to setting `msg_iov` and `msg_iovlen` on Unix and `lpBuffers`
/// and `dwBufferCount` on Windows.
pub fn with_buffers(mut self, bufs: &'bufs mut [MaybeUninitSlice<'_>]) -> Self {
for (msg, buf) in self.inner.iter_mut().zip(bufs) {
sys::set_msghdr_iov(&mut msg.msg_hdr, buf as *mut _ as *mut libc::iovec, 1);
}
self
}

/// Set the mutable control buffer of the message.
///
/// Corresponds to setting `msg_control` and `msg_controllen` on Unix and
/// `Control` on Windows.
pub fn with_control(mut self, buf: &'control mut [&'control mut [MaybeUninit<u8>]]) -> Self {
for (msg, buf) in self.inner.iter_mut().zip(buf) {
sys::set_msghdr_control(&mut msg.msg_hdr, buf.as_mut_ptr().cast(), buf.len());
}
self
}

/// Returns the flags of the message.
pub fn flags(&self, n: usize) -> Vec<RecvFlags> {
self.inner
.iter()
.take(n)
.map(|msg| sys::msghdr_flags(&msg.msg_hdr))
.collect()
}
}

#[cfg(all(
unix,
not(any(
target_os = "redox",
target_os = "solaris",
target_os = "hurd",
target_os = "vita",
target_os = "illumos",
target_vendor = "apple"
))
))]
impl<'name, 'bufs, 'control> fmt::Debug for MmsgHdrMut<'name, 'bufs, 'control> {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
"MmsgHdrMut".fmt(fmt)
}
}
148 changes: 148 additions & 0 deletions src/socket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,18 @@ use crate::MsgHdrMut;
use crate::{Domain, Protocol, SockAddr, TcpKeepalive, Type};
#[cfg(not(target_os = "redox"))]
use crate::{MaybeUninitSlice, MsgHdr, RecvFlags};
#[cfg(all(
unix,
not(any(
target_os = "redox",
target_os = "solaris",
target_os = "hurd",
target_os = "vita",
target_os = "illumos",
target_vendor = "apple"
))
))]
use crate::{MmsgHdr, MmsgHdrMut};

/// Owned wrapper around a system socket.
///
Expand Down Expand Up @@ -586,6 +598,40 @@ impl Socket {
sys::recv_from_vectored(self.as_raw(), bufs, flags)
}

/// Receive multiple messages in a single call.
#[cfg(all(
unix,
not(any(
target_os = "redox",
target_os = "solaris",
target_os = "hurd",
target_os = "vita",
target_os = "illumos",
target_vendor = "apple"
))
))]
#[cfg_attr(
docsrs,
doc(cfg(all(
unix,
not(any(
target_os = "redox",
target_os = "solaris",
target_os = "hurd",
target_os = "vita",
target_os = "illumos",
target_vendor = "apple"
))
)))
)]
pub fn recv_multiple_from(
&self,
msgs: &mut [MaybeUninitSlice<'_>],
flags: c_int,
) -> io::Result<Vec<(usize, RecvFlags, SockAddr)>> {
sys::recv_multiple_from(self.as_raw(), msgs, flags)
}

/// Receives data from the socket, without removing it from the queue.
///
/// Successive calls return the same data. This is accomplished by passing
Expand Down Expand Up @@ -642,6 +688,41 @@ impl Socket {
sys::recvmsg(self.as_raw(), msg, flags)
}

/// Receive multiple messages from a socket using a message structure.
#[doc = man_links!(recvmmsg(2))]
#[cfg(all(
unix,
not(any(
target_os = "redox",
target_os = "solaris",
target_os = "hurd",
target_os = "vita",
target_os = "illumos",
target_vendor = "apple"
))
))]
#[cfg_attr(
docsrs,
doc(cfg(all(
unix,
not(any(
target_os = "redox",
target_os = "solaris",
target_os = "hurd",
target_os = "vita",
target_os = "illumos",
target_vendor = "apple"
))
)))
)]
pub fn recvmmsg(
&self,
msgs: &mut MmsgHdrMut<'_, '_, '_>,
flags: sys::c_int,
) -> io::Result<usize> {
sys::recvmmsg(self.as_raw(), msgs, flags)
}

/// Sends data on the socket to a connected peer.
///
/// This is typically used on TCP sockets or datagram sockets which have
Expand Down Expand Up @@ -741,13 +822,80 @@ impl Socket {
sys::send_to_vectored(self.as_raw(), bufs, addr, flags)
}

/// Send multiple data to multiple peers listening on `addrs`. Return the amount of bytes
/// written for each message.
#[cfg(all(
unix,
not(any(
target_os = "redox",
target_os = "solaris",
target_os = "hurd",
target_os = "vita",
target_os = "illumos",
target_vendor = "apple"
))
))]
#[cfg_attr(
docsrs,
doc(cfg(all(
unix,
not(any(
target_os = "redox",
target_os = "solaris",
target_os = "hurd",
target_os = "vita",
target_os = "illumos",
target_vendor = "apple"
))
)))
)]
pub fn send_multiple_to(
&self,
msgs: &[IoSlice<'_>],
to: &[SockAddr],
flags: c_int,
) -> io::Result<Vec<usize>> {
sys::send_multiple_to(self.as_raw(), msgs, to, flags)
}

/// Send a message on a socket using a message structure.
#[doc = man_links!(sendmsg(2))]
#[cfg(not(target_os = "redox"))]
#[cfg_attr(docsrs, doc(cfg(not(target_os = "redox"))))]
pub fn sendmsg(&self, msg: &MsgHdr<'_, '_, '_>, flags: sys::c_int) -> io::Result<usize> {
sys::sendmsg(self.as_raw(), msg, flags)
}

/// Send multiple messages on a socket using a multiple message structure.
#[doc = man_links!(sendmmsg(2))]
#[cfg(all(
unix,
not(any(
target_os = "redox",
target_os = "solaris",
target_os = "hurd",
target_os = "vita",
target_os = "illumos",
target_vendor = "apple"
))
))]
#[cfg_attr(
docsrs,
doc(cfg(all(
unix,
not(any(
target_os = "redox",
target_os = "solaris",
target_os = "hurd",
target_os = "vita",
target_os = "illumos",
target_vendor = "apple"
))
)))
)]
pub fn sendmmsg(&self, msgs: &MmsgHdr<'_, '_, '_>, flags: sys::c_int) -> io::Result<usize> {
sys::sendmmsg(self.as_raw(), msgs, flags)
}
}

/// Set `SOCK_CLOEXEC` and `NO_HANDLE_INHERIT` on the `ty`pe on platforms that
Expand Down