Skip to content

Commit

Permalink
io: update AsyncFd to use Registration (#3113)
Browse files Browse the repository at this point in the history
  • Loading branch information
carllerche committed Nov 10, 2020
1 parent a52f507 commit e1256d8
Show file tree
Hide file tree
Showing 4 changed files with 129 additions and 165 deletions.
269 changes: 117 additions & 152 deletions tokio/src/io/async_fd.rs
@@ -1,12 +1,10 @@
use std::os::unix::io::{AsRawFd, RawFd};
use std::{task::Context, task::Poll};

use std::io;
use crate::io::driver::{Direction, Handle, ReadyEvent};
use crate::io::registration::Registration;

use mio::unix::SourceFd;

use crate::io::driver::{Direction, Handle, ReadyEvent, ScheduledIo};
use crate::util::slab;
use std::io;
use std::os::unix::io::{AsRawFd, RawFd};
use std::{task::Context, task::Poll};

/// Associates an IO object backed by a Unix file descriptor with the tokio
/// reactor, allowing for readiness to be polled. The file descriptor must be of
Expand Down Expand Up @@ -64,27 +62,9 @@ use crate::util::slab;
/// [`AsyncFdReadyGuard`]: struct@self::AsyncFdReadyGuard
/// [`TcpStream::poll_read_ready`]: struct@crate::net::TcpStream
pub struct AsyncFd<T: AsRawFd> {
handle: Handle,
shared: slab::Ref<ScheduledIo>,
registration: Registration,
inner: Option<T>,
}

// Forward the raw fd of the wrapped IO object.
impl<T: AsRawFd> AsRawFd for AsyncFd<T> {
    fn as_raw_fd(&self) -> RawFd {
        // `inner` is only `None` after `take_inner()` has run (which happens
        // on drop), so the unwrap cannot fire while the AsyncFd is usable.
        self.inner.as_ref().unwrap().as_raw_fd()
    }
}

// Manual Debug impl: only the wrapped IO object is shown; the driver-side
// registration state is internal and omitted from the output.
impl<T: std::fmt::Debug + AsRawFd> std::fmt::Debug for AsyncFd<T> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("AsyncFd")
            .field("inner", &self.inner)
            .finish()
    }
}

const ALL_INTEREST: mio::Interest = mio::Interest::READABLE.add(mio::Interest::WRITABLE);

/// Represents an IO-ready event detected on a particular file descriptor, which
/// has not yet been acknowledged. This is a `must_use` structure to help ensure
/// that you do not forget to explicitly clear (or not clear) the event.
Expand All @@ -94,92 +74,7 @@ pub struct AsyncFdReadyGuard<'a, T: AsRawFd> {
event: Option<ReadyEvent>,
}

// Manual Debug impl: the guard's pending `ReadyEvent` is internal state, so
// only the borrowed `AsyncFd` is displayed.
impl<'a, T: std::fmt::Debug + AsRawFd> std::fmt::Debug for AsyncFdReadyGuard<'a, T> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("ReadyGuard")
            .field("async_fd", &self.async_fd)
            .finish()
    }
}

impl<'a, Inner: AsRawFd> AsyncFdReadyGuard<'a, Inner> {
    /// Indicates to tokio that the file descriptor is no longer ready. The
    /// internal readiness flag will be cleared, and tokio will wait for the
    /// next edge-triggered readiness notification from the OS.
    ///
    /// It is critical that this function not be called unless your code
    /// _actually observes_ that the file descriptor is _not_ ready. Do not call
    /// it simply because, for example, a read succeeded; it should be called
    /// when a read is observed to block.
    ///
    /// [`drop`]: method@std::mem::drop
    pub fn clear_ready(&mut self) {
        // The event is consumed on first call; subsequent calls are no-ops.
        if let Some(event) = self.event.take() {
            self.async_fd.shared.clear_readiness(event);
        }
    }

    /// This function should be invoked when you intentionally want to keep the
    /// ready flag asserted.
    ///
    /// While this function is itself a no-op, it satisfies the `#[must_use]`
    /// constraint on the [`AsyncFdReadyGuard`] type.
    pub fn retain_ready(&mut self) {
        // no-op
    }

