Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

driver: add support for registering file descriptors with user-specified flags #6089

Open
wants to merge 10 commits into
base: master
Choose a base branch
from
39 changes: 39 additions & 0 deletions tokio/src/io/async_fd.rs
Expand Up @@ -243,6 +243,30 @@ impl<T: AsRawFd> AsyncFd<T> {
Self::new_with_handle_and_interest(inner, scheduler::Handle::current(), interest)
}

/// Creates a new [`AsyncFd`], registering `inner` with the provided raw epoll flags.
///
/// These flags replace the epoll flags that Tokio would normally set when registering the fd.
///
/// # Note
/// This API does not support the use of `EPOLLONESHOT`.
/// Users are strongly advised to use `EPOLLET` to prevent the tokio IO driver from receiving
/// spurious wakes.
///
/// # Errors
/// Returns any error reported while registering `inner` with the I/O driver.
///
/// # Stability
/// This is an [unstable API][unstable]. The public API of this may break in 1.x releases.
/// See [the documentation on unstable features][unstable] for details.
///
/// [unstable]: crate#unstable-features
#[track_caller]
#[cfg(all(target_os = "linux", tokio_unstable))]
Darksonn marked this conversation as resolved.
Show resolved Hide resolved
#[cfg_attr(docsrs, doc(cfg(all(tokio_unstable, target_os = "linux"))))]
pub fn with_epoll_flags(inner: T, flags: u32) -> io::Result<Self>
Darksonn marked this conversation as resolved.
Show resolved Hide resolved
where
T: AsRawFd,
{
Self::new_with_handle_and_flags(inner, scheduler::Handle::current(), flags)
}

#[track_caller]
pub(crate) fn new_with_handle_and_interest(
inner: T,
Expand All @@ -260,6 +284,21 @@ impl<T: AsRawFd> AsyncFd<T> {
})
}

/// Wraps `inner` in an `AsyncFd` whose file descriptor is registered with the
/// I/O driver of `handle` using the raw epoll `flags`.
///
/// Registration is delegated to `Registration::new_with_flags_and_handle`; if it
/// fails, the error is returned unchanged and `inner` is dropped.
#[track_caller]
#[cfg(all(target_os = "linux", tokio_unstable))]
pub(crate) fn new_with_handle_and_flags(
    mut inner: T,
    handle: scheduler::Handle,
    flags: u32,
) -> io::Result<Self> {
    // The mutable borrow of `inner` ends with the call, so it can be moved into
    // the resulting `AsyncFd` afterwards.
    Registration::new_with_flags_and_handle(&mut inner, flags, handle).map(|registration| Self {
        registration,
        inner: Some(inner),
    })
}

/// Returns a shared reference to the backing object of this [`AsyncFd`].
#[inline]
pub fn get_ref(&self) -> &T {
Expand Down
62 changes: 62 additions & 0 deletions tokio/src/io/poll_evented.rs
Expand Up @@ -68,6 +68,68 @@ cfg_io_driver! {
}
}

// Raw-epoll-flag constructors. Every method here needs both Linux and
// `tokio_unstable`, so the gate lives on the `impl` rather than on each method.
#[cfg(all(target_os = "linux", tokio_unstable))]
impl<E> PollEvented<E>
where
    E: std::os::unix::io::AsRawFd + Source,
{
    /// Creates a new `PollEvented` associated with the default reactor, registered with
    /// caller-supplied raw epoll `flags`.
    ///
    /// No `Interest` is involved: `flags` are handed to the registration layer, so the
    /// caller selects the readiness events (for example `EPOLLIN`) itself.
    ///
    /// # Panics
    ///
    /// This function panics if thread-local runtime is not set.
    ///
    /// The runtime is usually set implicitly when this function is called
    /// from a future driven by a tokio runtime, otherwise runtime can be set
    /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
    #[track_caller]
    #[cfg_attr(feature = "signal", allow(unused))]
    pub(crate) fn new_raw(io: E, flags: u32) -> io::Result<Self> {
        Self::new_with_interest_raw(io, flags)
    }

    /// Creates a new `PollEvented` associated with the default reactor, registered with
    /// caller-supplied raw epoll `flags`.
    ///
    /// Despite the name, this takes no `Interest`; it resolves the current runtime handle
    /// and forwards to [`Self::new_with_interest_and_handle_raw`].
    ///
    /// # Panics
    ///
    /// This function panics if thread-local runtime is not set.
    ///
    /// The runtime is usually set implicitly when this function is called from
    /// a future driven by a tokio runtime, otherwise runtime can be set
    /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter)
    /// function.
    #[track_caller]
    #[cfg_attr(feature = "signal", allow(unused))]
    pub(crate) fn new_with_interest_raw(io: E, flags: u32) -> io::Result<Self> {
        let current = scheduler::Handle::current();
        Self::new_with_interest_and_handle_raw(io, flags, current)
    }

    /// Registers `io` with the I/O driver of `handle` using the raw epoll `flags` and
    /// wraps it in a `PollEvented`.
    ///
    /// Any registration error is returned unchanged, in which case `io` is dropped.
    #[track_caller]
    pub(crate) fn new_with_interest_and_handle_raw(
        mut io: E,
        flags: u32,
        handle: scheduler::Handle,
    ) -> io::Result<Self> {
        // The mutable borrow of `io` ends with the call, so it can be moved into the
        // returned value afterwards.
        Registration::new_with_flags_and_handle(&mut io, flags, handle).map(|registration| Self {
            io: Some(io),
            registration,
        })
    }
}

