Skip to content

Commit

Permalink
io: Add AsyncFd
Browse files Browse the repository at this point in the history
This adds AsyncFd, a unix-only structure to allow for read/writability states
to be monitored for arbitrary file descriptors.

Issue: #2728
  • Loading branch information
Bryan Donlan committed Oct 5, 2020
1 parent 9730317 commit dd728fe
Show file tree
Hide file tree
Showing 10 changed files with 664 additions and 47 deletions.
9 changes: 8 additions & 1 deletion tokio/Cargo.toml
Expand Up @@ -30,6 +30,7 @@ default = []

# enable everything
full = [
"async-fd",
"blocking",
"dns",
"fs",
Expand All @@ -52,7 +53,7 @@ dns = ["rt-core"]
fs = ["rt-core", "io-util"]
io-util = ["memchr"]
# stdin, stdout, stderr
io-std = ["rt-core"]
io-std = ["rt-core", "mio/os-util"]
macros = ["tokio-macros"]
net = ["dns", "tcp", "udp", "uds"]
process = [
Expand All @@ -76,6 +77,7 @@ signal = [
"libc",
"mio/os-poll",
"mio/uds",
"mio/os-util",
"signal-hook-registry",
"winapi/consoleapi",
]
Expand All @@ -86,6 +88,7 @@ tcp = ["lazy_static", "mio/tcp", "mio/os-poll"]
time = []
udp = ["lazy_static", "mio/udp", "mio/os-poll"]
uds = ["lazy_static", "libc", "mio/uds", "mio/os-poll"]
async-fd = ["lazy_static", "mio/udp", "mio/os-poll", "mio/os-util"]

[dependencies]
tokio-macros = { version = "0.3.0", path = "../tokio-macros", optional = true }
Expand All @@ -108,6 +111,10 @@ tracing = { version = "0.1.16", default-features = false, features = ["std"], op
libc = { version = "0.2.42", optional = true }
signal-hook-registry = { version = "1.1.1", optional = true }

[target.'cfg(unix)'.dev-dependencies]
libc = { version = "0.2.42" }
nix = { version = "0.18.0" }

[target.'cfg(windows)'.dependencies.winapi]
version = "0.3.8"
default-features = false
Expand Down
269 changes: 269 additions & 0 deletions tokio/src/io/async_fd.rs
@@ -0,0 +1,269 @@
use std::os::unix::io::RawFd;

use std::io;

use mio::unix::SourceFd;

use crate::io::driver::{Handle, ReadyEvent, ScheduledIo};
use crate::util::slab;

/// Associates a Unix file descriptor with the tokio reactor, allowing for
/// readiness to be polled.
///
/// Creating an AsyncFd registers the file descriptor with the current tokio
/// Reactor, allowing you to directly await the file descriptor being readable
/// or writable. Once registered, the file descriptor remains registered until
/// the AsyncFd is dropped.
///
/// It is the responsibility of the caller to ensure that the AsyncFd is dropped
/// before the associated file descriptor is closed. Failing to do so may result
/// in spurious events or mysterious errors from other tokio IO calls.
///
/// Polling for readiness is done by calling the async functions [`readable`]
/// and [`writable`]. These functions complete when the associated readiness
/// condition is observed. Any number of tasks can query the same `AsyncFd`
/// in parallel, on the same or different conditions.
///
/// On some platforms, the readiness detecting mechanism relies on
/// edge-triggered notifications. This means that the OS will only notify Tokio
/// when the file descriptor transitions from not-ready to ready. Tokio
/// internally tracks when it has received a ready notification, and when
/// readiness checking functions like [`readable`] and [`writable`] are called,
/// if the readiness flag is set, these async functions will complete
/// immediately.
///
/// This however does mean that it is critical to ensure that this ready flag is
/// cleared when (and only when) the file descriptor ceases to be ready. The
/// [`ReadyGuard`] returned from readiness checking functions serves this
/// function; after calling a readiness-checking async function, you must use
/// this [`ReadyGuard`] to signal to tokio whether the file descriptor is no
/// longer in a ready state.
///
/// ## Converting to a poll-based API
///
/// In some cases it may be desirable to use `AsyncFd` from APIs similar to
/// [`TcpStream::poll_read_ready`]. One can do so by allocating a pinned future
/// to perform the poll:
///
/// ```
/// use tokio::io::{ReadyGuard, AsyncFd};
///
/// use std::future::Future;
/// use std::sync::Arc;
/// use std::pin::Pin;
/// use std::task::{Context, Poll};
///
/// use futures::ready;
///
/// struct MyIoStruct {
/// async_fd: Arc<AsyncFd>,
/// poller: Pin<Box<dyn Future<Output=()>>>
/// }
///
/// impl MyIoStruct {
/// fn poll_read_ready(&mut self, cx: &mut Context<'_>) -> Poll<std::io::Result<ReadyGuard<'_>>> {
/// let mut result = Poll::Pending;
/// while result.is_pending() {
/// // Poll the saved future; if it's not ready, our context waker will be saved in the
/// // future and we can return.
/// ready!(self.poller.as_mut().poll(cx));
///
/// // Reset the poller future, since we consumed it.
/// let arc = self.async_fd.clone();
/// self.poller = Box::pin(async move {
/// let _ = arc.readable().await.map(|mut guard| guard.retain_ready());
/// });
///
/// // Because we need to bind the ReadyGuard to the lifetime of self, we have to re-poll here.
/// // It's possible that we might race with another thread clearing the ready state, so deal
/// // with that as well.
/// let fut = self.async_fd.readable();
/// tokio::pin!(fut);
/// result = fut.as_mut().poll(cx);
/// }
///
/// result
/// }
/// }
/// ```
///
/// [`readable`]: method@Self::readable
/// [`writable`]: method@Self::writable
/// [`ReadyGuard`]: struct@self::ReadyGuard
/// [`TcpStream::poll_read_ready`]: struct@crate::net::TcpStream
pub struct AsyncFd {
handle: Handle,
fd: RawFd,
shared: slab::Ref<ScheduledIo>,
}

impl std::fmt::Debug for AsyncFd {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("AsyncFd").field("fd", &self.fd).finish()
}
}

unsafe impl Send for AsyncFd {}
unsafe impl Sync for AsyncFd {}

const fn all_interest() -> mio::Interest {
mio::Interest::READABLE.add(mio::Interest::WRITABLE)
}

/// Represents an IO-ready event detected on a particular file descriptor, which
/// has not yet been acknowledged. This is a `must_use` structure to help ensure
/// that you do not forget to explicitly clear (or not clear) the event.
#[must_use = "You must explicitly choose whether to clear the readiness state by calling a method on ReadyGuard"]
pub struct ReadyGuard<'a> {
async_fd: &'a AsyncFd,
event: Option<ReadyEvent>,
}

impl<'a> std::fmt::Debug for ReadyGuard<'a> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("ClearReady")
.field("async_fd", self.async_fd)
.finish()
}
}