    /// Performs the IO operation `f`; if `f` returns a [`WouldBlock`] error,
    /// the readiness state associated with this file descriptor is cleared.
    ///
    /// This method helps ensure that the readiness state of the underlying file
    /// descriptor remains in sync with the tokio-side readiness state, by
    /// clearing the tokio-side state only when a [`WouldBlock`] condition
    /// occurs. It is the responsibility of the caller to ensure that `f`
    /// returns [`WouldBlock`] only if the file descriptor that originated this
    /// `AsyncFdReadyGuard` no longer expresses the readiness state that was queried to
    /// create this `AsyncFdReadyGuard`.
    ///
    /// [`WouldBlock`]: std::io::ErrorKind::WouldBlock
    pub fn with_io<R>(&mut self, f: impl FnOnce() -> io::Result<R>) -> io::Result<R> {
        let result = f();

        // Only a WouldBlock error means the fd is genuinely not ready; other
        // errors leave the tokio-side readiness flag untouched.
        if let Err(e) = result.as_ref() {
            if e.kind() == io::ErrorKind::WouldBlock {
                self.clear_ready();
            }
        }

        result
    }

    /// Performs the IO operation `f`; if `f` returns [`Pending`], the readiness
    /// state associated with this file descriptor is cleared.
    ///
    /// This method helps ensure that the readiness state of the underlying file
    /// descriptor remains in sync with the tokio-side readiness state, by
    /// clearing the tokio-side state only when a [`Pending`] condition occurs.
    /// It is the responsibility of the caller to ensure that `f` returns
    /// [`Pending`] only if the file descriptor that originated this
    /// `AsyncFdReadyGuard` no longer expresses the readiness state that was queried to
    /// create this `AsyncFdReadyGuard`.
    ///
    /// [`Pending`]: std::task::Poll::Pending
    pub fn with_poll<R>(&mut self, f: impl FnOnce() -> std::task::Poll<R>) -> std::task::Poll<R> {
        let result = f();

        // Pending from `f` is taken to mean the fd is not ready.
        if result.is_pending() {
            self.clear_ready();
        }

        result
    }
}

impl<T: AsRawFd> Drop for AsyncFd<T> {
    fn drop(&mut self) {
        // Best-effort deregistration from the reactor via take_inner();
        // errors are deliberately ignored since drop cannot report them.
        let _ = self.take_inner();
    }
}
const ALL_INTEREST: mio::Interest = mio::Interest::READABLE.add(mio::Interest::WRITABLE);

