Skip to content

Commit

Permalink
Support SO_RXQ_OVFL socket option (Linux only)
Browse files Browse the repository at this point in the history
This PR implements support of RXQ_OVFL flag and parsing ControlMessage
to get the packet drop counter of UDP socket.
  • Loading branch information
Junho Choi committed Jun 19, 2021
1 parent 7317e0a commit 2335050
Show file tree
Hide file tree
Showing 3 changed files with 124 additions and 0 deletions.
38 changes: 38 additions & 0 deletions src/sys/socket/mod.rs
Expand Up @@ -610,6 +610,17 @@ pub enum ControlMessageOwned {
#[cfg(target_os = "linux")]
UdpGroSegments(u16),

/// SO_RXQ_OVFL indicates that an unsigned 32 bit value
/// ancilliary msg (cmsg) should be attached to recieved
/// skbs indicating the number of packets dropped by the
/// socket between the last recieved packet and this
/// received packet.
///
/// `RxqOvfl` socket option should be enabled on a socket
/// to allow receiving the drop counter.
#[cfg(target_os = "linux")]
RxqOvfl(u32),

/// Catch-all variant for unimplemented cmsg types.
#[doc(hidden)]
Unknown(UnknownCmsg),
Expand Down Expand Up @@ -708,6 +719,11 @@ impl ControlMessageOwned {
let gso_size: u16 = ptr::read_unaligned(p as *const _);
ControlMessageOwned::UdpGroSegments(gso_size)
},
#[cfg(target_os = "linux")]
(libc::SOL_SOCKET, libc::SO_RXQ_OVFL) => {
let drop_counter = ptr::read_unaligned(p as *const u32);
ControlMessageOwned::RxqOvfl(drop_counter)
},
(_, _) => {
let sl = slice::from_raw_parts(p, len);
let ucmsg = UnknownCmsg(*header, Vec::<u8>::from(sl));
Expand Down Expand Up @@ -826,6 +842,14 @@ pub enum ControlMessage<'a> {
target_os = "android",
target_os = "ios",))]
Ipv6PacketInfo(&'a libc::in6_pktinfo),

// SO_RXQ_OVFL indicates that an unsigned 32 bit value
// ancilliary msg (cmsg) should be attached to recieved
// skbs indicating the number of packets dropped by the
// socket between the last recieved packet and this
// received packet.
#[cfg(target_os = "linux")]
RxqOvfl(&'a u32),
}

// An opaque structure used to prevent cmsghdr from being a public type
Expand Down Expand Up @@ -916,6 +940,10 @@ impl<'a> ControlMessage<'a> {
target_os = "netbsd", target_os = "freebsd",
target_os = "android", target_os = "ios",))]
ControlMessage::Ipv6PacketInfo(info) => info as *const _ as *const u8,
#[cfg(target_os = "linux")]
ControlMessage::RxqOvfl(drop_count) => {
drop_count as *const _ as *const u8
},
};
unsafe {
ptr::copy_nonoverlapping(
Expand Down Expand Up @@ -964,6 +992,10 @@ impl<'a> ControlMessage<'a> {
target_os = "netbsd", target_os = "freebsd",
target_os = "android", target_os = "ios",))]
ControlMessage::Ipv6PacketInfo(info) => mem::size_of_val(info),
#[cfg(target_os = "linux")]
ControlMessage::RxqOvfl(drop_count) => {
mem::size_of_val(drop_count)
},
}
}

Expand All @@ -988,6 +1020,8 @@ impl<'a> ControlMessage<'a> {
target_os = "netbsd", target_os = "freebsd",
target_os = "android", target_os = "ios",))]
ControlMessage::Ipv6PacketInfo(_) => libc::IPPROTO_IPV6,
#[cfg(target_os = "linux")]
ControlMessage::RxqOvfl(_) => libc::SO_RXQ_OVFL,
}
}

Expand Down Expand Up @@ -1023,6 +1057,10 @@ impl<'a> ControlMessage<'a> {
target_os = "netbsd", target_os = "freebsd",
target_os = "android", target_os = "ios",))]
ControlMessage::Ipv6PacketInfo(_) => libc::IPV6_PKTINFO,
#[cfg(target_os = "linux")]
ControlMessage::RxqOvfl(_) => {
libc::SO_RXQ_OVFL
},
}
}

