Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support SO_RXQ_OVFL socket option (android/fuchsia/linux) #1455

Merged
merged 1 commit into from Jul 9, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Expand Up @@ -14,6 +14,8 @@ This project adheres to [Semantic Versioning](https://semver.org/).
(#[1457](https://github.com/nix-rust/nix/pull/1457))
- Added `renameat2` for Linux
(#[1458](https://github.com/nix-rust/nix/pull/1458))
- Added `RxqOvfl` support on Linux, Fuchsia and Android.
(#[1455](https://github.com/nix-rust/nix/pull/1455))

### Changed
- `ptsname_r` now returns a lossily-converted string in the event of bad UTF,
Expand Down
38 changes: 38 additions & 0 deletions src/sys/socket/mod.rs
Expand Up @@ -610,6 +610,17 @@ pub enum ControlMessageOwned {
#[cfg(target_os = "linux")]
UdpGroSegments(u16),

/// SO_RXQ_OVFL indicates that an unsigned 32 bit value
/// ancilliary msg (cmsg) should be attached to recieved
/// skbs indicating the number of packets dropped by the
/// socket between the last recieved packet and this
/// received packet.
///
/// `RxqOvfl` socket option should be enabled on a socket
/// to allow receiving the drop counter.
#[cfg(any(target_os = "android", target_os = "fuchsia", target_os = "linux"))]
RxqOvfl(u32),

/// Catch-all variant for unimplemented cmsg types.
#[doc(hidden)]
Unknown(UnknownCmsg),
Expand Down Expand Up @@ -708,6 +719,11 @@ impl ControlMessageOwned {
let gso_size: u16 = ptr::read_unaligned(p as *const _);
ControlMessageOwned::UdpGroSegments(gso_size)
},
#[cfg(any(target_os = "android", target_os = "fuchsia", target_os = "linux"))]
(libc::SOL_SOCKET, libc::SO_RXQ_OVFL) => {
let drop_counter = ptr::read_unaligned(p as *const u32);
ControlMessageOwned::RxqOvfl(drop_counter)
},
(_, _) => {
let sl = slice::from_raw_parts(p, len);
let ucmsg = UnknownCmsg(*header, Vec::<u8>::from(sl));
Expand Down Expand Up @@ -826,6 +842,14 @@ pub enum ControlMessage<'a> {
target_os = "android",
target_os = "ios",))]
Ipv6PacketInfo(&'a libc::in6_pktinfo),

/// SO_RXQ_OVFL indicates that an unsigned 32 bit value
/// ancilliary msg (cmsg) should be attached to recieved
/// skbs indicating the number of packets dropped by the
/// socket between the last recieved packet and this
/// received packet.
#[cfg(any(target_os = "android", target_os = "fuchsia", target_os = "linux"))]
RxqOvfl(&'a u32),
}

// An opaque structure used to prevent cmsghdr from being a public type
Expand Down Expand Up @@ -916,6 +940,10 @@ impl<'a> ControlMessage<'a> {
target_os = "netbsd", target_os = "freebsd",
target_os = "android", target_os = "ios",))]
ControlMessage::Ipv6PacketInfo(info) => info as *const _ as *const u8,
#[cfg(any(target_os = "android", target_os = "fuchsia", target_os = "linux"))]
ControlMessage::RxqOvfl(drop_count) => {
drop_count as *const _ as *const u8
},
};
unsafe {
ptr::copy_nonoverlapping(
Expand Down Expand Up @@ -964,6 +992,10 @@ impl<'a> ControlMessage<'a> {
target_os = "netbsd", target_os = "freebsd",
target_os = "android", target_os = "ios",))]
ControlMessage::Ipv6PacketInfo(info) => mem::size_of_val(info),
#[cfg(any(target_os = "android", target_os = "fuchsia", target_os = "linux"))]
ControlMessage::RxqOvfl(drop_count) => {
mem::size_of_val(drop_count)
},
}
}

Expand All @@ -988,6 +1020,8 @@ impl<'a> ControlMessage<'a> {
target_os = "netbsd", target_os = "freebsd",
target_os = "android", target_os = "ios",))]
ControlMessage::Ipv6PacketInfo(_) => libc::IPPROTO_IPV6,
#[cfg(any(target_os = "android", target_os = "fuchsia", target_os = "linux"))]
ControlMessage::RxqOvfl(_) => libc::SOL_SOCKET,
}
}

