Skip to content

Commit

Permalink
Merge #1209
Browse files Browse the repository at this point in the history
1209: Support UDP GSO and GRO on linux r=asomers a=glebpom

This PR implements support for UDP GSO and GRO on Linux. It provides the way to send/receive UDP payloads bigger than interface MTU. The goal is to improve UDP performance. 

GSO was introduced in Linux 4.18, GRO in 5.3


Co-authored-by: Gleb Pomykalov <gleb@lancastr.com>
  • Loading branch information
bors[bot] and Gleb Pomykalov committed Apr 25, 2020
2 parents 4707736 + b1b64ee commit 490e979
Show file tree
Hide file tree
Showing 6 changed files with 170 additions and 8 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Expand Up @@ -21,6 +21,8 @@ This project adheres to [Semantic Versioning](http://semver.org/).
- Derived `Ord`, `PartialOrd` for `unistd::Pid` (#[1189](https://github.com/nix-rust/nix/pull/1189))
- Added `select::FdSet::fds` method to iterate over file descriptors in a set.
([#1207](https://github.com/nix-rust/nix/pull/1207))
- Added support for UDP generic segmentation offload (GSO) and generic
receive offload (GRO) ([#1209](https://github.com/nix-rust/nix/pull/1209))

### Changed
- Changed `fallocate` return type from `c_int` to `()` (#[1201](https://github.com/nix-rust/nix/pull/1201))
Expand Down
3 changes: 2 additions & 1 deletion Cargo.toml
Expand Up @@ -16,7 +16,7 @@ exclude = [
]

[dependencies]
libc = { git = "https://github.com/rust-lang/libc/", features = [ "extra_traits" ] }
libc = { version = "0.2.69", features = [ "extra_traits" ] }
bitflags = "1.1"
cfg-if = "0.1.10"
void = "1.0.2"
Expand All @@ -29,6 +29,7 @@ bytes = "0.4.8"
lazy_static = "1.2"
rand = "0.6"
tempfile = "3.0.5"
semver = "0.9.0"

[target.'cfg(any(target_os = "android", target_os = "linux"))'.dev-dependencies]
caps = "0.3.1"
Expand Down
43 changes: 42 additions & 1 deletion src/sys/socket/mod.rs
Expand Up @@ -463,6 +463,18 @@ pub enum ControlMessageOwned {
target_os = "openbsd",
))]
Ipv4RecvDstAddr(libc::in_addr),

/// UDP Generic Receive Offload (GRO) allows receiving multiple UDP
/// packets from a single sender.
/// Fixed-size payloads are following one by one in a receive buffer.
/// This Control Message indicates the size of all smaller packets,
/// except, maybe, the last one.
///
/// `UdpGroSegment` socket option should be enabled on a socket
/// to allow receiving GRO packets.
#[cfg(target_os = "linux")]
UdpGroSegments(u16),

/// Catch-all variant for unimplemented cmsg types.
#[doc(hidden)]
Unknown(UnknownCmsg),
Expand Down Expand Up @@ -546,6 +558,11 @@ impl ControlMessageOwned {
let dl = ptr::read_unaligned(p as *const libc::in_addr);
ControlMessageOwned::Ipv4RecvDstAddr(dl)
},
#[cfg(target_os = "linux")]
(libc::SOL_UDP, libc::UDP_GRO) => {
let gso_size: u16 = ptr::read_unaligned(p as *const _);
ControlMessageOwned::UdpGroSegments(gso_size)
},
(_, _) => {
let sl = slice::from_raw_parts(p, len);
let ucmsg = UnknownCmsg(*header, Vec::<u8>::from(&sl[..]));
Expand Down Expand Up @@ -617,6 +634,16 @@ pub enum ControlMessage<'a> {
))]
AlgSetAeadAssoclen(&'a u32),

/// UDP GSO makes it possible for applications to generate network packets
/// for a virtual MTU much greater than the real one.
/// The length of the send data no longer matches the expected length on
/// the wire.
/// The size of the datagram payload as it should appear on the wire may be
/// passed through this control message.
/// Send buffer should consist of multiple fixed-size wire payloads
/// following one by one, and the last, possibly smaller one.
#[cfg(target_os = "linux")]
UdpGsoSegments(&'a u16),
}

// An opaque structure used to prevent cmsghdr from being a public type
Expand Down Expand Up @@ -687,6 +714,10 @@ impl<'a> ControlMessage<'a> {
ControlMessage::AlgSetAeadAssoclen(len) => {
len as *const _ as *const u8
},
#[cfg(target_os = "linux")]
ControlMessage::UdpGsoSegments(gso_size) => {
gso_size as *const _ as *const u8
},
};
unsafe {
ptr::copy_nonoverlapping(
Expand Down Expand Up @@ -719,6 +750,10 @@ impl<'a> ControlMessage<'a> {
ControlMessage::AlgSetAeadAssoclen(len) => {
mem::size_of_val(len)
},
#[cfg(target_os = "linux")]
ControlMessage::UdpGsoSegments(gso_size) => {
mem::size_of_val(gso_size)
},
}
}

Expand All @@ -730,7 +765,9 @@ impl<'a> ControlMessage<'a> {
ControlMessage::ScmCredentials(_) => libc::SOL_SOCKET,
#[cfg(any(target_os = "android", target_os = "linux"))]
ControlMessage::AlgSetIv(_) | ControlMessage::AlgSetOp(_) |
ControlMessage::AlgSetAeadAssoclen(_) => libc::SOL_ALG ,
ControlMessage::AlgSetAeadAssoclen(_) => libc::SOL_ALG,
#[cfg(target_os = "linux")]
ControlMessage::UdpGsoSegments(_) => libc::SOL_UDP,
}
}

Expand All @@ -752,6 +789,10 @@ impl<'a> ControlMessage<'a> {
ControlMessage::AlgSetAeadAssoclen(_) => {
libc::ALG_SET_AEAD_ASSOCLEN
},
#[cfg(target_os = "linux")]
ControlMessage::UdpGsoSegments(_) => {
libc::UDP_SEGMENT
},
}
}

Expand Down
5 changes: 4 additions & 1 deletion src/sys/socket/sockopt.rs
Expand Up @@ -308,7 +308,10 @@ sockopt_impl!(Both, Ipv4RecvIf, libc::IPPROTO_IP, libc::IP_RECVIF, bool);
target_os = "openbsd",
))]
sockopt_impl!(Both, Ipv4RecvDstAddr, libc::IPPROTO_IP, libc::IP_RECVDSTADDR, bool);

