diff --git a/tokio/src/io/async_fd.rs b/tokio/src/io/async_fd.rs index b27c60bf6c7..12d6a0f5fd1 100644 --- a/tokio/src/io/async_fd.rs +++ b/tokio/src/io/async_fd.rs @@ -243,6 +243,30 @@ impl AsyncFd { Self::new_with_handle_and_interest(inner, scheduler::Handle::current(), interest) } + /// Create a new AsyncFd with the provided raw epoll flags for registration. + /// + /// These flags replace any epoll flags would normally set when registering the fd. + /// + /// # Note + /// This API does not support the use of `EPOLLONESHOT`. + /// Users are strongly advised to use `EPOLLET` to prevent the tokio IO driver from receiving + /// spurious wakes. + /// + /// # Stability + /// This is an [unstable API][unstable]. The public API of this may break in 1.x releases. + /// See [the documentation on unstable features][unstable] for details. + /// + /// [unstable]: crate#unstable-features + #[track_caller] + #[cfg(all(target_os = "linux", tokio_unstable))] + #[cfg_attr(docsrs, doc(cfg(all(tokio_unstable, target_os = "linux"))))] + pub fn with_epoll_flags(inner: T, flags: u32) -> io::Result + where + T: AsRawFd, + { + Self::new_with_handle_and_flags(inner, scheduler::Handle::current(), flags) + } + #[track_caller] pub(crate) fn new_with_handle_and_interest( inner: T, @@ -260,6 +284,21 @@ impl AsyncFd { }) } + #[track_caller] + #[cfg(all(target_os = "linux", tokio_unstable))] + pub(crate) fn new_with_handle_and_flags( + mut inner: T, + handle: scheduler::Handle, + flags: u32, + ) -> io::Result { + let registration = Registration::new_with_flags_and_handle(&mut inner, flags, handle)?; + + Ok(AsyncFd { + registration, + inner: Some(inner), + }) + } + /// Returns a shared reference to the backing object of this [`AsyncFd`]. #[inline] pub fn get_ref(&self) -> &T { diff --git a/tokio/src/io/poll_evented.rs b/tokio/src/io/poll_evented.rs index cb5bffd54a9..c1e221935cb 100644 --- a/tokio/src/io/poll_evented.rs +++ b/tokio/src/io/poll_evented.rs @@ -68,6 +68,68 @@ cfg_io_driver! { } } +#[cfg(target_os = "linux")] +impl PollEvented { + /// Creates a new `PollEvented` associated with the default reactor. + /// + /// The returned `PollEvented` has readable and writable interests. For more control, use + /// [`Self::new_with_interest`]. + /// + /// This will also use additional provided raw flags for epoll. + /// + /// # Panics + /// + /// This function panics if thread-local runtime is not set. + /// + /// The runtime is usually set implicitly when this function is called + /// from a future driven by a tokio runtime, otherwise runtime can be set + /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function. + #[track_caller] + #[cfg_attr(feature = "signal", allow(unused))] + #[cfg(all(target_os = "linux", tokio_unstable))] + pub(crate) fn new_raw(io: E, flags: u32) -> io::Result { + PollEvented::new_with_interest_raw(io, flags) + } + + /// Creates a new `PollEvented` associated with the default reactor, for + /// specific `Interest` state. `new_with_interest` should be used over `new` + /// when you need control over the readiness state, such as when a file + /// descriptor only allows reads. This does not add `hup` or `error` so if + /// you are interested in those states, you will need to add them to the + /// readiness state passed to this function. + /// + /// This will also use additional provided raw flags for epoll. + /// + /// # Panics + /// + /// This function panics if thread-local runtime is not set. + /// + /// The runtime is usually set implicitly when this function is called from + /// a future driven by a tokio runtime, otherwise runtime can be set + /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) + /// function. + #[track_caller] + #[cfg_attr(feature = "signal", allow(unused))] + #[cfg(all(target_os = "linux", tokio_unstable))] + pub(crate) fn new_with_interest_raw(io: E, flags: u32) -> io::Result { + Self::new_with_interest_and_handle_raw(io, flags, scheduler::Handle::current()) + } + + #[track_caller] + #[cfg(all(target_os = "linux", tokio_unstable))] + pub(crate) fn new_with_interest_and_handle_raw( + mut io: E, + flags: u32, + handle: scheduler::Handle, + ) -> io::Result { + let registration = Registration::new_with_flags_and_handle(&mut io, flags, handle)?; + Ok(Self { + io: Some(io), + registration, + }) + } +} + // ===== impl PollEvented ===== impl PollEvented { diff --git a/tokio/src/net/tcp/listener.rs b/tokio/src/net/tcp/listener.rs index f1befac26dc..0c6c1792a10 100644 --- a/tokio/src/net/tcp/listener.rs +++ b/tokio/src/net/tcp/listener.rs @@ -240,6 +240,32 @@ impl TcpListener { Ok(TcpListener { io }) } + /// Create a new TcpListener with the provided raw epoll flags. + /// + /// These flags replace any epoll flags would normally set when registering the fd. + /// + /// # Note + /// This API does not support the use of `EPOLLONESHOT`. + /// Users are strongly advised to use `EPOLLET` to prevent the tokio IO driver from receiving + /// spurious wakes. + /// + /// # Stability + /// This is an [unstable API][unstable]. The public API of this may break in 1.x releases. + /// See [the documentation on unstable features][unstable] for details. + /// + /// [unstable]: crate#unstable-features + #[track_caller] + #[cfg(all(target_os = "linux", tokio_unstable))] + #[cfg_attr(docsrs, doc(cfg(all(tokio_unstable, target_os = "linux"))))] + pub fn from_std_with_epoll_flags( + listener: net::TcpListener, + flags: u32, + ) -> io::Result { + let io = mio::net::TcpListener::from_std(listener); + let io = PollEvented::new_raw(io, flags)?; + Ok(TcpListener { io }) + } + /// Turns a [`tokio::net::TcpListener`] into a [`std::net::TcpListener`]. /// /// The returned [`std::net::TcpListener`] will have nonblocking mode set as diff --git a/tokio/src/net/unix/listener.rs b/tokio/src/net/unix/listener.rs index a7e9115eadd..1686b59e982 100644 --- a/tokio/src/net/unix/listener.rs +++ b/tokio/src/net/unix/listener.rs @@ -114,6 +114,32 @@ impl UnixListener { Ok(UnixListener { io }) } + /// Create a new UnixListener with the provided raw epoll flags. + /// + /// These flags replace any epoll flags would normally set when registering the fd. + /// + /// # Note + /// This API does not support the use of `EPOLLONESHOT`. + /// Users are strongly advised to use `EPOLLET` to prevent the tokio IO driver from receiving + /// spurious wakes. + /// + /// # Stability + /// This is an [unstable API][unstable]. The public API of this may break in 1.x releases. + /// See [the documentation on unstable features][unstable] for details. + /// + /// [unstable]: crate#unstable-features + #[track_caller] + #[cfg(all(target_os = "linux", tokio_unstable))] + #[cfg_attr(docsrs, doc(cfg(all(tokio_unstable, target_os = "linux"))))] + pub fn from_std_with_epoll_flags( + listener: net::UnixListener, + flags: u32, + ) -> io::Result { + let io = mio::net::UnixListener::from_std(listener); + let io = PollEvented::new_raw(io, flags)?; + Ok(UnixListener { io }) + } + /// Turns a [`tokio::net::UnixListener`] into a [`std::os::unix::net::UnixListener`]. /// /// The returned [`std::os::unix::net::UnixListener`] will have nonblocking mode diff --git a/tokio/src/runtime/io/driver.rs b/tokio/src/runtime/io/driver.rs index d3055862e25..c6d84274d9d 100644 --- a/tokio/src/runtime/io/driver.rs +++ b/tokio/src/runtime/io/driver.rs @@ -236,6 +236,54 @@ impl Handle { Ok(scheduled_io) } + /// Registers an I/O resource with the reactor, bypassing Mio entirely and using raw epoll_ctl. + /// + /// This is more or less the copy-pasted from the Mio code, and exists so that we can use flags + /// other than the base set of epoll ones we normally use and those representing interests. + /// + /// This is important for supporting things like `EPOLLEXCLUSIVE`, which is very useful for + /// shared-nothing runtimes. + /// + /// The registries token is returned. + #[cfg(all(target_os = "linux", tokio_unstable))] + pub(super) fn add_source_raw( + &self, + source: &mut impl std::os::unix::io::AsRawFd, + flags: u32, + ) -> io::Result> { + use libc::EPOLLET; + use std::os::unix::io::AsRawFd; + + let events = EPOLLET as u32 | flags; + + let scheduled_io = self.registrations.allocate(&mut self.synced.lock())?; + let token = scheduled_io.token(); + + let mut event = libc::epoll_event { + events, + u64: usize::from(token) as u64, + }; + + // TODO: if this returns an err, the `ScheduledIo` leaks... + let res = unsafe { + libc::epoll_ctl( + self.registry.as_raw_fd(), + libc::EPOLL_CTL_ADD, + source.as_raw_fd(), + &mut event, + ) + }; + + if res == -1 { + return Err(std::io::Error::last_os_error()); + } + + // TODO: move this logic to `RegistrationSet` and use a `CountedLinkedList` + self.metrics.incr_fd_count(); + + Ok(scheduled_io) + } + /// Deregisters an I/O resource from the reactor. pub(super) fn deregister_source( &self, diff --git a/tokio/src/runtime/io/registration.rs b/tokio/src/runtime/io/registration.rs index 759589863eb..3b9827bab9f 100644 --- a/tokio/src/runtime/io/registration.rs +++ b/tokio/src/runtime/io/registration.rs @@ -80,6 +80,31 @@ impl Registration { Ok(Registration { handle, shared }) } + /// Registers the I/O resource with the reactor for the provided handle, for + /// a specific `Interest`, *with a set of raw additional epoll flags*. This does not add `hup` + /// or `error` so if you are interested in those states, you will need to add them to the + /// readiness state passed to this function. + /// + /// # Return + /// + /// - `Ok` if the registration happened successfully + /// - `Err` if an error was encountered during registration + #[cfg(all(target_os = "linux", tokio_unstable))] + pub(crate) fn new_with_flags_and_handle( + io: &mut impl std::os::unix::io::AsRawFd, + flags: u32, + handle: scheduler::Handle, + ) -> io::Result { + debug_assert!( + (flags & libc::EPOLLONESHOT as u32) == 0, + "Tokio does not support using EPOLLONESHOT in user-specified epoll flags" + ); + + let shared = handle.driver().io().add_source_raw(io, flags)?; + + Ok(Registration { handle, shared }) + } + /// Deregisters the I/O resource from the reactor it is associated with. /// /// This function must be called before the I/O resource associated with the diff --git a/tokio/tests/epollexclusive.rs b/tokio/tests/epollexclusive.rs new file mode 100644 index 00000000000..5887598fbb2 --- /dev/null +++ b/tokio/tests/epollexclusive.rs @@ -0,0 +1,104 @@ +#![cfg(all( + target_os = "linux", + feature = "net", + feature = "rt", + feature = "sync", + feature = "macros", + feature = "time", + tokio_unstable, +))] + +use std::sync::Arc; +use std::thread; +use tokio::sync::Barrier; + +const NUM_WORKERS: usize = 8; +const NUM_CONNECTIONS: u64 = 32; + +const FUDGE_MIN: f64 = 0.75; +const FUDGE_MAX: f64 = 1.25; + +#[test] +fn epoll_exclusive() { + let value = count_accepts_with_flags(NUM_WORKERS, NUM_CONNECTIONS, libc::EPOLLEXCLUSIVE as u32); + + let actual_to_expected_ratio = value as f64 / NUM_CONNECTIONS as f64; + + assert!( + actual_to_expected_ratio >= FUDGE_MIN && actual_to_expected_ratio <= FUDGE_MAX, + "expected fuzzy {}, got {}", + NUM_CONNECTIONS, + value + ); +} + +fn count_accepts_with_flags(workers: usize, connections: u64, flags: u32) -> u64 { + let barrier = Arc::new(Barrier::new(workers as usize + 1)); + + let mut handles = Vec::with_capacity(workers); + + let listener = std::net::TcpListener::bind("127.0.0.1:0").unwrap(); + + let listener_addr = listener.local_addr().unwrap(); + + for _ in 0..workers { + let local_listener = listener.try_clone().unwrap(); + let local_barrier = barrier.clone(); + + handles.push(thread::spawn(move || { + count_accepts(local_listener, flags | libc::EPOLLIN as u32, local_barrier) + })) + } + + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap(); + + rt.block_on(async { + barrier.wait().await; + + for _ in 0..connections { + tokio::net::TcpStream::connect(listener_addr).await.unwrap(); + tokio::time::sleep(std::time::Duration::from_millis(100)).await; + } + + barrier.wait().await; + }); + + let mut num_accepts_total = 0; + + for handle in handles { + num_accepts_total += handle.join().unwrap(); + } + + num_accepts_total +} + +fn count_accepts(std: std::net::TcpListener, flags: u32, barrier: Arc) -> u64 { + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap(); + + rt.block_on(async { + std.set_nonblocking(true).unwrap(); + + let listener = tokio::net::TcpListener::from_std_with_epoll_flags(std, flags).unwrap(); + + barrier.wait().await; + + let mut barr_wait = std::pin::pin!(barrier.wait()); + + loop { + tokio::select! { + _ = &mut barr_wait => { + return tokio::runtime::Handle::current().metrics().io_driver_ready_count(); + } + a = listener.accept() => { + a.unwrap(); + } + } + } + }) +}