impl<'a> ReadyGuard<'a> {
/// Indicates to tokio that the file descriptor is no longer ready. The
/// internal readiness flag will be cleared, and tokio will wait for the
/// next edge-triggered readiness notification from the OS.
///
/// It is critical that this function not be called unless your code
/// _actually observes_ that the file descriptor is _not_ ready. Do not call
/// it simply because, for example, a read succeeded; it should be called
/// when a read is observed to block.
///
/// [`drop`]: method@std::mem::drop
pub fn clear_ready(&mut self) {
if let Some(event) = self.event.take() {
self.async_fd.shared.clear_readiness(event);
}
}

/// This function should be invoked when you intentionally want to keep the
/// ready flag asserted.
///
/// While this function is itself a no-op, it satisfies the `#[must_use]`
/// constraint on the [`ReadyGuard`] type.
pub fn retain_ready(&mut self) {
// no-op
}

/// Performs the IO operation `f`; if `f` returns a [`WouldBlock`] error,
/// the readiness state associated with this file descriptor is cleared.
///
/// This method helps ensure that the readiness state of the underlying file
/// descriptor remains in sync with the tokio-side readiness state, by
/// clearing the tokio-side state only when a [`WouldBlock`] condition
/// occurs. It is the responsibility of the caller to ensure that `f`
/// returns [`WouldBlock`] only if the file descriptor that originated this
/// `ReadyGuard` no longer expresses the readiness state that was queried to
/// create this `ReadyGuard`.
///
/// [`WouldBlock`]: std::io::ErrorKind::WouldBlock
pub fn with_io<R, E>(&mut self, f: impl FnOnce() -> Result<R, E>) -> Result<R, E>
where
E: std::error::Error + 'static,
{
use std::error::Error;

let result = f();

if let Err(e) = result.as_ref() {
// Is this a WouldBlock error?
let mut error_ref: Option<&(dyn Error + 'static)> = Some(e);

while let Some(current) = error_ref {
if let Some(e) = Error::downcast_ref::<std::io::Error>(current) {
if e.kind() == std::io::ErrorKind::WouldBlock {
self.clear_ready();
break;
}
}
error_ref = current.source();
}
}

result
}

/// Performs the IO operation `f`; if `f` returns [`Pending`], the readiness
/// state associated with this file descriptor is cleared.
///
/// This method helps ensure that the readiness state of the underlying file
/// descriptor remains in sync with the tokio-side readiness state, by
/// clearing the tokio-side state only when a [`Pending`] condition occurs.
/// It is the responsibility of the caller to ensure that `f` returns
/// [`Pending`] only if the file descriptor that originated this
/// `ReadyGuard` no longer expresses the readiness state that was queried to
/// create this `ReadyGuard`.
///
/// [`Pending`]: std::task::Poll::Pending
pub fn with_poll<R>(&mut self, f: impl FnOnce() -> std::task::Poll<R>) -> std::task::Poll<R> {
let result = f();

if result.is_pending() {
self.clear_ready();
}

result
}
}

impl Drop for AsyncFd {
fn drop(&mut self) {
if let Some(inner) = self.handle.inner() {
let _ = inner.deregister_source(&mut SourceFd(&self.fd));
}
}
}

impl AsyncFd {
/// Constructs a new AsyncFd, binding this file descriptor to the current tokio Reactor.
///
/// This function must be called in the context of a tokio runtime.
pub fn new(fd: RawFd) -> io::Result<Self> {
Self::new_with_handle(fd, Handle::current())
}

pub(crate) fn new_with_handle(fd: RawFd, handle: Handle) -> io::Result<Self> {
let shared = if let Some(inner) = handle.inner() {
inner.add_source(&mut SourceFd(&fd), all_interest())?
} else {
return Err(io::Error::new(
io::ErrorKind::Other,
"failed to find event loop",
));
};

Ok(AsyncFd { handle, fd, shared })
}

async fn readiness(&self, interest: mio::Interest) -> io::Result<ReadyGuard<'_>> {
let event = self.shared.readiness(interest).await;
Ok(ReadyGuard {
async_fd: self,
event: Some(event),
})
}

/// Waits for the file descriptor to become readable, returning a
/// [`ReadyGuard`] that must be dropped to resume read-readiness polling.
///
/// [`ReadyGuard`]: struct@self::ReadyGuard
pub async fn readable(&self) -> io::Result<ReadyGuard<'_>> {
self.readiness(mio::Interest::READABLE).await
}