#[cfg(target_os = "linux")]
sockopt_impl!(Both, UdpGsoSegment, libc::SOL_UDP, libc::UDP_SEGMENT, libc::c_int);
#[cfg(target_os = "linux")]
sockopt_impl!(Both, UdpGroSegment, libc::IPPROTO_UDP, libc::UDP_GRO, bool);

#[cfg(any(target_os = "android", target_os = "linux"))]
#[derive(Copy, Clone, Debug)]
Expand Down
88 changes: 83 additions & 5 deletions test/sys/test_socket.rs
Expand Up @@ -169,8 +169,10 @@ mod recvfrom {

const MSG: &'static [u8] = b"Hello, World!";

fn sendrecv<F>(rsock: RawFd, ssock: RawFd, f: F) -> Option<SockAddr>
where F: Fn(RawFd, &[u8], MsgFlags) -> Result<usize> + Send + 'static
fn sendrecv<Fs, Fr>(rsock: RawFd, ssock: RawFd, f_send: Fs, mut f_recv: Fr) -> Option<SockAddr>
where
Fs: Fn(RawFd, &[u8], MsgFlags) -> Result<usize> + Send + 'static,
Fr: FnMut(usize, Option<SockAddr>),
{
let mut buf: [u8; 13] = [0u8; 13];
let mut l = 0;
Expand All @@ -179,12 +181,13 @@ mod recvfrom {
let send_thread = thread::spawn(move || {
let mut l = 0;
while l < std::mem::size_of_val(MSG) {
l += f(ssock, &MSG[l..], MsgFlags::empty()).unwrap();
l += f_send(ssock, &MSG[l..], MsgFlags::empty()).unwrap();
}
});

while l < std::mem::size_of_val(MSG) {
let (len, from_) = recvfrom(rsock, &mut buf[l..]).unwrap();
f_recv(len, from_);
from = from_;
l += len;
}
Expand All @@ -200,7 +203,7 @@ mod recvfrom {
// Ignore from for stream sockets
let _ = sendrecv(fd1, fd2, |s, m, flags| {
send(s, m, flags)
});
}, |_, _| {});
}

#[test]
Expand All @@ -222,10 +225,85 @@ mod recvfrom {
).expect("send socket failed");
let from = sendrecv(rsock, ssock, move |s, m, flags| {
sendto(s, m, &sock_addr, flags)
});
},|_, _| {});
// UDP sockets should set the from address
assert_eq!(AddressFamily::Inet, from.unwrap().family());
}