// ===== impl PollEvented =====

impl<E: Source> PollEvented<E> {
Expand Down
26 changes: 26 additions & 0 deletions tokio/src/net/tcp/listener.rs
Expand Up @@ -240,6 +240,32 @@ impl TcpListener {
Ok(TcpListener { io })
}

/// Creates a new [`TcpListener`] from a [`std::net::TcpListener`], registering it with the
/// provided raw epoll flags.
///
/// These flags replace the epoll flags that Tokio would normally set when registering the fd.
///
/// # Note
/// This API does not support the use of `EPOLLONESHOT`.
/// Users are strongly advised to use `EPOLLET` to prevent the tokio IO driver from receiving
/// spurious wakes.
///
/// # Errors
/// Returns any error reported while registering the listener with the I/O driver.
///
/// # Panics
/// Panics if no Tokio runtime is set for the current thread, because the listener is
/// registered through `PollEvented::new_raw`.
///
/// # Stability
/// This is an [unstable API][unstable]. The public API of this may break in 1.x releases.
/// See [the documentation on unstable features][unstable] for details.
///
/// [unstable]: crate#unstable-features
#[track_caller]
#[cfg(all(target_os = "linux", tokio_unstable))]
#[cfg_attr(docsrs, doc(cfg(all(tokio_unstable, target_os = "linux"))))]
pub fn from_std_with_epoll_flags(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would rather not have these methods on TcpListener. Consider adding them to TcpSocket instead as that is the socket builder API.

listener: net::TcpListener,
flags: u32,
) -> io::Result<TcpListener> {
let io = mio::net::TcpListener::from_std(listener);
let io = PollEvented::new_raw(io, flags)?;
Ok(TcpListener { io })
}

/// Turns a [`tokio::net::TcpListener`] into a [`std::net::TcpListener`].
///
/// The returned [`std::net::TcpListener`] will have nonblocking mode set as
Expand Down
26 changes: 26 additions & 0 deletions tokio/src/net/unix/listener.rs
Expand Up @@ -114,6 +114,32 @@ impl UnixListener {
Ok(UnixListener { io })
}

/// Creates a new [`UnixListener`] from a [`std::os::unix::net::UnixListener`], registering it
/// with the provided raw epoll flags.
///
/// These flags replace the epoll flags that Tokio would normally set when registering the fd.
///
/// # Note
/// This API does not support the use of `EPOLLONESHOT`.
/// Users are strongly advised to use `EPOLLET` to prevent the tokio IO driver from receiving
/// spurious wakes.
///
/// # Errors
/// Returns any error reported while registering the listener with the I/O driver.
///
/// # Panics
/// Panics if no Tokio runtime is set for the current thread, because the listener is
/// registered through `PollEvented::new_raw`.
///
/// # Stability
/// This is an [unstable API][unstable]. The public API of this may break in 1.x releases.
/// See [the documentation on unstable features][unstable] for details.
///
/// [unstable]: crate#unstable-features
#[track_caller]
#[cfg(all(target_os = "linux", tokio_unstable))]
#[cfg_attr(docsrs, doc(cfg(all(tokio_unstable, target_os = "linux"))))]
pub fn from_std_with_epoll_flags(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the TCP method is added to TcpSocket, this will need to be removed and either skipped initially or we will need a UnixSocket API.

listener: net::UnixListener,
flags: u32,
) -> io::Result<UnixListener> {
let io = mio::net::UnixListener::from_std(listener);
let io = PollEvented::new_raw(io, flags)?;
Ok(UnixListener { io })
}

/// Turns a [`tokio::net::UnixListener`] into a [`std::os::unix::net::UnixListener`].
///
/// The returned [`std::os::unix::net::UnixListener`] will have nonblocking mode
Expand Down
48 changes: 48 additions & 0 deletions tokio/src/runtime/io/driver.rs
Expand Up @@ -236,6 +236,54 @@ impl Handle {
Ok(scheduled_io)
}

/// Registers an I/O resource with the reactor, bypassing Mio entirely and using raw
/// `epoll_ctl`.
///
/// This is more or less copy-pasted from the Mio code, and exists so that we can use flags
/// other than the base set of epoll ones we normally use and those representing interests.
///
/// This is important for supporting things like `EPOLLEXCLUSIVE`, which is very useful for
/// shared-nothing runtimes.
///
/// `EPOLLET` is always OR-ed into the caller's `flags` before the fd is added.
///
/// # Return
///
/// - `Ok` with the newly allocated `ScheduledIo`, whose token is stored in the epoll
///   event's `u64` field.
/// - `Err` if allocating the `ScheduledIo` fails or `epoll_ctl` returns `-1`.
#[cfg(all(target_os = "linux", tokio_unstable))]
pub(super) fn add_source_raw(
&self,
source: &mut impl std::os::unix::io::AsRawFd,
flags: u32,
) -> io::Result<Arc<ScheduledIo>> {
use libc::EPOLLET;
use std::os::unix::io::AsRawFd;

// Edge-triggered mode is forced on regardless of what the caller passed.
let events = EPOLLET as u32 | flags;

let scheduled_io = self.registrations.allocate(&mut self.synced.lock())?;
let token = scheduled_io.token();

// The token travels through the kernel in the event's user-data field.
let mut event = libc::epoll_event {
events,
u64: usize::from(token) as u64,
};

// TODO: if this returns an err, the `ScheduledIo` leaks...
// NOTE(review): this assumes the registry's raw fd is an epoll instance, which the
// reviewer points out Mio does not guarantee -- confirm before relying on it.
let res = unsafe {
libc::epoll_ctl(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can't call epoll_* methods from Tokio as there is no guarantee that Mio uses epoll under the hood. Either we should have mio expose more APIs and call those, or we need to use epoll directly in Tokio (obviously, the first option is preferable).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What guarantees are made by Mio around it's fd?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know that this behavior is not guaranteed, but we don't really say what behavior is.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Which also I suppose begs the question "if it isn't here so people can do this as an escape hatch, why is it here at all?"

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That being said, there's a deeper question here, which is "do we need Interest::CUSTOM() in Mio, and how would that work?"

Personally, I think if we're going to continue to use Mio within tokio, we probably need something like this.

self.registry.as_raw_fd(),
libc::EPOLL_CTL_ADD,
source.as_raw_fd(),
&mut event,
)
};

if res == -1 {
return Err(std::io::Error::last_os_error());
}

// TODO: move this logic to `RegistrationSet` and use a `CountedLinkedList`
self.metrics.incr_fd_count();

Ok(scheduled_io)
}

/// Deregisters an I/O resource from the reactor.
pub(super) fn deregister_source(
&self,
Expand Down
25 changes: 25 additions & 0 deletions tokio/src/runtime/io/registration.rs
Expand Up @@ -80,6 +80,31 @@ impl Registration {
Ok(Registration { handle, shared })
}

/// Registers the I/O resource with the reactor for the provided handle, using a set of
/// raw epoll `flags` instead of an `Interest`.
///
/// The flags are passed on to the I/O driver's `add_source_raw`. `EPOLLONESHOT` is not
/// supported; passing it trips a debug assertion (debug builds only).
///
/// # Return
///
/// - `Ok` if the registration happened successfully
/// - `Err` if an error was encountered during registration
#[cfg(all(target_os = "linux", tokio_unstable))]
pub(crate) fn new_with_flags_and_handle(
    io: &mut impl std::os::unix::io::AsRawFd,
    flags: u32,
    handle: scheduler::Handle,
) -> io::Result<Registration> {
    let oneshot = libc::EPOLLONESHOT as u32;
    debug_assert!(
        (flags & oneshot) == 0,
        "Tokio does not support using EPOLLONESHOT in user-specified epoll flags"
    );

    // The borrow of `handle` ends once the driver call returns, so `handle` can be
    // moved into the `Registration` below.
    let io_driver = handle.driver().io();
    let shared = io_driver.add_source_raw(io, flags)?;

    Ok(Registration { handle, shared })
}

/// Deregisters the I/O resource from the reactor it is associated with.
///
/// This function must be called before the I/O resource associated with the
Expand Down
104 changes: 104 additions & 0 deletions tokio/tests/epollexclusive.rs
@@ -0,0 +1,104 @@
#![cfg(all(
target_os = "linux",
feature = "net",
feature = "rt",
feature = "sync",
feature = "macros",
feature = "time",
tokio_unstable,
))]