impl<T: AsRawFd> AsyncFd<T> {
/// Creates an AsyncFd backed by (and taking ownership of) an object
Expand All @@ -197,18 +92,11 @@ impl<T: AsRawFd> AsyncFd<T> {
pub(crate) fn new_with_handle(inner: T, handle: Handle) -> io::Result<Self> {
let fd = inner.as_raw_fd();

let shared = if let Some(inner) = handle.inner() {
inner.add_source(&mut SourceFd(&fd), ALL_INTEREST)?
} else {
return Err(io::Error::new(
io::ErrorKind::Other,
"failed to find event loop",
));
};
let registration =
Registration::new_with_interest_and_handle(&mut SourceFd(&fd), ALL_INTEREST, handle)?;

Ok(AsyncFd {
handle,
shared,
registration,
inner: Some(inner),
})
}
Expand All @@ -227,11 +115,11 @@ impl<T: AsRawFd> AsyncFd<T> {

fn take_inner(&mut self) -> Option<T> {
let fd = self.inner.as_ref().map(AsRawFd::as_raw_fd);

if let Some(fd) = fd {
if let Some(driver) = self.handle.inner() {
let _ = driver.deregister_source(&mut SourceFd(&fd));
}
let _ = self.registration.deregister(&mut SourceFd(&fd));
}

self.inner.take()
}

Expand All @@ -257,15 +145,7 @@ impl<T: AsRawFd> AsyncFd<T> {
&'a self,
cx: &mut Context<'_>,
) -> Poll<io::Result<AsyncFdReadyGuard<'a, T>>> {
let event = ready!(self.shared.poll_readiness(cx, Direction::Read));

if !self.handle.is_alive() {
return Err(io::Error::new(
io::ErrorKind::Other,
"IO driver has terminated",
))
.into();
}
let event = ready!(self.registration.poll_readiness(cx, Direction::Read))?;

Ok(AsyncFdReadyGuard {
async_fd: self,
Expand All @@ -290,15 +170,7 @@ impl<T: AsRawFd> AsyncFd<T> {
&'a self,
cx: &mut Context<'_>,
) -> Poll<io::Result<AsyncFdReadyGuard<'a, T>>> {
let event = ready!(self.shared.poll_readiness(cx, Direction::Write));

if !self.handle.is_alive() {
return Err(io::Error::new(
io::ErrorKind::Other,
"IO driver has terminated",
))
.into();
}
let event = ready!(self.registration.poll_readiness(cx, Direction::Write))?;

Ok(AsyncFdReadyGuard {
async_fd: self,
Expand All @@ -308,16 +180,8 @@ impl<T: AsRawFd> AsyncFd<T> {
}

async fn readiness(&self, interest: mio::Interest) -> io::Result<AsyncFdReadyGuard<'_, T>> {
let event = self.shared.readiness(interest);
let event = self.registration.readiness(interest).await?;

if !self.handle.is_alive() {
return Err(io::Error::new(
io::ErrorKind::Other,
"IO driver has terminated",
));
}

let event = event.await;
Ok(AsyncFdReadyGuard {
async_fd: self,
event: Some(event),
Expand All @@ -340,3 +204,104 @@ impl<T: AsRawFd> AsyncFd<T> {
self.readiness(mio::Interest::WRITABLE).await
}
}

// Forward the raw fd of the wrapped IO object.
impl<T: AsRawFd> AsRawFd for AsyncFd<T> {
    fn as_raw_fd(&self) -> RawFd {
        // `inner` is only `None` after `take_inner()` has run (which happens
        // on drop), so the unwrap cannot fire while the AsyncFd is usable.
        self.inner.as_ref().unwrap().as_raw_fd()
    }
}

/// Debug output shows only the wrapped IO object; the driver-side
/// `Registration` is internal state and is omitted.
impl<T: std::fmt::Debug + AsRawFd> std::fmt::Debug for AsyncFd<T> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut builder = f.debug_struct("AsyncFd");
        builder.field("inner", &self.inner);
        builder.finish()
    }
}

impl<T: AsRawFd> Drop for AsyncFd<T> {
    fn drop(&mut self) {
        // Best-effort deregistration from the reactor via take_inner();
        // errors are deliberately ignored since drop cannot report them.
        let _ = self.take_inner();
    }
}

impl<'a, Inner: AsRawFd> AsyncFdReadyGuard<'a, Inner> {
    /// Tells tokio that the file descriptor is no longer ready. The internal
    /// readiness flag is cleared, and tokio waits for the next edge-triggered
    /// readiness notification from the OS before reporting readiness again.
    ///
    /// It is critical that this function not be called unless your code
    /// _actually observes_ that the file descriptor is _not_ ready. Do not
    /// call it simply because, for example, a read succeeded; it should be
    /// called when a read is observed to block.
    ///
    /// [`drop`]: method@std::mem::drop
    pub fn clear_ready(&mut self) {
        // The guard carries at most one event; it is consumed on the first
        // call, making any further calls no-ops.
        match self.event.take() {
            Some(event) => self.async_fd.registration.clear_readiness(event),
            None => {}
        }
    }

    /// Invoke this when you intentionally want to keep the ready flag
    /// asserted.
    ///
    /// The function does nothing by itself; it exists to satisfy the
    /// `#[must_use]` constraint on the [`AsyncFdReadyGuard`] type.
    pub fn retain_ready(&mut self) {}

    /// Performs the IO operation `f`; if `f` returns a [`WouldBlock`] error,
    /// the readiness state associated with this file descriptor is cleared.
    ///
    /// This keeps the tokio-side readiness state in sync with the underlying
    /// file descriptor: the tokio-side state is cleared only on a
    /// [`WouldBlock`] condition. The caller must ensure `f` returns
    /// [`WouldBlock`] only if the file descriptor that originated this
    /// `AsyncFdReadyGuard` no longer expresses the readiness state that was
    /// queried to create this `AsyncFdReadyGuard`.
    ///
    /// [`WouldBlock`]: std::io::ErrorKind::WouldBlock
    pub fn with_io<R>(&mut self, f: impl FnOnce() -> io::Result<R>) -> io::Result<R> {
        let result = f();

        // Only WouldBlock means the fd is genuinely not ready; any other
        // error leaves the tokio-side readiness flag untouched.
        let hit_would_block = matches!(
            result.as_ref(),
            Err(e) if e.kind() == io::ErrorKind::WouldBlock
        );
        if hit_would_block {
            self.clear_ready();
        }

        result
    }

    /// Performs the IO operation `f`; if `f` returns [`Pending`], the
    /// readiness state associated with this file descriptor is cleared.
    ///
    /// This keeps the tokio-side readiness state in sync with the underlying
    /// file descriptor: the tokio-side state is cleared only on a [`Pending`]
    /// condition. The caller must ensure `f` returns [`Pending`] only if the
    /// file descriptor that originated this `AsyncFdReadyGuard` no longer
    /// expresses the readiness state that was queried to create this
    /// `AsyncFdReadyGuard`.
    ///
    /// [`Pending`]: std::task::Poll::Pending
    pub fn with_poll<R>(&mut self, f: impl FnOnce() -> std::task::Poll<R>) -> std::task::Poll<R> {
        let result = f();

        // Pending from `f` is taken to mean the fd is not ready.
        match &result {
            std::task::Poll::Pending => self.clear_ready(),
            std::task::Poll::Ready(_) => {}
        }

        result
    }
}

// Manual Debug impl: the guard's pending `ReadyEvent` is internal state, so
// only the borrowed `AsyncFd` is displayed.
impl<'a, T: std::fmt::Debug + AsRawFd> std::fmt::Debug for AsyncFdReadyGuard<'a, T> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("ReadyGuard")
            .field("async_fd", &self.async_fd)
            .finish()
    }
}
6 changes: 0 additions & 6 deletions tokio/src/io/driver/mod.rs
Expand Up @@ -290,12 +290,6 @@ impl Handle {
/// Attempts to upgrade the weak reference to the driver internals.
/// Returns `None` once the IO driver has been dropped.
pub(super) fn inner(&self) -> Option<Arc<Inner>> {
    self.inner.upgrade()
}

cfg_net_unix! {
pub(super) fn is_alive(&self) -> bool {
self.inner.strong_count() > 0
}
}
}

impl Unpark for Handle {
Expand Down
13 changes: 9 additions & 4 deletions tokio/src/io/registration.rs
Expand Up @@ -117,18 +117,23 @@ impl Registration {
cx: &mut Context<'_>,
direction: Direction,
) -> Poll<io::Result<ReadyEvent>> {
if self.handle.inner().is_none() {
return Poll::Ready(Err(io::Error::new(io::ErrorKind::Other, "reactor gone")));
}

// Keep track of task budget
let coop = ready!(crate::coop::poll_proceed(cx));
let ev = ready!(self.shared.poll_readiness(cx, direction));

if self.handle.inner().is_none() {
return Poll::Ready(Err(gone()));
}

coop.made_progress();
Poll::Ready(Ok(ev))
}
}

/// Builds the error returned when the IO driver backing this registration
/// has been dropped (e.g. the runtime was shut down).
fn gone() -> io::Error {
    const MSG: &str = "IO driver has terminated";
    io::Error::new(io::ErrorKind::Other, MSG)
}

cfg_io_readiness! {
impl Registration {
pub(super) async fn readiness(&self, interest: mio::Interest) -> io::Result<ReadyEvent> {
Expand Down
6 changes: 3 additions & 3 deletions tokio/tests/io_async_fd.rs
Expand Up @@ -500,10 +500,10 @@ fn driver_shutdown_wakes_currently_pending() {

std::mem::drop(rt);

// Being awoken by a rt drop does not return an error, currently...
let _ = futures::executor::block_on(readable).unwrap();
// The future was initialized **before** dropping the rt
assert_err!(futures::executor::block_on(readable));

// However, attempting to initiate a readiness wait when the rt is dropped is an error
// The future is initialized **after** dropping the rt.
assert_err!(futures::executor::block_on(afd_a.readable()));
}

Expand Down

0 comments on commit e1256d8

Please sign in to comment.