
Add sendmmsg() / recvmmsg() #1017

Closed
dholroyd wants to merge 27 commits

Conversation

dholroyd

This is my initial go at adding sendmmsg() / recvmmsg().

I am planning on updating this pull request with at least support for control messages, but would welcome any other feedback in the meantime.

Member

@asomers asomers left a comment


Looks ok so far, but it definitely needs cmsg support. And it needs to enable all relevant platforms.

Review threads (all resolved):
- CHANGELOG.md
- src/sys/socket/mod.rs ×4 (3 outdated)
- test/sys/test_socket.rs ×4 (outdated)
@asomers (Member) commented Jan 26, 2019

Rather than disabling this feature on FreeBSD and NetBSD, you should submit a PR to libc to add the definitions there. When you do, check other OSes too.

@dholroyd (Author)

Ok, I will try to have a go: rust-lang/libc#1231

bors added a commit to rust-lang/libc that referenced this pull request Jan 28, 2019
Broader sendmmsg() / recvmmsg() support

As a prerequisite for getting `sendmmsg()` / `recvmmsg()` into nix ( nix-rust/nix#1017 ), support for non-linux platforms needs to be added in libc.

The initial commits in this PR will just be to test out target support via CI.
bors added further commits to rust-lang/libc referencing this pull request, with the same message, on Jan 28, Feb 4, Feb 16, and Feb 18, 2019.
Before, the caller would have been allowed to move the storage of the
slice to which the mmsghdr within RecvMMsgHdr holds a pointer.

This took me ages to track down :(
(clippy dislikes mem::transmute, so I'm just doing as it asks)
@@ -9,6 +9,20 @@ use std::{fmt, mem, ptr, slice};
use std::os::unix::io::RawFd;
use sys::time::TimeVal;
use sys::uio::IoVec;
#[cfg(any(
target_os = "linux",
Member
Indent for consistency's sake.

@@ -17,6 +17,8 @@ This project adheres to [Semantic Versioning](http://semver.org/).
([#969](https://github.com/nix-rust/nix/pull/969))
- Add several errno constants from OpenBSD 6.2
([#1036](https://github.com/nix-rust/nix/pull/1036))
- Added support for sendmmsg() / recvmmsg() on Linux
Member
This description is blissfully out-of-date. You can remove the "on Linux" part.

@@ -882,6 +882,29 @@ impl SockAddr {
SockAddr::Link(LinkAddr(ref ether_addr)) => (mem::transmute(ether_addr), mem::size_of::<libc::sockaddr_dl>() as libc::socklen_t),
}
}

pub unsafe fn as_ffi_pair_mut(&mut self) -> (&mut libc::sockaddr, libc::socklen_t) {
Member
This is duplicative. Can you think of any way to share code between this method and as_ffi_pair? Also, you should wrap to 80 cols.

target_os = "netbsd",
target_os = "android",
))]
pub fn sendmmsg(fd: RawFd, msgvec: &mut[SendMMsgHdr]) -> Result<usize> {
Member
This is a very complicated function to use. Could you provide an example? Also, why no flags argument?

@dholroyd (Author)

Just want to say that I plan to address all the comments you've kindly provided, Alan; all look reasonable.

I'm currently trying to integrate the API in the form from my branch into a higher-level API within actual application code, in hopes that exercise might knock off some rough edges here.

fd,
msgvec.as_mut_ptr() as *mut libc::mmsghdr,
msgvec.len() as _,
flags.bits(),

@gnzlbg gnzlbg May 28, 2019


Note: on musl the flag argument is an unsigned int, while on gnu, it is a signed int, so one probably needs an as _ here as well.

@kawakami-o3

What are you struggling with, @dholroyd ? I want to help you.

@dholroyd (Author)

What are you struggling with, @dholroyd ?

There are the usual issues of time and priorities, but beyond that, I think the interfaces I made don't work very well:

  • An app that would benefit from these optimisations (vs non-batched sendmsg/recvmsg) will also want to avoid allocations / copies per packet
  • RecvMMsgHdr and SendMMsgHdr both borrow the underlying buffers, so while these instances exist the underlying buffers can't be modified
  • modifying underlying buffers in between sendmmsg/recvmmsg calls is something that applications will naturally need to do (e.g. so as to be able to supply different payloads on each call to sendmmsg)
  • I think calling code has to drop the sets of RecvMMsgHdr and SendMMsgHdr instances in between each call in order to regain mutable access to underlying buffers, and will have to recreate new instances for subsequent calls, which is a drain on efficiency

To deal with these problems I tried to come up with an interface that ends up looking significantly different from sendmmsg/recvmmsg while fixing them and being safe. I didn't finish that exercise either, but here's a sketch of the WIP (quoted out of context, so it will not work as-is). My idea was to provide a data structure to own the underlying libc::mmsghdr instances and allow them to be safely reused -- hiding complexity that the previous implementation forces on the caller. I'm not sure if such a 'thick' layer on top of sendmmsg/recvmmsg is in keeping with nix philosophy though?

struct RecvPk<P: Packet> {
    packet: P,
    iovec: [ libc::iovec; 1],
    _pin: marker::PhantomPinned,
}
impl<P: Packet> RecvPk<P> {
    fn new(mut packet: P) -> RecvPk<P> {
        let pay = packet.payload_mut();
        let iov = libc::iovec {
            iov_base: pay.as_mut_ptr() as *mut libc::c_void,
            iov_len: pay.len() as libc::size_t,
        };
        let result = RecvPk {
            packet,
            iovec: [ iov ],
            _pin: marker::PhantomPinned
        };
        result
    }

    fn into_packet(self, len: usize) -> P {
        let mut pk = self.packet;
        pk.truncate(len);
        pk
    }
}

pub struct RecvQueue<BP: BufferPool> {
    buffer_pool: BP,
    // TODO: use a type with a similar interface that guarantees stable addresses for the contained
    //       items (VecDeque could relocate items if capacity is exceeded, which we do take care to
    //       avoid).
    recv_packets: VecDeque<RecvPk<BP::P>>,
    // storage can't be a VecDeque, since we will need to pass a contiguous slice
    // to recvmmsg()
    storage: SliceDeque<libc::mmsghdr>,
    hist: Vec<u64>,
    count: usize,
}
impl<BP: BufferPool> RecvQueue<BP> {
    pub fn new(buffer_pool: BP) -> RecvQueue<BP> {
        let mut queue = RecvQueue {
            buffer_pool,
            recv_packets: VecDeque::with_capacity(BATCH_SIZE),
            storage: SliceDeque::with_capacity(BATCH_SIZE),
            hist: vec![0; BATCH_SIZE],
            count: 0,
        };
        queue.replenish();
        queue
    }

    fn replenish(&mut self) {
        for _ in self.storage.len()..BATCH_SIZE {
            self.add(self.buffer_pool.allocate().unwrap(/*FIXME*/));
        }
        // if the list which contains the iovec elements does not have the expected capacity, this
        // likely means it has been reallocated, and the pointers from the members of the 'storage'
        // Vec will now point at random memory; try to detect this and fail fast!
        assert_eq!(self.recv_packets.len(), BATCH_SIZE);
        assert_eq!(self.storage.len(), BATCH_SIZE);
    }
    fn add(&mut self, packet: BP::P) {
        let pk = RecvPk::new(packet);
        self.recv_packets.push_back(pk);
        let pk = self.recv_packets.get_mut(self.recv_packets.len()-1).unwrap();
        unsafe {
            // mem::uninitialized() is deprecated (and undefined behaviour for
            // most types); a zeroed mmsghdr is a sound starting point since
            // every field used is assigned just below
            self.storage.push_back(mem::zeroed());
        }
        let hdr = self.storage.last_mut().unwrap();
        let flags = 0;
        hdr.msg_hdr.msg_control = ptr::null_mut();
        hdr.msg_hdr.msg_controllen = 0;
        hdr.msg_hdr.msg_flags = flags;
        hdr.msg_hdr.msg_iov = pk.iovec.as_mut_ptr() as *mut libc::iovec;
        hdr.msg_hdr.msg_iovlen = pk.iovec.len() as _;
        hdr.msg_hdr.msg_name = ptr::null_mut();
        hdr.msg_hdr.msg_namelen = 0;
        hdr.msg_len = 0;

    }

    pub fn recvbatch<'a>(&'a mut self, sock: &mio::net::UdpSocket) -> Result<(Batch, Option<impl Iterator<Item = BP::P> + 'a>), io::Error> {
        self.replenish();
        let msgvec = self.storage[..].as_mut_ptr();
        let vlen = self.storage.len() as u32;
        let flags = 0;
        let timeout = ptr::null_mut();
        let res = unsafe { libc::recvmmsg(sock.as_raw_fd(), msgvec, vlen, flags, timeout) };
        let size = match nix::errno::Errno::result(res) {
            Ok(s) => s as usize,
            Err(nix::Error::Sys(errno)) if errno == nix::errno::Errno::EAGAIN => {
                return Ok((Batch::Exhausted, None));
            }
            Err(nix::Error::Sys(e)) => return Err(io::Error::from(e)),
            Err(e) => return Err(io::Error::new(io::ErrorKind::Other, e.description())),
        };
        self.hist[size - 1] += 1;
        self.count += 1;
        if self.count & 0b111111111111 == 0 {
            println!("stats: {:?}", self.hist);
            self.hist.iter_mut().for_each(|v| *v = 0 )
        }
        let pks = self.recv_packets.drain(0..size);
        // we want SliceDeque::drain(), but that's unstable so we can't use it.  This hack with
        // repeat_with() lacks the guarantees that drain() would give about actually dropping
        // the items no matter if the caller actually consumes the iterator; it's therefore not
        // really a good enough solution 'for real', but lets us benchmark
        /*let store = &mut self.storage;
        let hdrs = iter::repeat_with(move || { store.remove(0) } ).take(size);
        //let hdrs = self.storage.drain(0..size);
        let result = hdrs.zip(pks).map(|(hdr, pk)| {
            pk.into_packet(hdr.msg_len as usize)
        });*/
        let result = Drain { iter: pks, store: &mut self.storage };

        if size < BATCH_SIZE {
            // either there were no more packets to receive (so it's legitimate to
            // avoid calling recvmmsg() again and avoid getting EAGAIN) or an error
            // occurred after receiving at least one packet.  Since we can't
            // differentiate these two cases, and we would like to avoid the extra
            // syscall that will immediately get EAGAIN, we break out of the recv loop
            // here, and rely on the socket having been registered with MIO using
            // PollOpt::level() to let any remaining buffered packets be read on
            // another iteration of the overall event loop.
            Ok((Batch::Exhausted, Some(result)))
        } else {
            Ok((Batch::Continues, Some(result)))
        }
    }
}
struct Drain<'a, P: Packet> {
    iter: std::collections::vec_deque::Drain<'a, RecvPk<P>>,
    store: &'a mut SliceDeque<libc::mmsghdr>,
}
impl<'a, P: Packet> Iterator for Drain<'a, P> {
    type Item = P;

    fn next(&mut self) -> Option<Self::Item> {
        let pk = self.iter.next()?;
        let hdr = &self.store[0];
        let result = pk.into_packet(hdr.msg_len as usize);
        self.store.remove(0);
        Some(result)
    }
}

#[derive(PartialEq, Debug)]
pub enum Batch {
    Continues,
    Exhausted,
}

@kawakami-o3

Thank you for explaining the current state. Honestly, I'm struggling to implement a wrapper for recvmmsg(). Your idea is very insightful, although it requires some decision making. I will also try to make a proof of concept.

@dholroyd (Author)

Superseded by #1208

@dholroyd dholroyd closed this Jun 28, 2020