use std::sync::Arc;
use std::thread;
use tokio::sync::Barrier;

const NUM_WORKERS: usize = 8;
const NUM_CONNECTIONS: u64 = 32;

const FUDGE_MIN: f64 = 0.75;
const FUDGE_MAX: f64 = 1.25;

/// Registers `NUM_WORKERS` listeners with `EPOLLEXCLUSIVE` and checks that the summed
/// count reported by the workers stays within `FUDGE_MIN..=FUDGE_MAX` of
/// `NUM_CONNECTIONS`.
#[test]
fn epoll_exclusive() {
    let exclusive = libc::EPOLLEXCLUSIVE as u32;
    let value = count_accepts_with_flags(NUM_WORKERS, NUM_CONNECTIONS, exclusive);

    let ratio = value as f64 / NUM_CONNECTIONS as f64;
    let tolerated = FUDGE_MIN..=FUDGE_MAX;

    assert!(
        tolerated.contains(&ratio),
        "expected fuzzy {}, got {}",
        NUM_CONNECTIONS,
        value
    );
}

/// Spawns `workers` threads that each run `count_accepts` on a clone of one shared
/// listener registered with `flags | EPOLLIN`, opens `connections` connections to it
/// (100ms apart), and returns the sum of the values the workers report.
///
/// A barrier of `workers + 1` parties is crossed twice: once before the first connection
/// and once after the last, which is what tells the workers to stop.
fn count_accepts_with_flags(workers: usize, connections: u64, flags: u32) -> u64 {
    let std_listener = std::net::TcpListener::bind("127.0.0.1:0").unwrap();
    let target = std_listener.local_addr().unwrap();

    // One extra party for this (connecting) thread.
    let barrier = Arc::new(Barrier::new(workers + 1));
    let worker_flags = flags | libc::EPOLLIN as u32;

    let handles: Vec<_> = (0..workers)
        .map(|_| {
            let worker_listener = std_listener.try_clone().unwrap();
            let worker_barrier = Arc::clone(&barrier);

            thread::spawn(move || count_accepts(worker_listener, worker_flags, worker_barrier))
        })
        .collect();

    let runtime = tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .unwrap();

    runtime.block_on(async {
        // Start only once every worker has registered its listener.
        barrier.wait().await;

        for _ in 0..connections {
            tokio::net::TcpStream::connect(target).await.unwrap();
            tokio::time::sleep(std::time::Duration::from_millis(100)).await;
        }

        // Release the workers so they report their counts.
        barrier.wait().await;
    });

    handles
        .into_iter()
        .map(|worker| worker.join().unwrap())
        .sum()
}

fn count_accepts(std: std::net::TcpListener, flags: u32, barrier: Arc<Barrier>) -> u64 {
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap();

rt.block_on(async {
std.set_nonblocking(true).unwrap();

let listener = tokio::net::TcpListener::from_std_with_epoll_flags(std, flags).unwrap();

barrier.wait().await;

let mut barr_wait = std::pin::pin!(barrier.wait());

loop {
tokio::select! {
_ = &mut barr_wait => {
return tokio::runtime::Handle::current().metrics().io_driver_ready_count();
}
a = listener.accept() => {
a.unwrap();
}
}
}
})
}