/// Waits for the file descriptor to become writable, returning a
/// [`ReadyGuard`] that must be dropped to resume write-readiness polling.
///
/// [`ReadyGuard`]: struct@self::ReadyGuard
pub async fn writable(&self) -> io::Result<ReadyGuard<'_>> {
self.readiness(mio::Interest::WRITABLE).await
}
}
23 changes: 13 additions & 10 deletions tokio/src/io/driver/mod.rs
Expand Up @@ -56,10 +56,12 @@ pub(super) struct Inner {
waker: mio::Waker,
}

#[derive(Debug, Eq, PartialEq, Clone, Copy)]
pub(super) enum Direction {
Read,
Write,
cfg_io_poll_evented! {
#[derive(Debug, Eq, PartialEq, Clone, Copy)]
pub(super) enum Direction {
Read,
Write,
}
}

enum Tick {
Expand Down Expand Up @@ -292,12 +294,13 @@ impl Inner {
self.registry.deregister(source)
}
}

impl Direction {
pub(super) fn mask(self) -> Ready {
match self {
Direction::Read => Ready::READABLE | Ready::READ_CLOSED,
Direction::Write => Ready::WRITABLE | Ready::WRITE_CLOSED,
cfg_io_poll_evented! {
impl Direction {
pub(super) fn mask(self) -> Ready {
match self {
Direction::Read => Ready::READABLE | Ready::READ_CLOSED,
Direction::Write => Ready::WRITABLE | Ready::WRITE_CLOSED,
}
}
}
}

0 comments on commit dd728fe

Please sign in to comment.