Expand Down Expand Up @@ -1023,6 +1057,10 @@ impl<'a> ControlMessage<'a> {
target_os = "netbsd", target_os = "freebsd",
target_os = "android", target_os = "ios",))]
ControlMessage::Ipv6PacketInfo(_) => libc::IPV6_PKTINFO,
#[cfg(any(target_os = "android", target_os = "fuchsia", target_os = "linux"))]
ControlMessage::RxqOvfl(_) => {
libc::SO_RXQ_OVFL
},
}
}

Expand Down
2 changes: 2 additions & 0 deletions src/sys/socket/sockopt.rs
Expand Up @@ -328,6 +328,8 @@ sockopt_impl!(Both, Ipv4RecvDstAddr, libc::IPPROTO_IP, libc::IP_RECVDSTADDR, boo
sockopt_impl!(Both, UdpGsoSegment, libc::SOL_UDP, libc::UDP_SEGMENT, libc::c_int);
#[cfg(target_os = "linux")]
sockopt_impl!(Both, UdpGroSegment, libc::IPPROTO_UDP, libc::UDP_GRO, bool);
#[cfg(any(target_os = "android", target_os = "fuchsia", target_os = "linux"))]
sockopt_impl!(Both, RxqOvfl, libc::SOL_SOCKET, libc::SO_RXQ_OVFL, libc::c_int);

#[cfg(any(target_os = "android", target_os = "linux"))]
#[derive(Copy, Clone, Debug)]
Expand Down
83 changes: 83 additions & 0 deletions test/sys/test_socket.rs
Expand Up @@ -1645,3 +1645,86 @@ fn test_recvmmsg_timestampns() {
// Close socket
nix::unistd::close(in_socket).unwrap();
}

// Disable the test on emulated platforms because it fails in Cirrus-CI. Lack of QEMU
// support is suspected.
#[cfg_attr(not(any(target_arch = "x86_64")), ignore)]
#[cfg(any(target_os = "android", target_os = "fuchsia", target_os = "linux"))]
#[test]
fn test_recvmsg_rxq_ovfl() {
use nix::Error;
use nix::sys::socket::*;
use nix::sys::uio::IoVec;
use nix::sys::socket::sockopt::{RxqOvfl, RcvBuf};

let message = [0u8; 2048];
let bufsize = message.len() * 2;

let in_socket = socket(
AddressFamily::Inet,
SockType::Datagram,
SockFlag::empty(),
None).unwrap();
let out_socket = socket(
AddressFamily::Inet,
SockType::Datagram,
SockFlag::empty(),
None).unwrap();

let localhost = InetAddr::new(IpAddr::new_v4(127, 0, 0, 1), 0);
bind(in_socket, &SockAddr::new_inet(localhost)).unwrap();

let address = getsockname(in_socket).unwrap();
connect(out_socket, &address).unwrap();

// Set SO_RXQ_OVFL flag.
setsockopt(in_socket, RxqOvfl, &1).unwrap();

// Set the receiver buffer size to hold only 2 messages.
setsockopt(in_socket, RcvBuf, &bufsize).unwrap();

let mut drop_counter = 0;

for _ in 0..2 {
let iov = [IoVec::from_slice(&message)];
let flags = MsgFlags::empty();

// Send the 3 messages (the receiver buffer can only hold 2 messages)
// to create an overflow.
for _ in 0..3 {
let l = sendmsg(out_socket, &iov, &[], flags, Some(&address)).unwrap();
assert_eq!(message.len(), l);
}

// Receive the message and check the drop counter if any.
loop {
let mut buffer = vec![0u8; message.len()];
let mut cmsgspace = nix::cmsg_space!(u32);

let iov = [IoVec::from_mut_slice(&mut buffer)];

match recvmsg(
in_socket,
&iov,
Some(&mut cmsgspace),
MsgFlags::MSG_DONTWAIT) {
Ok(r) => {
drop_counter = match r.cmsgs().next() {
Some(ControlMessageOwned::RxqOvfl(drop_counter)) => drop_counter,
Some(_) => panic!("Unexpected control message"),
None => 0,
};
},
Err(Error::EAGAIN) => { break; },
_ => { panic!("unknown recvmsg() error"); },
}
}
}

// One packet lost.
assert_eq!(drop_counter, 1);

// Close sockets
nix::unistd::close(in_socket).unwrap();
nix::unistd::close(out_socket).unwrap();
}