From 922ef3434af40de62b39e5023a61dba03f735dfb Mon Sep 17 00:00:00 2001 From: Junho Choi Date: Fri, 18 Jun 2021 17:39:36 -0700 Subject: [PATCH] Support SO_RXQ_OVFL socket option (Linux only) This PR implements support of RXQ_OVFL flag and parsing ControlMessage to get the packet drop counter of UDP socket. --- src/sys/socket/mod.rs | 38 ++++++++++++++++++ src/sys/socket/sockopt.rs | 2 + test/sys/test_socket.rs | 81 +++++++++++++++++++++++++++++++++++++++ 3 files changed, 121 insertions(+) diff --git a/src/sys/socket/mod.rs b/src/sys/socket/mod.rs index 69070463a9..9f37cb747f 100644 --- a/src/sys/socket/mod.rs +++ b/src/sys/socket/mod.rs @@ -610,6 +610,17 @@ pub enum ControlMessageOwned { #[cfg(target_os = "linux")] UdpGroSegments(u16), + /// SO_RXQ_OVFL indicates that an unsigned 32 bit value + /// ancilliary msg (cmsg) should be attached to recieved + /// skbs indicating the number of packets dropped by the + /// socket between the last recieved packet and this + /// received packet. + /// + /// `RxqOvfl` socket option should be enabled on a socket + /// to allow receiving the drop counter. + #[cfg(target_os = "linux")] + RxqOvfl(u32), + /// Catch-all variant for unimplemented cmsg types. #[doc(hidden)] Unknown(UnknownCmsg), @@ -708,6 +719,11 @@ impl ControlMessageOwned { let gso_size: u16 = ptr::read_unaligned(p as *const _); ControlMessageOwned::UdpGroSegments(gso_size) }, + #[cfg(target_os = "linux")] + (libc::SOL_SOCKET, libc::SO_RXQ_OVFL) => { + let drop_counter = ptr::read_unaligned(p as *const u32); + ControlMessageOwned::RxqOvfl(drop_counter) + }, (_, _) => { let sl = slice::from_raw_parts(p, len); let ucmsg = UnknownCmsg(*header, Vec::::from(sl)); @@ -826,6 +842,14 @@ pub enum ControlMessage<'a> { target_os = "android", target_os = "ios",))] Ipv6PacketInfo(&'a libc::in6_pktinfo), + + // SO_RXQ_OVFL indicates that an unsigned 32 bit value + // ancilliary msg (cmsg) should be attached to recieved + // skbs indicating the number of packets dropped by the + // socket between the last recieved packet and this + // received packet. + #[cfg(target_os = "linux")] + RxqOvfl(&'a u32), } // An opaque structure used to prevent cmsghdr from being a public type @@ -916,6 +940,10 @@ impl<'a> ControlMessage<'a> { target_os = "netbsd", target_os = "freebsd", target_os = "android", target_os = "ios",))] ControlMessage::Ipv6PacketInfo(info) => info as *const _ as *const u8, + #[cfg(target_os = "linux")] + ControlMessage::RxqOvfl(drop_count) => { + drop_count as *const _ as *const u8 + }, }; unsafe { ptr::copy_nonoverlapping( @@ -964,6 +992,10 @@ impl<'a> ControlMessage<'a> { target_os = "netbsd", target_os = "freebsd", target_os = "android", target_os = "ios",))] ControlMessage::Ipv6PacketInfo(info) => mem::size_of_val(info), + #[cfg(target_os = "linux")] + ControlMessage::RxqOvfl(drop_count) => { + mem::size_of_val(drop_count) + }, } } @@ -988,6 +1020,8 @@ impl<'a> ControlMessage<'a> { target_os = "netbsd", target_os = "freebsd", target_os = "android", target_os = "ios",))] ControlMessage::Ipv6PacketInfo(_) => libc::IPPROTO_IPV6, + #[cfg(target_os = "linux")] + ControlMessage::RxqOvfl(_) => libc::SO_RXQ_OVFL, } } @@ -1023,6 +1057,10 @@ impl<'a> ControlMessage<'a> { target_os = "netbsd", target_os = "freebsd", target_os = "android", target_os = "ios",))] ControlMessage::Ipv6PacketInfo(_) => libc::IPV6_PKTINFO, + #[cfg(target_os = "linux")] + ControlMessage::RxqOvfl(_) => { + libc::SO_RXQ_OVFL + }, } } diff --git a/src/sys/socket/sockopt.rs b/src/sys/socket/sockopt.rs index fe17395f72..892f3746cf 100644 --- a/src/sys/socket/sockopt.rs +++ b/src/sys/socket/sockopt.rs @@ -324,6 +324,8 @@ sockopt_impl!(Both, Ipv4RecvDstAddr, libc::IPPROTO_IP, libc::IP_RECVDSTADDR, boo sockopt_impl!(Both, UdpGsoSegment, libc::SOL_UDP, libc::UDP_SEGMENT, libc::c_int); #[cfg(target_os = "linux")] sockopt_impl!(Both, UdpGroSegment, libc::IPPROTO_UDP, libc::UDP_GRO, bool); +#[cfg(target_os = "linux")] +sockopt_impl!(Both, RxqOvfl, libc::SOL_SOCKET, libc::SO_RXQ_OVFL, libc::c_int); #[cfg(any(target_os = "android", target_os = "linux"))] #[derive(Copy, Clone, Debug)] diff --git a/test/sys/test_socket.rs b/test/sys/test_socket.rs index c22eaeb1a5..f88f5dd88b 100644 --- a/test/sys/test_socket.rs +++ b/test/sys/test_socket.rs @@ -1649,3 +1649,84 @@ fn test_recvmmsg_timestampns() { // Close socket nix::unistd::close(in_socket).unwrap(); } + +#[cfg(all(target_os = "linux"))] +#[test] +fn test_recvmsg_rxq_ovfl() { + use nix::Error; + use nix::errno::Errno; + use nix::sys::socket::*; + use nix::sys::uio::IoVec; + use nix::sys::socket::sockopt::{RxqOvfl, RcvBuf}; + + let message = [0u8; 2048]; + let bufsize = message.len() * 2; + + let in_socket = socket( + AddressFamily::Inet, + SockType::Datagram, + SockFlag::empty(), + None).unwrap(); + let out_socket = socket( + AddressFamily::Inet, + SockType::Datagram, + SockFlag::empty(), + None).unwrap(); + + let localhost = InetAddr::new(IpAddr::new_v4(127, 0, 0, 1), 0); + bind(in_socket, &SockAddr::new_inet(localhost)).unwrap(); + + let address = getsockname(in_socket).unwrap(); + connect(out_socket, &address).unwrap(); + + // Set SO_RXQ_OVFL flag. + setsockopt(in_socket, RxqOvfl, &1).unwrap(); + + // Set the receiver buffer size to hold only 2 messages. + setsockopt(in_socket, RcvBuf, &bufsize).unwrap(); + + let mut drop_counter = 0; + + for _ in 0..2 { + let iov = [IoVec::from_slice(&message)]; + let flags = MsgFlags::empty(); + + // Send the 3 messages (the receiver buffer can only hold 2 messages) + // to create an overflow. + for _ in 0..3 { + let l = sendmsg(out_socket, &iov, &[], flags, Some(&address)).unwrap(); + assert_eq!(message.len(), l); + } + + // Receive the message and check the drop counter if any. + loop { + let mut buffer = vec![0u8; message.len()]; + let mut cmsgspace = nix::cmsg_space!(u32); + + let iov = [IoVec::from_mut_slice(&mut buffer)]; + + match recvmsg( + in_socket, + &iov, + Some(&mut cmsgspace), + MsgFlags::MSG_DONTWAIT) { + Ok(r) => { + drop_counter = match r.cmsgs().next() { + Some(ControlMessageOwned::RxqOvfl(drop_counter)) => drop_counter, + Some(_) => panic!("Unexpected control message"), + None => 0, + }; + }, + Err(Error::Sys(Errno::EAGAIN)) => { break; }, + _ => { panic!("unknown recvmsg() error"); }, + } + } + } + + // One packet lost. + assert_eq!(drop_counter, 1); + + // Close sockets + nix::unistd::close(in_socket).unwrap(); + nix::unistd::close(out_socket).unwrap(); +}