#[cfg(target_os = "linux")]
mod udp_offload {
use super::*;
use nix::sys::uio::IoVec;
use nix::sys::socket::sockopt::{UdpGroSegment, UdpGsoSegment};

#[test]
pub fn gso() {
require_kernel_version!(udp_offload::gso, ">= 4.18");

// In this test, we send the data and provide a GSO segment size.
// Since we are sending the buffer of size 13, six UDP packets
// with size 2 and two UDP packet with size 1 will be sent.
let segment_size: u16 = 2;

let std_sa = SocketAddr::from_str("127.0.0.1:6791").unwrap();
let inet_addr = InetAddr::from_std(&std_sa);
let sock_addr = SockAddr::new_inet(inet_addr);
let rsock = socket(AddressFamily::Inet,
SockType::Datagram,
SockFlag::empty(),
None
).unwrap();

setsockopt(rsock, UdpGsoSegment, &(segment_size as _))
.expect("setsockopt UDP_SEGMENT failed");

bind(rsock, &sock_addr).unwrap();
let ssock = socket(
AddressFamily::Inet,
SockType::Datagram,
SockFlag::empty(),
None,
).expect("send socket failed");

let mut num_packets_received: i32 = 0;

sendrecv(rsock, ssock, move |s, m, flags| {
let iov = [IoVec::from_slice(m)];
let cmsg = ControlMessage::UdpGsoSegments(&segment_size);
sendmsg(s, &iov, &[cmsg], flags, Some(&sock_addr))
}, {
let num_packets_received_ref = &mut num_packets_received;

move |len, _| {
// check that we receive UDP packets with payload size
// less or equal to segment size
assert!(len <= segment_size as usize);
*num_packets_received_ref += 1;
}
});

// Buffer size is 13, we will receive six packets of size 2,
// and one packet of size 1.
assert_eq!(7, num_packets_received);
}

#[test]
pub fn gro() {
require_kernel_version!(udp_offload::gro, ">= 5.3");

// It's hard to guarantee receiving GRO packets. Just checking
// that `setsockopt` doesn't fail with error

let rsock = socket(AddressFamily::Inet,
SockType::Datagram,
SockFlag::empty(),
None
).unwrap();

setsockopt(rsock, UdpGroSegment, &true)
.expect("setsockopt UDP_GRO failed");
}
}
}

// Test error handling of our recvmsg wrapper
Expand Down
37 changes: 37 additions & 0 deletions test/test.rs
Expand Up @@ -12,6 +12,7 @@ extern crate rand;
#[cfg(target_os = "freebsd")]
extern crate sysctl;
extern crate tempfile;
extern crate semver;

cfg_if! {
if #[cfg(any(target_os = "android", target_os = "linux"))] {
Expand Down Expand Up @@ -100,6 +101,42 @@ cfg_if! {
}
}

cfg_if! {
if #[cfg(any(target_os = "android", target_os = "linux"))] {
macro_rules! require_kernel_version {
($name:expr, $version_requirement:expr) => {
use ::std::io::Write;
use semver::{Version, VersionReq};

let version_requirement = VersionReq::parse($version_requirement)
.expect("Bad match_version provided");

let uname = nix::sys::utsname::uname();

let mut version = Version::parse(uname.release()).unwrap();

//Keep only numeric parts
version.pre.clear();
version.build.clear();

if !version_requirement.matches(&version) {
let stderr = ::std::io::stderr();
let mut handle = stderr.lock();

writeln!(handle,
"Skip {} because kernel version `{}` doesn't match the requirement `{}`",
stringify!($name), version, version_requirement).unwrap();
return;
}
}
}
} else {
macro_rules! require_kernel_version {
($name:expr) => {}
}
}
}

mod sys;
mod test_dir;
mod test_fcntl;
Expand Down

0 comments on commit 490e979

Please sign in to comment.