Expand Down
2 changes: 2 additions & 0 deletions src/sys/socket/sockopt.rs
Expand Up @@ -324,6 +324,8 @@ sockopt_impl!(Both, Ipv4RecvDstAddr, libc::IPPROTO_IP, libc::IP_RECVDSTADDR, boo
sockopt_impl!(Both, UdpGsoSegment, libc::SOL_UDP, libc::UDP_SEGMENT, libc::c_int);
#[cfg(target_os = "linux")]
sockopt_impl!(Both, UdpGroSegment, libc::IPPROTO_UDP, libc::UDP_GRO, bool);
#[cfg(target_os = "linux")]
sockopt_impl!(Both, RxqOvfl, libc::SOL_SOCKET, libc::SO_RXQ_OVFL, libc::c_int);

#[cfg(any(target_os = "android", target_os = "linux"))]
#[derive(Copy, Clone, Debug)]
Expand Down
84 changes: 84 additions & 0 deletions test/sys/test_socket.rs
Expand Up @@ -1649,3 +1649,87 @@ fn test_recvmmsg_timestampns() {
// Close socket
nix::unistd::close(in_socket).unwrap();
}

// Disable the test on emulated platforms because it fails in Cirrus-CI. Lack of QEMU
// support is suspected.
#[cfg_attr(not(any(target_arch = "x86_64")), ignore)]
#[cfg(all(target_os = "linux"))]
#[test]
fn test_recvmsg_rxq_ovfl() {
use nix::Error;
use nix::errno::Errno;
use nix::sys::socket::*;
use nix::sys::uio::IoVec;
use nix::sys::socket::sockopt::{RxqOvfl, RcvBuf};

let message = [0u8; 2048];
let bufsize = message.len() * 2;

let in_socket = socket(
AddressFamily::Inet,
SockType::Datagram,
SockFlag::empty(),
None).unwrap();
let out_socket = socket(
AddressFamily::Inet,
SockType::Datagram,
SockFlag::empty(),
None).unwrap();

let localhost = InetAddr::new(IpAddr::new_v4(127, 0, 0, 1), 0);
bind(in_socket, &SockAddr::new_inet(localhost)).unwrap();

let address = getsockname(in_socket).unwrap();
connect(out_socket, &address).unwrap();

// Set SO_RXQ_OVFL flag.
setsockopt(in_socket, RxqOvfl, &1).unwrap();

// Set the receiver buffer size to hold only 2 messages.
setsockopt(in_socket, RcvBuf, &bufsize).unwrap();

let mut drop_counter = 0;

for _ in 0..2 {
let iov = [IoVec::from_slice(&message)];
let flags = MsgFlags::empty();

// Send the 3 messages (the receiver buffer can only hold 2 messages)
// to create an overflow.
for _ in 0..3 {
let l = sendmsg(out_socket, &iov, &[], flags, Some(&address)).unwrap();
assert_eq!(message.len(), l);
}

// Receive the message and check the drop counter if any.
loop {
let mut buffer = vec![0u8; message.len()];
let mut cmsgspace = nix::cmsg_space!(u32);

let iov = [IoVec::from_mut_slice(&mut buffer)];

match recvmsg(
in_socket,
&iov,
Some(&mut cmsgspace),
MsgFlags::MSG_DONTWAIT) {
Ok(r) => {
drop_counter = match r.cmsgs().next() {
Some(ControlMessageOwned::RxqOvfl(drop_counter)) => drop_counter,
Some(_) => panic!("Unexpected control message"),
None => 0,
};
},
Err(Error::Sys(Errno::EAGAIN)) => { break; },
_ => { panic!("unknown recvmsg() error"); },
}
}
}

// One packet lost.
assert_eq!(drop_counter, 1);

// Close sockets
nix::unistd::close(in_socket).unwrap();
nix::unistd::close(out_socket).unwrap();
}

0 comments on commit 2335050

Please sign in to comment.