From f4905efb52fd1e0270f5f78cbc1db35d9507449a Mon Sep 17 00:00:00 2001 From: Bryan Donlan Date: Thu, 1 Oct 2020 21:18:20 +0000 Subject: [PATCH 01/11] io: Add AsyncFd This adds AsyncFd, a unix-only structure to allow for read/writability states to be monitored for arbitrary file descriptors. Issue: #2728 --- tokio/Cargo.toml | 6 + tokio/src/io/async_fd.rs | 320 +++++++++++++++++++++ tokio/src/io/mod.rs | 9 +- tokio/src/lib.rs | 3 +- tokio/src/loom/std/parking_lot.rs | 5 + tokio/tests/io_async_fd.rs | 449 ++++++++++++++++++++++++++++++ 6 files changed, 790 insertions(+), 2 deletions(-) create mode 100644 tokio/src/io/async_fd.rs create mode 100644 tokio/tests/io_async_fd.rs diff --git a/tokio/Cargo.toml b/tokio/Cargo.toml index e19b8c916aa..ea849c1f105 100644 --- a/tokio/Cargo.toml +++ b/tokio/Cargo.toml @@ -53,6 +53,7 @@ net = [ "lazy_static", "libc", "mio/os-poll", + "mio/os-util", "mio/tcp", "mio/udp", "mio/uds", @@ -77,6 +78,7 @@ signal = [ "libc", "mio/os-poll", "mio/uds", + "mio/os-util", "signal-hook-registry", "winapi/consoleapi", ] @@ -106,6 +108,10 @@ tracing = { version = "0.1.16", default-features = false, features = ["std"], op libc = { version = "0.2.42", optional = true } signal-hook-registry = { version = "1.1.1", optional = true } +[target.'cfg(unix)'.dev-dependencies] +libc = { version = "0.2.42" } +nix = { version = "0.18.0" } + [target.'cfg(windows)'.dependencies.winapi] version = "0.3.8" default-features = false diff --git a/tokio/src/io/async_fd.rs b/tokio/src/io/async_fd.rs new file mode 100644 index 00000000000..af440c05a95 --- /dev/null +++ b/tokio/src/io/async_fd.rs @@ -0,0 +1,320 @@ +use std::os::unix::io::{AsRawFd, RawFd}; +use std::{task::Context, task::Poll}; + +use std::io; + +use mio::unix::SourceFd; + +use crate::io::driver::{Direction, Handle, ReadyEvent, ScheduledIo}; +use crate::util::slab; + +/// Associates an IO object backed by a Unix file descriptor with the tokio +/// reactor, allowing for readiness to be polled. +/// +/// Creating an AsyncFd registers the file descriptor with the current tokio +/// Reactor, allowing you to directly await the file descriptor being readable +/// or writable. Once registered, the file descriptor remains registered until +/// the AsyncFd is dropped. +/// +/// The AsyncFd takes ownership of an arbitrary object to represent the IO +/// object. It is intended that this object will handle closing the file +/// descriptor when it is dropped, avoiding resource leaks and ensuring that the +/// AsyncFd can clean up the registration before closing the file descriptor. +/// The [`AsyncFd::into_inner`] function can be used to extract the inner object +/// to retake control from the tokio IO reactor. +/// +/// Polling for readiness is done by calling the async functions [`readable`] +/// and [`writable`]. These functions complete when the associated readiness +/// condition is observed. Any number of tasks can query the same `AsyncFd` in +/// parallel, on the same or different conditions. +/// +/// On some platforms, the readiness detecting mechanism relies on +/// edge-triggered notifications. This means that the OS will only notify Tokio +/// when the file descriptor transitions from not-ready to ready. Tokio +/// internally tracks when it has received a ready notification, and when +/// readiness checking functions like [`readable`] and [`writable`] are called, +/// if the readiness flag is set, these async functions will complete +/// immediately. 
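+///
+/// For illustration, a typical read loop built on these functions might look
+/// like the following sketch. It is illustrative only, and assumes the wrapped
+/// type is a non-blocking `TcpStream`, for which `Read` is implemented on
+/// shared references:
+///
+/// ```no_run
+/// use std::io::Read;
+/// use tokio::io::AsyncFd;
+///
+/// async fn read_some(afd: &AsyncFd<std::net::TcpStream>) -> std::io::Result<usize> {
+///     let mut buf = [0u8; 1024];
+///     loop {
+///         // Wait for (possibly stale) readiness, then try the read; `with_io`
+///         // clears the readiness flag for us if the read observes WouldBlock.
+///         let mut guard = afd.readable().await?;
+///         match guard.with_io(|| afd.inner().read(&mut buf)) {
+///             Ok(n) => return Ok(n),
+///             Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => continue,
+///             Err(e) => return Err(e),
+///         }
+///     }
+/// }
+/// ```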
+///
+/// This however does mean that it is critical to ensure that this ready flag is
+/// cleared when (and only when) the file descriptor ceases to be ready. The
+/// [`ReadyGuard`] returned from readiness checking functions serves this
+/// function; after calling a readiness-checking async function, you must use
+/// this [`ReadyGuard`] to signal to tokio whether the file descriptor is no
+/// longer in a ready state.
+///
+/// ## Use with a poll-based API
+///
+/// In some cases it may be desirable to use `AsyncFd` from APIs similar to
+/// [`TcpStream::poll_read_ready`]. The [`AsyncFd::poll_read_ready`] and
+/// [`AsyncFd::poll_write_ready`] functions are provided for this purpose.
+/// Because these functions don't create a future to hold their state, they have
+/// the limitation that only one task can wait on each direction (read or write)
+/// at a time.
+
+/// [`readable`]: method@Self::readable
+/// [`writable`]: method@Self::writable
+/// [`ReadyGuard`]: struct@self::ReadyGuard
+/// [`TcpStream::poll_read_ready`]: struct@crate::net::TcpStream
+pub struct AsyncFd<T> {
+    handle: Handle,
+    fd: RawFd,
+    shared: slab::Ref<ScheduledIo>,
+    inner: Option<T>,
+}
+
+impl<T> AsRawFd for AsyncFd<T> {
+    fn as_raw_fd(&self) -> RawFd {
+        self.fd
+    }
+}
+
+impl<T: std::fmt::Debug> std::fmt::Debug for AsyncFd<T> {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("AsyncFd")
+            .field("fd", &self.fd)
+            .field("inner", &self.inner)
+            .finish()
+    }
+}
+
+const fn all_interest() -> mio::Interest {
+    mio::Interest::READABLE.add(mio::Interest::WRITABLE)
+}
+
+/// Represents an IO-ready event detected on a particular file descriptor, which
+/// has not yet been acknowledged. This is a `must_use` structure to help ensure
+/// that you do not forget to explicitly clear (or not clear) the event.
+#[must_use = "You must explicitly choose whether to clear the readiness state by calling a method on ReadyGuard"]
+pub struct ReadyGuard<'a, T> {
+    async_fd: &'a AsyncFd<T>,
+    event: Option<ReadyEvent>,
+}
+
+impl<'a, T: std::fmt::Debug> std::fmt::Debug for ReadyGuard<'a, T> {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("ClearReady")
+            .field("async_fd", self.async_fd)
+            .finish()
+    }
+}
+
+impl<'a, T> ReadyGuard<'a, T> {
+    /// Indicates to tokio that the file descriptor is no longer ready. The
+    /// internal readiness flag will be cleared, and tokio will wait for the
+    /// next edge-triggered readiness notification from the OS.
+    ///
+    /// It is critical that this function not be called unless your code
+    /// _actually observes_ that the file descriptor is _not_ ready. Do not call
+    /// it simply because, for example, a read succeeded; it should be called
+    /// when a read is observed to block.
+    ///
+    /// [`drop`]: method@std::mem::drop
+    pub fn clear_ready(&mut self) {
+        if let Some(event) = self.event.take() {
+            self.async_fd.shared.clear_readiness(event);
+        }
+    }
+
+    /// This function should be invoked when you intentionally want to keep the
+    /// ready flag asserted.
+    ///
+    /// While this function is itself a no-op, it satisfies the `#[must_use]`
+    /// constraint on the [`ReadyGuard`] type.
+    pub fn retain_ready(&mut self) {
+        // no-op
+    }
+
+    /// Performs the IO operation `f`; if `f` returns a [`WouldBlock`] error,
+    /// the readiness state associated with this file descriptor is cleared.
+    ///
+    /// This method helps ensure that the readiness state of the underlying file
+    /// descriptor remains in sync with the tokio-side readiness state, by
+    /// clearing the tokio-side state only when a [`WouldBlock`] condition
+    /// occurs. It is the responsibility of the caller to ensure that `f`
+    /// returns [`WouldBlock`] only if the file descriptor that originated this
+    /// `ReadyGuard` no longer expresses the readiness state that was queried to
+    /// create this `ReadyGuard`.
+    ///
+    /// [`WouldBlock`]: std::io::ErrorKind::WouldBlock
+    pub fn with_io<R, E>(&mut self, f: impl FnOnce() -> Result<R, E>) -> Result<R, E>
+    where
+        E: std::error::Error + 'static,
+    {
+        use std::error::Error;
+
+        let result = f();
+
+        if let Err(e) = result.as_ref() {
+            // Is this a WouldBlock error?
+            let mut error_ref: Option<&(dyn Error + 'static)> = Some(e);
+
+            while let Some(current) = error_ref {
+                if let Some(e) = Error::downcast_ref::<std::io::Error>(current) {
+                    if e.kind() == std::io::ErrorKind::WouldBlock {
+                        self.clear_ready();
+                        break;
+                    }
+                }
+                error_ref = current.source();
+            }
+        }
+
+        result
+    }
+
+    /// Performs the IO operation `f`; if `f` returns [`Pending`], the readiness
+    /// state associated with this file descriptor is cleared.
+    ///
+    /// This method helps ensure that the readiness state of the underlying file
+    /// descriptor remains in sync with the tokio-side readiness state, by
+    /// clearing the tokio-side state only when a [`Pending`] condition occurs.
+    /// It is the responsibility of the caller to ensure that `f` returns
+    /// [`Pending`] only if the file descriptor that originated this
+    /// `ReadyGuard` no longer expresses the readiness state that was queried to
+    /// create this `ReadyGuard`.
+    ///
+    /// [`Pending`]: std::task::Poll::Pending
+    pub fn with_poll<R>(&mut self, f: impl FnOnce() -> std::task::Poll<R>) -> std::task::Poll<R> {
+        let result = f();
+
+        if result.is_pending() {
+            self.clear_ready();
+        }
+
+        result
+    }
+}
+
+impl<T> Drop for AsyncFd<T> {
+    fn drop(&mut self) {
+        if let Some(inner) = self.handle.inner() {
+            let _ = inner.deregister_source(&mut SourceFd(&self.fd));
+        }
+    }
+}
+
+impl<T> AsyncFd<T> {
+    /// Creates an AsyncFd backed by (and taking ownership of) an object
+    /// implementing [`AsRawFd`]. The backing file descriptor is cached at the
+    /// time of creation.
+    ///
+    /// This function must be called in the context of a tokio runtime.
+    pub fn new(inner: T) -> io::Result<Self>
+    where
+        T: AsRawFd,
+    {
+        let fd = inner.as_raw_fd();
+        Self::new_with_fd(inner, fd)
+    }
+
+    /// Constructs a new AsyncFd, explicitly specifying the backing file
+    /// descriptor.
+    ///
+    /// This function must be called in the context of a tokio runtime.
+    pub fn new_with_fd(inner: T, fd: RawFd) -> io::Result<Self> {
+        Self::new_with_handle(inner, fd, Handle::current())
+    }
+
+    pub(crate) fn new_with_handle(inner: T, fd: RawFd, handle: Handle) -> io::Result<Self> {
+        let shared = if let Some(inner) = handle.inner() {
+            inner.add_source(&mut SourceFd(&fd), all_interest())?
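+            // Registration hands the raw fd to the reactor with combined
+            // read/write interest; the returned `ScheduledIo` slab entry is
+            // what the readiness futures and the poll_* functions wait on.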
+        } else {
+            return Err(io::Error::new(
+                io::ErrorKind::Other,
+                "failed to find event loop",
+            ));
+        };
+
+        Ok(AsyncFd {
+            handle,
+            fd,
+            shared,
+            inner: Some(inner),
+        })
+    }
+
+    /// Returns a shared reference to the backing object of this [`AsyncFd`]
+    #[inline]
+    pub fn inner(&self) -> &T {
+        self.inner.as_ref().unwrap()
+    }
+
+    /// Returns a mutable reference to the backing object of this [`AsyncFd`]
+    #[inline]
+    pub fn inner_mut(&mut self) -> &mut T {
+        self.inner.as_mut().unwrap()
+    }
+
+    /// Deregisters this file descriptor, and returns ownership of the backing
+    /// object.
+    pub fn into_inner(mut self) -> T {
+        self.inner.take().unwrap()
+    }
+
+    /// Polls for read readiness. This function retains the waker for the last
+    /// context that called [`poll_read_ready`]; it therefore can only be used
+    /// by a single task at a time (however, [`poll_write_ready`] retains a
+    /// second, independent waker).
+    ///
+    /// This function is intended for cases where creating and pinning a future
+    /// via [`readable`] is not feasible. Where possible, using [`readable`] is
+    /// preferred, as this supports polling from multiple tasks at once.
+    pub fn poll_read_ready<'a, 'cx>(
+        &'a self,
+        cx: &mut Context<'cx>,
+    ) -> Poll<io::Result<ReadyGuard<'a, T>>> {
+        let event = ready!(self.shared.poll_readiness(cx, Direction::Read));
+
+        Ok(ReadyGuard {
+            async_fd: self,
+            event: Some(event),
+        })
+        .into()
+    }
+
+    /// Polls for write readiness. This function retains the waker for the last
+    /// context that called [`poll_write_ready`]; it therefore can only be used
+    /// by a single task at a time (however, [`poll_read_ready`] retains a
+    /// second, independent waker).
+    ///
+    /// This function is intended for cases where creating and pinning a future
+    /// via [`writable`] is not feasible. Where possible, using [`writable`] is
+    /// preferred, as this supports polling from multiple tasks at once.
+    pub fn poll_write_ready<'a, 'cx>(
+        &'a self,
+        cx: &mut Context<'cx>,
+    ) -> Poll<io::Result<ReadyGuard<'a, T>>> {
+        let event = ready!(self.shared.poll_readiness(cx, Direction::Write));
+
+        Ok(ReadyGuard {
+            async_fd: self,
+            event: Some(event),
+        })
+        .into()
+    }
+
+    async fn readiness(&self, interest: mio::Interest) -> io::Result<ReadyGuard<'_, T>> {
+        let event = self.shared.readiness(interest).await;
+        Ok(ReadyGuard {
+            async_fd: self,
+            event: Some(event),
+        })
+    }
+
+    /// Waits for the file descriptor to become readable, returning a
+    /// [`ReadyGuard`] that must be dropped to resume read-readiness polling.
+    ///
+    /// [`ReadyGuard`]: struct@self::ReadyGuard
+    pub async fn readable(&self) -> io::Result<ReadyGuard<'_, T>> {
+        self.readiness(mio::Interest::READABLE).await
+    }
+
+    /// Waits for the file descriptor to become writable, returning a
+    /// [`ReadyGuard`] that must be dropped to resume write-readiness polling.
+    ///
+    /// [`ReadyGuard`]: struct@self::ReadyGuard
+    pub async fn writable(&self) -> io::Result<ReadyGuard<'_, T>> {
+        self.readiness(mio::Interest::WRITABLE).await
+    }
+}
diff --git a/tokio/src/io/mod.rs b/tokio/src/io/mod.rs
index 9191bbcd19e..b782577604d 100644
--- a/tokio/src/io/mod.rs
+++ b/tokio/src/io/mod.rs
@@ -207,11 +207,18 @@ pub use std::io::{Error, ErrorKind, Result, SeekFrom};
 cfg_io_driver! {
     pub(crate) mod driver;
 
+    mod registration;
+
     mod poll_evented;
+
     #[cfg(not(loom))]
     pub(crate) use poll_evented::PollEvented;
+}
 
-    mod registration;
+cfg_net_unix! {
+    mod async_fd;
+
+    pub use self::async_fd::{AsyncFd, ReadyGuard};
 }
 
 cfg_io_std!
{ diff --git a/tokio/src/lib.rs b/tokio/src/lib.rs index 0ae15ab688c..eb77b5ad982 100644 --- a/tokio/src/lib.rs +++ b/tokio/src/lib.rs @@ -305,7 +305,8 @@ //! - `rt-multi-thread`: Enables the heavier, multi-threaded, work-stealing scheduler. //! - `io-util`: Enables the IO based `Ext` traits. //! - `io-std`: Enable `Stdout`, `Stdin` and `Stderr` types. -//! - `net`: Enables `tokio::net` types such as `TcpStream`, `UnixStream` and `UdpSocket`. +//! - `net`: Enables `tokio::net` types such as `TcpStream`, `UnixStream` and `UdpSocket`, +//! as well as (on Unix-like systems) `AsyncFd` //! - `time`: Enables `tokio::time` types and allows the schedulers to enable //! the built in timer. //! - `process`: Enables `tokio::process` types. diff --git a/tokio/src/loom/std/parking_lot.rs b/tokio/src/loom/std/parking_lot.rs index c03190feb8b..8448bed53d7 100644 --- a/tokio/src/loom/std/parking_lot.rs +++ b/tokio/src/loom/std/parking_lot.rs @@ -43,6 +43,11 @@ impl Mutex { self.0.try_lock() } + #[inline] + pub(crate) fn get_mut(&mut self) -> &mut T { + self.0.get_mut() + } + // Note: Additional methods `is_poisoned` and `into_inner`, can be // provided here as needed. } diff --git a/tokio/tests/io_async_fd.rs b/tokio/tests/io_async_fd.rs new file mode 100644 index 00000000000..d8abaa0e935 --- /dev/null +++ b/tokio/tests/io_async_fd.rs @@ -0,0 +1,449 @@ +#![warn(rust_2018_idioms)] +#![cfg(all(unix, feature = "full"))] + +use std::os::unix::io::{AsRawFd, RawFd}; +use std::sync::{ + atomic::{AtomicBool, Ordering}, + Arc, +}; +use std::time::Duration; +use std::{ + io::{self, ErrorKind, Read, Write}, + task::{Context, Waker}, +}; + +use nix::errno::Errno; +use nix::unistd::{close, read, write}; + +use futures::{poll, FutureExt}; + +use tokio::io::AsyncFd; +use tokio_test::assert_pending; + +struct TestWaker { + inner: Arc, + waker: Waker, +} + +#[derive(Default)] +struct TestWakerInner { + awoken: AtomicBool, +} + +impl futures::task::ArcWake for TestWakerInner { + fn wake_by_ref(arc_self: &Arc) { + arc_self.awoken.store(true, Ordering::SeqCst); + } +} + +impl TestWaker { + fn new() -> Self { + let inner: Arc = Default::default(); + + Self { + inner: inner.clone(), + waker: futures::task::waker(inner), + } + } + + fn awoken(&self) -> bool { + self.inner.awoken.swap(false, Ordering::SeqCst) + } + + fn context<'a>(&'a self) -> Context<'a> { + Context::from_waker(&self.waker) + } +} + +fn is_blocking(e: &nix::Error) -> bool { + Some(Errno::EAGAIN) == e.as_errno() +} + +#[derive(Debug)] +struct FileDescriptor { + fd: RawFd, +} + +impl AsRawFd for FileDescriptor { + fn as_raw_fd(&self) -> RawFd { + self.fd + } +} + +impl Read for &FileDescriptor { + fn read(&mut self, buf: &mut [u8]) -> io::Result { + match read(self.fd, buf) { + Ok(n) => Ok(n), + Err(e) if is_blocking(&e) => Err(ErrorKind::WouldBlock.into()), + Err(e) => Err(io::Error::new(ErrorKind::Other, e)), + } + } +} + +impl Read for FileDescriptor { + fn read(&mut self, buf: &mut [u8]) -> io::Result { + (self as &Self).read(buf) + } +} + +impl Write for &FileDescriptor { + fn write(&mut self, buf: &[u8]) -> io::Result { + match write(self.fd, buf) { + Ok(n) => Ok(n), + Err(e) if is_blocking(&e) => Err(ErrorKind::WouldBlock.into()), + Err(e) => Err(io::Error::new(ErrorKind::Other, e)), + } + } + + fn flush(&mut self) -> io::Result<()> { + Ok(()) + } +} + +impl Write for FileDescriptor { + fn write(&mut self, buf: &[u8]) -> io::Result { + (self as &Self).write(buf) + } + + fn flush(&mut self) -> io::Result<()> { + (self as &Self).flush() + } +} + +impl Drop 
for FileDescriptor { + fn drop(&mut self) { + let _ = close(self.fd); + } +} + +fn set_nonblocking(fd: RawFd) { + use nix::fcntl::{OFlag, F_GETFL, F_SETFL}; + + let flags = nix::fcntl::fcntl(fd, F_GETFL).expect("fcntl(F_GETFD)"); + + if flags < 0 { + panic!( + "bad return value from fcntl(F_GETFL): {} ({:?})", + flags, + nix::Error::last() + ); + } + + let flags = OFlag::from_bits_truncate(flags) | OFlag::O_NONBLOCK; + + nix::fcntl::fcntl(fd, F_SETFL(flags)).expect("fcntl(F_SETFD)"); +} + +fn socketpair() -> (FileDescriptor, FileDescriptor) { + use nix::sys::socket::{self, AddressFamily, SockFlag, SockType}; + + let (fd_a, fd_b) = socket::socketpair( + AddressFamily::Unix, + SockType::Stream, + None, + SockFlag::empty(), + ) + .expect("socketpair"); + let fds = (FileDescriptor { fd: fd_a }, FileDescriptor { fd: fd_b }); + + set_nonblocking(fds.0.fd); + set_nonblocking(fds.1.fd); + + fds +} + +fn drain(mut fd: &FileDescriptor) { + let mut buf = [0u8; 512]; + + loop { + match fd.read(&mut buf[..]) { + Err(e) if e.kind() == ErrorKind::WouldBlock => break, + Ok(0) => panic!("unexpected EOF"), + Err(e) => panic!("unexpected error: {:?}", e), + Ok(_) => continue, + } + } +} + +#[tokio::test] +async fn initially_writable() { + let (a, b) = socketpair(); + + let afd_a = AsyncFd::new(a).unwrap(); + let afd_b = AsyncFd::new(b).unwrap(); + + afd_a.writable().await.unwrap().clear_ready(); + afd_b.writable().await.unwrap().clear_ready(); + + futures::select_biased! { + _ = tokio::time::sleep(Duration::from_millis(10)).fuse() => {}, + _ = afd_a.readable().fuse() => panic!("Unexpected readable state"), + _ = afd_b.readable().fuse() => panic!("Unexpected readable state"), + } +} + +#[tokio::test] +async fn reset_readable() { + let (a, mut b) = socketpair(); + + let afd_a = AsyncFd::new(a).unwrap(); + + let readable = afd_a.readable(); + tokio::pin!(readable); + + tokio::select! { + _ = readable.as_mut() => panic!(), + _ = tokio::time::sleep(Duration::from_millis(10)) => {} + } + + b.write_all(b"0").unwrap(); + + let mut guard = readable.await.unwrap(); + + guard.with_io(|| afd_a.inner().read(&mut [0])).unwrap(); + + // `a` is not readable, but the reactor still thinks it is + // (because we have not observed a not-ready error yet) + afd_a.readable().await.unwrap().retain_ready(); + + // Explicitly clear the ready state + guard.clear_ready(); + + let readable = afd_a.readable(); + tokio::pin!(readable); + + tokio::select! { + _ = readable.as_mut() => panic!(), + _ = tokio::time::sleep(Duration::from_millis(10)) => {} + } + + b.write_all(b"0").unwrap(); + + // We can observe the new readable event + afd_a.readable().await.unwrap().clear_ready(); +} + +#[tokio::test] +async fn reset_writable() { + let (a, mut b) = socketpair(); + + let afd_a = AsyncFd::new(a).unwrap(); + + let mut guard = afd_a.writable().await.unwrap(); + + // Write until we get a WouldBlock. This also clears the ready state. + loop { + if let Err(e) = guard.with_io(|| afd_a.inner().write(&[0; 512][..])) { + assert_eq!(ErrorKind::WouldBlock, e.kind()); + break; + } + } + + // Writable state should be cleared now. + let writable = afd_a.writable(); + tokio::pin!(writable); + + tokio::select! { + _ = writable.as_mut() => panic!(), + _ = tokio::time::sleep(Duration::from_millis(10)) => {} + } + + // Read from the other side; we should become writable now. 
+ drain(&mut b); + + let _ = writable.await.unwrap(); +} + +#[tokio::test] +async fn drop_closes() { + let (a, mut b) = socketpair(); + + let afd_a = AsyncFd::new(a).unwrap(); + + assert_eq!( + ErrorKind::WouldBlock, + b.read(&mut [0]).err().unwrap().kind() + ); + + std::mem::drop(afd_a); + + assert_eq!(0, b.read(&mut [0]).unwrap()); + + // into_inner does not close the fd + + let (a, mut b) = socketpair(); + let afd_a = AsyncFd::new(a).unwrap(); + let _a: FileDescriptor = afd_a.into_inner(); + + assert_eq!( + ErrorKind::WouldBlock, + b.read(&mut [0]).err().unwrap().kind() + ); + + // Drop closure behavior is delegated to the inner object + let (a, mut b) = socketpair(); + let afd_a = AsyncFd::new_with_fd((), a.fd).unwrap(); + std::mem::drop(afd_a); + + assert_eq!( + ErrorKind::WouldBlock, + b.read(&mut [0]).err().unwrap().kind() + ); +} + +#[tokio::test] +async fn with_poll() { + use std::task::Poll; + + let (a, mut b) = socketpair(); + + b.write_all(b"0").unwrap(); + + let afd_a = AsyncFd::new(a).unwrap(); + + let mut guard = afd_a.readable().await.unwrap(); + + afd_a.inner().read_exact(&mut [0]).unwrap(); + + // Should not clear the readable state + let _ = guard.with_poll(|| Poll::Ready(())); + + // Still readable... + let _ = afd_a.readable().await.unwrap(); + + // Should clear the readable state + let _ = guard.with_poll(|| Poll::Pending::<()>); + + // Assert not readable + let readable = afd_a.readable(); + tokio::pin!(readable); + + tokio::select! { + _ = readable.as_mut() => panic!(), + _ = tokio::time::sleep(Duration::from_millis(10)) => {} + } + + // Write something down b again and make sure we're reawoken + b.write_all(b"0").unwrap(); + let _ = readable.await.unwrap(); +} + +#[tokio::test] +async fn multiple_waiters() { + let (a, mut b) = socketpair(); + let afd_a = Arc::new(AsyncFd::new(a).unwrap()); + + let barrier = Arc::new(tokio::sync::Barrier::new(11)); + + let mut tasks = Vec::new(); + for _ in 0..10 { + let afd_a = afd_a.clone(); + let barrier = barrier.clone(); + + let f = async move { + let notify_barrier = async { + barrier.wait().await; + futures::future::pending::<()>().await; + }; + + futures::select_biased! { + guard = afd_a.readable().fuse() => { + tokio::task::yield_now().await; + guard.unwrap().clear_ready() + }, + _ = notify_barrier.fuse() => unreachable!(), + } + + std::mem::drop(afd_a); + }; + + tasks.push(tokio::spawn(f)); + } + + let mut all_tasks = futures::future::try_join_all(tasks); + + tokio::select! 
{ + r = std::pin::Pin::new(&mut all_tasks) => { + r.unwrap(); // propagate panic + panic!("Tasks exited unexpectedly") + }, + _ = barrier.wait() => {} + }; + + b.write_all(b"0").unwrap(); + + all_tasks.await.unwrap(); +} + +#[tokio::test] +async fn poll_fns() { + let (a, b) = socketpair(); + let afd_a = Arc::new(AsyncFd::new(a).unwrap()); + let afd_b = Arc::new(AsyncFd::new(b).unwrap()); + + // Fill up the write side of A + while afd_a.inner().write(&[0; 512]).is_ok() {} + + let waker = TestWaker::new(); + + assert_pending!(afd_a.as_ref().poll_read_ready(&mut waker.context())); + + let afd_a_2 = afd_a.clone(); + let r_barrier = Arc::new(tokio::sync::Barrier::new(2)); + let barrier_clone = r_barrier.clone(); + + let read_fut = tokio::spawn(async move { + // Move waker onto this task first + assert_pending!(poll!(futures::future::poll_fn(|cx| afd_a_2 + .as_ref() + .poll_read_ready(cx)))); + barrier_clone.wait().await; + + let _ = futures::future::poll_fn(|cx| afd_a_2.as_ref().poll_read_ready(cx)).await; + }); + + let afd_a_2 = afd_a.clone(); + let w_barrier = Arc::new(tokio::sync::Barrier::new(2)); + let barrier_clone = w_barrier.clone(); + + let mut write_fut = tokio::spawn(async move { + // Move waker onto this task first + assert_pending!(poll!(futures::future::poll_fn(|cx| afd_a_2 + .as_ref() + .poll_write_ready(cx)))); + barrier_clone.wait().await; + + let _ = futures::future::poll_fn(|cx| afd_a_2.as_ref().poll_write_ready(cx)).await; + }); + + r_barrier.wait().await; + w_barrier.wait().await; + + let readable = afd_a.readable(); + tokio::pin!(readable); + + tokio::select! { + _ = &mut readable => unreachable!(), + _ = tokio::task::yield_now() => {} + } + + // Make A readable. We expect that 'readable' and 'read_fut' will both complete quickly + afd_b.inner().write_all(b"0").unwrap(); + + let _ = tokio::join!(readable, read_fut); + + // Our original waker should _not_ be awoken (poll_read_ready retains only the last context) + assert!(!waker.awoken()); + + // The writable side should not be awoken + tokio::select! { + _ = &mut write_fut => unreachable!(), + _ = tokio::time::sleep(Duration::from_millis(5)) => {} + } + + // Make it writable now + drain(afd_b.inner()); + + // now we should be writable (ie - the waker for poll_write should still be registered after we wake the read side) + let _ = write_fut.await; +} From 799e763ee69cc24251ea4d984dd72f41831beeeb Mon Sep 17 00:00:00 2001 From: Bryan Donlan Date: Tue, 20 Oct 2020 22:35:00 +0000 Subject: [PATCH 02/11] driver: fix shutdown notification unreliability Previously, there was a race window in which an IO driver shutting down could fail to notify ScheduledIo instances of this state; in particular, notification of outstanding ScheduledIo registrations was driven by `Driver::drop`, but registrations bypass `Driver` and go directly to a `Weak`. The `Driver` holds the `Arc` keeping `Inner` alive, but it's possible that a new handle could be registered (or a new readiness future created for an existing handle) after the `Driver::drop` handler runs and prior to `Inner` being dropped. This change fixes this in two parts: First, notification of outstanding ScheduledIo handles is pushed down into the drop method of `Inner` instead, and, second, we add state to ScheduledIo to ensure that we remember that the IO driver we're bound to has shut down after the initial shutdown notification, so that subsequent readiness future registrations can immediately return (instead of potentially blocking indefinitely). 
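
As a concrete sketch of the guarantee (this mirrors the tests added in the
next commit; `socketpair()` is the helper from tokio/tests/io_async_fd.rs):

    let rt = tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .unwrap();
    let (a, _b) = socketpair();
    let afd = {
        let _enter = rt.enter();
        AsyncFd::new(a).unwrap()
    };
    std::mem::drop(rt);

    // The wait is woken rather than blocking forever, and a readiness wait
    // initiated after the drop reports an error because the driver is gone.
    assert!(futures::executor::block_on(afd.readable()).is_err());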
Fixes: #2924 --- tokio/src/io/async_fd.rs | 13 ++++++-- tokio/src/io/driver/mod.rs | 51 ++++++++++++++++++++++------- tokio/src/io/driver/scheduled_io.rs | 33 +++++++++++++++++-- tokio/src/loom/std/mod.rs | 2 +- 4 files changed, 81 insertions(+), 18 deletions(-) diff --git a/tokio/src/io/async_fd.rs b/tokio/src/io/async_fd.rs index af440c05a95..1aad157c924 100644 --- a/tokio/src/io/async_fd.rs +++ b/tokio/src/io/async_fd.rs @@ -51,7 +51,7 @@ use crate::util::slab; /// Because these functions don't create a future to hold their state, they have /// the limitation that only one task can wait on each direction (read or write) /// at a time. - +/// /// [`readable`]: method@Self::readable /// [`writable`]: method@Self::writable /// [`ReadyGuard`]: struct@self::ReadyGuard @@ -295,7 +295,16 @@ impl AsyncFd { } async fn readiness(&self, interest: mio::Interest) -> io::Result> { - let event = self.shared.readiness(interest).await; + let event = self.shared.readiness(interest); + + if !self.handle.is_alive() { + return Err(io::Error::new( + io::ErrorKind::Other, + "IO driver has terminated", + )); + } + + let event = event.await; Ok(ReadyGuard { async_fd: self, event: Some(event), diff --git a/tokio/src/io/driver/mod.rs b/tokio/src/io/driver/mod.rs index cd82b26f0f8..2bf174326a6 100644 --- a/tokio/src/io/driver/mod.rs +++ b/tokio/src/io/driver/mod.rs @@ -7,8 +7,8 @@ mod scheduled_io; pub(crate) use scheduled_io::ScheduledIo; // pub(crate) for tests use crate::park::{Park, Unpark}; -use crate::util::bit; use crate::util::slab::{self, Slab}; +use crate::{loom::sync::Mutex, util::bit}; use std::fmt; use std::io; @@ -25,8 +25,10 @@ pub(crate) struct Driver { events: Option, /// Primary slab handle containing the state for each resource registered - /// with this driver. - resources: Slab, + /// with this driver. During Drop this is moved into the Inner structure, so + /// this is an Option to allow it to be vacated (until Drop this is always + /// Some) + resources: Option>, /// The system event queue poll: mio::Poll, @@ -47,6 +49,14 @@ pub(crate) struct ReadyEvent { } pub(super) struct Inner { + /// Primary slab handle containing the state for each resource registered + /// with this driver. + /// + /// The ownership of this slab is moved into this structure during + /// `Driver::drop`, so that `Inner::drop` can notify all outstanding handles + /// without risking new ones being registered in the meantime. + resources: Mutex>>, + /// Registers I/O resources registry: mio::Registry, @@ -104,9 +114,10 @@ impl Driver { Ok(Driver { tick: 0, events: Some(mio::Events::with_capacity(1024)), - resources: slab, poll, + resources: Some(slab), inner: Arc::new(Inner { + resources: Mutex::new(None), registry, io_dispatch: allocator, waker, @@ -133,7 +144,7 @@ impl Driver { self.tick = self.tick.wrapping_add(1); if self.tick == COMPACT_INTERVAL { - self.resources.compact(); + self.resources.as_mut().unwrap().compact() } let mut events = self.events.take().expect("i/o driver event store missing"); @@ -163,7 +174,9 @@ impl Driver { fn dispatch(&mut self, token: mio::Token, ready: Ready) { let addr = slab::Address::from_usize(ADDRESS.unpack(token.0)); - let io = match self.resources.get(addr) { + let resources = self.resources.as_mut().unwrap(); + + let io = match resources.get(addr) { Some(io) => io, None => return, }; @@ -181,12 +194,22 @@ impl Driver { impl Drop for Driver { fn drop(&mut self) { - self.resources.for_each(|io| { - // If a task is waiting on the I/O resource, notify it. 
The task - // will then attempt to use the I/O resource and fail due to the - // driver being shutdown. - io.wake(Ready::ALL); - }) + (*self.inner.resources.lock()) = self.resources.take(); + } +} + +impl Drop for Inner { + fn drop(&mut self) { + let resources = self.resources.lock().take(); + + resources.map(|mut slab| { + slab.for_each(|io| { + // If a task is waiting on the I/O resource, notify it. The task + // will then attempt to use the I/O resource and fail due to the + // driver being shutdown. + io.shutdown(); + }) + }); } } @@ -267,6 +290,10 @@ impl Handle { pub(super) fn inner(&self) -> Option> { self.inner.upgrade() } + + pub(super) fn is_alive(&self) -> bool { + self.inner.strong_count() > 0 + } } impl Unpark for Handle { diff --git a/tokio/src/io/driver/scheduled_io.rs b/tokio/src/io/driver/scheduled_io.rs index b1354a0551a..3aefb3766d5 100644 --- a/tokio/src/io/driver/scheduled_io.rs +++ b/tokio/src/io/driver/scheduled_io.rs @@ -1,4 +1,4 @@ -use super::{Direction, Ready, ReadyEvent, Tick}; +use super::{Ready, ReadyEvent, Tick}; use crate::loom::sync::atomic::AtomicUsize; use crate::loom::sync::Mutex; use crate::util::bit; @@ -7,6 +7,8 @@ use crate::util::slab::Entry; use std::sync::atomic::Ordering::{AcqRel, Acquire, Release}; use std::task::{Context, Poll, Waker}; +use super::Direction; + cfg_io_readiness! { use crate::util::linked_list::{self, LinkedList}; @@ -41,6 +43,9 @@ struct Waiters { /// Waker used for AsyncWrite writer: Option, + + /// True if this ScheduledIo has been killed due to IO driver shutdown + is_shutdown: bool, } cfg_io_readiness! { @@ -121,6 +126,12 @@ impl ScheduledIo { GENERATION.unpack(self.readiness.load(Acquire)) } + /// Invoked when the IO driver is shut down; forces this ScheduledIo into a + /// permanently ready state. + pub(super) fn shutdown(&self) { + self.wake0(Ready::ALL, true) + } + /// Sets the readiness on this `ScheduledIo` by invoking the given closure on /// the current value, returning the previous readiness value. /// @@ -197,6 +208,10 @@ impl ScheduledIo { /// than 32 wakers to notify, if the stack array fills up, the lock is /// released, the array is cleared, and the iteration continues. pub(super) fn wake(&self, ready: Ready) { + self.wake0(ready, false); + } + + fn wake0(&self, ready: Ready, shutdown: bool) { const NUM_WAKERS: usize = 32; let mut wakers: [Option; NUM_WAKERS] = Default::default(); @@ -204,6 +219,8 @@ impl ScheduledIo { let mut waiters = self.waiters.lock(); + waiters.is_shutdown |= shutdown; + // check for AsyncRead slot if ready.is_readable() { if let Some(waker) = waiters.reader.take() { @@ -288,7 +305,12 @@ impl ScheduledIo { // taking the waiters lock let curr = self.readiness.load(Acquire); let ready = direction.mask() & Ready::from_usize(READINESS.unpack(curr)); - if ready.is_empty() { + if waiters.is_shutdown { + Poll::Ready(ReadyEvent { + tick: TICK.unpack(curr) as u8, + ready: direction.mask(), + }) + } else if ready.is_empty() { Poll::Pending } else { Poll::Ready(ReadyEvent { @@ -401,7 +423,12 @@ cfg_io_readiness! 
{ let mut waiters = scheduled_io.waiters.lock(); let curr = scheduled_io.readiness.load(SeqCst); - let ready = Ready::from_usize(READINESS.unpack(curr)); + let mut ready = Ready::from_usize(READINESS.unpack(curr)); + + if waiters.is_shutdown { + ready = Ready::ALL; + } + let ready = ready.intersection(interest); if !ready.is_empty() { diff --git a/tokio/src/loom/std/mod.rs b/tokio/src/loom/std/mod.rs index 9525286895f..414ef90623b 100644 --- a/tokio/src/loom/std/mod.rs +++ b/tokio/src/loom/std/mod.rs @@ -74,7 +74,7 @@ pub(crate) mod sync { pub(crate) use crate::loom::std::atomic_u8::AtomicU8; pub(crate) use crate::loom::std::atomic_usize::AtomicUsize; - pub(crate) use std::sync::atomic::{spin_loop_hint, AtomicBool}; + pub(crate) use std::sync::atomic::{fence, spin_loop_hint, AtomicBool, Ordering}; } } From 62deeafd72b1d9ba039349a472184a78351655f3 Mon Sep 17 00:00:00 2001 From: Bryan Donlan Date: Wed, 21 Oct 2020 19:29:00 +0000 Subject: [PATCH 03/11] io_async_fd: add tests for wakeup on rt shutdown --- tokio/src/io/async_fd.rs | 16 ++++ tokio/tests/io_async_fd.rs | 148 ++++++++++++++++++++++++++++++++++++- 2 files changed, 162 insertions(+), 2 deletions(-) diff --git a/tokio/src/io/async_fd.rs b/tokio/src/io/async_fd.rs index 1aad157c924..1bfc5046e85 100644 --- a/tokio/src/io/async_fd.rs +++ b/tokio/src/io/async_fd.rs @@ -266,6 +266,14 @@ impl AsyncFd { ) -> Poll>> { let event = ready!(self.shared.poll_readiness(cx, Direction::Read)); + if !self.handle.is_alive() { + return Err(io::Error::new( + io::ErrorKind::Other, + "IO driver has terminated", + )) + .into(); + } + Ok(ReadyGuard { async_fd: self, event: Some(event), @@ -287,6 +295,14 @@ impl AsyncFd { ) -> Poll>> { let event = ready!(self.shared.poll_readiness(cx, Direction::Write)); + if !self.handle.is_alive() { + return Err(io::Error::new( + io::ErrorKind::Other, + "IO driver has terminated", + )) + .into(); + } + Ok(ReadyGuard { async_fd: self, event: Some(event), diff --git a/tokio/tests/io_async_fd.rs b/tokio/tests/io_async_fd.rs index d8abaa0e935..e21a527c628 100644 --- a/tokio/tests/io_async_fd.rs +++ b/tokio/tests/io_async_fd.rs @@ -8,6 +8,7 @@ use std::sync::{ }; use std::time::Duration; use std::{ + future::Future, io::{self, ErrorKind, Read, Write}, task::{Context, Waker}, }; @@ -17,8 +18,8 @@ use nix::unistd::{close, read, write}; use futures::{poll, FutureExt}; -use tokio::io::AsyncFd; -use tokio_test::assert_pending; +use tokio::io::{AsyncFd, ReadyGuard}; +use tokio_test::{assert_err, assert_pending}; struct TestWaker { inner: Arc, @@ -447,3 +448,146 @@ async fn poll_fns() { // now we should be writable (ie - the waker for poll_write should still be registered after we wake the read side) let _ = write_fut.await; } + +fn assert_pending>(f: F) -> std::pin::Pin> { + let mut pinned = Box::pin(f); + + assert_pending!(pinned + .as_mut() + .poll(&mut Context::from_waker(futures::task::noop_waker_ref()))); + + pinned +} + +fn rt() -> tokio::runtime::Runtime { + tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap() +} + +#[test] +fn driver_shutdown_wakes_currently_pending() { + let rt = rt(); + + let (a, _b) = socketpair(); + let afd_a = { + let _enter = rt.enter(); + AsyncFd::new(a).unwrap() + }; + + let readable = assert_pending(afd_a.readable()); + + std::mem::drop(rt); + + // Being awoken by a rt drop does not return an error, currently... 
+ let _ = futures::executor::block_on(readable).unwrap(); + + // However, attempting to initiate a readiness wait when the rt is dropped is an error + assert_err!(futures::executor::block_on(afd_a.readable())); +} + +#[test] +fn driver_shutdown_wakes_future_pending() { + let rt = rt(); + + let (a, _b) = socketpair(); + let afd_a = { + let _enter = rt.enter(); + AsyncFd::new(a).unwrap() + }; + + std::mem::drop(rt); + + assert_err!(futures::executor::block_on(afd_a.readable())); +} + +#[test] +fn driver_shutdown_wakes_pending_race() { + // TODO: make this a loom test + for _ in 0..100 { + let rt = rt(); + + let (a, _b) = socketpair(); + let afd_a = { + let _enter = rt.enter(); + AsyncFd::new(a).unwrap() + }; + + let _ = std::thread::spawn(move || std::mem::drop(rt)); + + // This may or may not return an error (but will be awoken) + let _ = futures::executor::block_on(afd_a.readable()); + + // However retrying will always return an error + assert_err!(futures::executor::block_on(afd_a.readable())); + } +} + +async fn poll_readable(fd: &AsyncFd) -> std::io::Result> { + futures::future::poll_fn(|cx| fd.poll_read_ready(cx)).await +} + +async fn poll_writable(fd: &AsyncFd) -> std::io::Result> { + futures::future::poll_fn(|cx| fd.poll_write_ready(cx)).await +} + +#[test] +fn driver_shutdown_wakes_currently_pending_polls() { + let rt = rt(); + + let (a, _b) = socketpair(); + let afd_a = { + let _enter = rt.enter(); + AsyncFd::new(a).unwrap() + }; + + while afd_a.inner().write(&[0; 512]).is_ok() {} // make not writable + + let readable = assert_pending(poll_readable(&afd_a)); + let writable = assert_pending(poll_writable(&afd_a)); + + std::mem::drop(rt); + + // Attempting to poll readiness when the rt is dropped is an error + assert_err!(futures::executor::block_on(readable)); + assert_err!(futures::executor::block_on(writable)); +} + +#[test] +fn driver_shutdown_wakes_poll() { + let rt = rt(); + + let (a, _b) = socketpair(); + let afd_a = { + let _enter = rt.enter(); + AsyncFd::new(a).unwrap() + }; + + std::mem::drop(rt); + + assert_err!(futures::executor::block_on(poll_readable(&afd_a))); + assert_err!(futures::executor::block_on(poll_writable(&afd_a))); +} + +#[test] +fn driver_shutdown_wakes_poll_race() { + // TODO: make this a loom test + for _ in 0..100 { + let rt = rt(); + + let (a, _b) = socketpair(); + let afd_a = { + let _enter = rt.enter(); + AsyncFd::new(a).unwrap() + }; + + while afd_a.inner().write(&[0; 512]).is_ok() {} // make not writable + + let _ = std::thread::spawn(move || std::mem::drop(rt)); + + // The poll variants will always return an error in this case + assert_err!(futures::executor::block_on(poll_readable(&afd_a))); + assert_err!(futures::executor::block_on(poll_writable(&afd_a))); + } +} From 5b09dbff86c807d0ab916ce03732998d554dfbee Mon Sep 17 00:00:00 2001 From: Bryan Donlan Date: Wed, 21 Oct 2020 19:30:29 +0000 Subject: [PATCH 04/11] async_fd: fixing a couple of PR nits --- tokio/src/io/async_fd.rs | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/tokio/src/io/async_fd.rs b/tokio/src/io/async_fd.rs index 1bfc5046e85..04ebe3f5a9d 100644 --- a/tokio/src/io/async_fd.rs +++ b/tokio/src/io/async_fd.rs @@ -78,9 +78,7 @@ impl std::fmt::Debug for AsyncFd { } } -const fn all_interest() -> mio::Interest { - mio::Interest::READABLE.add(mio::Interest::WRITABLE) -} +const ALL_INTEREST: mio::Interest = mio::Interest::READABLE.add(mio::Interest::WRITABLE); /// Represents an IO-ready event detected on a particular file descriptor, which /// has not yet been 
acknowledged. This is a `must_use` structure to help ensure @@ -93,7 +91,7 @@ pub struct ReadyGuard<'a, T> { impl<'a, T: std::fmt::Debug> std::fmt::Debug for ReadyGuard<'a, T> { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("ClearReady") + f.debug_struct("ReadyGuard") .field("async_fd", self.async_fd) .finish() } @@ -218,7 +216,7 @@ impl AsyncFd { pub(crate) fn new_with_handle(inner: T, fd: RawFd, handle: Handle) -> io::Result { let shared = if let Some(inner) = handle.inner() { - inner.add_source(&mut SourceFd(&fd), all_interest())? + inner.add_source(&mut SourceFd(&fd), ALL_INTEREST)? } else { return Err(io::Error::new( io::ErrorKind::Other, From 73333379a9bf869ded95eee835841ff5d2c28b23 Mon Sep 17 00:00:00 2001 From: Bryan Donlan Date: Wed, 21 Oct 2020 21:05:34 +0000 Subject: [PATCH 05/11] driver: fix unused code warning on non-unix platforms --- tokio/src/io/driver/mod.rs | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/tokio/src/io/driver/mod.rs b/tokio/src/io/driver/mod.rs index 2bf174326a6..37a9960ea80 100644 --- a/tokio/src/io/driver/mod.rs +++ b/tokio/src/io/driver/mod.rs @@ -291,8 +291,10 @@ impl Handle { self.inner.upgrade() } - pub(super) fn is_alive(&self) -> bool { - self.inner.strong_count() > 0 + cfg_net_unix! { + pub(super) fn is_alive(&self) -> bool { + self.inner.strong_count() > 0 + } } } From f4398a91708483879a6949394ca3ed52b8cd6571 Mon Sep 17 00:00:00 2001 From: Bryan Donlan Date: Wed, 21 Oct 2020 21:10:48 +0000 Subject: [PATCH 06/11] fixing doc and clippy lints --- tokio/src/io/async_fd.rs | 8 ++++++++ tokio/src/io/driver/mod.rs | 6 +++--- 2 files changed, 11 insertions(+), 3 deletions(-) diff --git a/tokio/src/io/async_fd.rs b/tokio/src/io/async_fd.rs index 04ebe3f5a9d..fd3653d44e9 100644 --- a/tokio/src/io/async_fd.rs +++ b/tokio/src/io/async_fd.rs @@ -258,6 +258,10 @@ impl AsyncFd { /// This function is intended for cases where creating and pinning a future /// via [`readable`] is not feasible. Where possible, using [`readable`] is /// preferred, as this supports polling from multiple tasks at once. + /// + /// [`poll_read_ready`]: method@self::poll_read_ready + /// [`poll_write_ready`]: method@self::poll_write_ready + /// [`readable`]: method@self::readable pub fn poll_read_ready<'a, 'cx>( &'a self, cx: &mut Context<'cx>, @@ -287,6 +291,10 @@ impl AsyncFd { /// This function is intended for cases where creating and pinning a future /// via [`writable`] is not feasible. Where possible, using [`writable`] is /// preferred, as this supports polling from multiple tasks at once. + /// + /// [`poll_read_ready`]: method@self::poll_read_ready + /// [`poll_write_ready`]: method@self::poll_write_ready + /// [`writable`]: method@self::writable pub fn poll_write_ready<'a, 'cx>( &'a self, cx: &mut Context<'cx>, diff --git a/tokio/src/io/driver/mod.rs b/tokio/src/io/driver/mod.rs index 37a9960ea80..a0d8e6f2382 100644 --- a/tokio/src/io/driver/mod.rs +++ b/tokio/src/io/driver/mod.rs @@ -202,14 +202,14 @@ impl Drop for Inner { fn drop(&mut self) { let resources = self.resources.lock().take(); - resources.map(|mut slab| { + if let Some(mut slab) = resources { slab.for_each(|io| { // If a task is waiting on the I/O resource, notify it. The task // will then attempt to use the I/O resource and fail due to the // driver being shutdown. 
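                // (`shutdown` latches the ScheduledIo into a permanently-ready
                // state, so readiness waits that begin later also return
                // immediately instead of hanging.)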
io.shutdown(); - }) - }); + }); + } } } From fc9cb0370348de870530bea4fc2ede9c56768892 Mon Sep 17 00:00:00 2001 From: Bryan Donlan Date: Wed, 21 Oct 2020 21:26:51 +0000 Subject: [PATCH 07/11] fix doc errors --- tokio/src/io/async_fd.rs | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/tokio/src/io/async_fd.rs b/tokio/src/io/async_fd.rs index fd3653d44e9..3bbcdee6931 100644 --- a/tokio/src/io/async_fd.rs +++ b/tokio/src/io/async_fd.rs @@ -259,9 +259,9 @@ impl AsyncFd { /// via [`readable`] is not feasible. Where possible, using [`readable`] is /// preferred, as this supports polling from multiple tasks at once. /// - /// [`poll_read_ready`]: method@self::poll_read_ready - /// [`poll_write_ready`]: method@self::poll_write_ready - /// [`readable`]: method@self::readable + /// [`poll_read_ready`]: method@Self::poll_read_ready + /// [`poll_write_ready`]: method@Self::poll_write_ready + /// [`readable`]: method@Self::readable pub fn poll_read_ready<'a, 'cx>( &'a self, cx: &mut Context<'cx>, @@ -292,9 +292,9 @@ impl AsyncFd { /// via [`writable`] is not feasible. Where possible, using [`writable`] is /// preferred, as this supports polling from multiple tasks at once. /// - /// [`poll_read_ready`]: method@self::poll_read_ready - /// [`poll_write_ready`]: method@self::poll_write_ready - /// [`writable`]: method@self::writable + /// [`poll_read_ready`]: method@Self::poll_read_ready + /// [`poll_write_ready`]: method@Self::poll_write_ready + /// [`writable`]: method@Self::writable pub fn poll_write_ready<'a, 'cx>( &'a self, cx: &mut Context<'cx>, From 1f2d7ee72c3a6a610bd7c44a18d698c7282fda88 Mon Sep 17 00:00:00 2001 From: Bryan Donlan Date: Wed, 21 Oct 2020 21:29:26 +0000 Subject: [PATCH 08/11] fix additional clippy lints --- tokio/tests/io_async_fd.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tokio/tests/io_async_fd.rs b/tokio/tests/io_async_fd.rs index e21a527c628..2fee4d63132 100644 --- a/tokio/tests/io_async_fd.rs +++ b/tokio/tests/io_async_fd.rs @@ -51,7 +51,7 @@ impl TestWaker { self.inner.awoken.swap(false, Ordering::SeqCst) } - fn context<'a>(&'a self) -> Context<'a> { + fn context(&self) -> Context<'_> { Context::from_waker(&self.waker) } } @@ -226,7 +226,7 @@ async fn reset_readable() { #[tokio::test] async fn reset_writable() { - let (a, mut b) = socketpair(); + let (a, b) = socketpair(); let afd_a = AsyncFd::new(a).unwrap(); @@ -250,7 +250,7 @@ async fn reset_writable() { } // Read from the other side; we should become writable now. - drain(&mut b); + drain(&b); let _ = writable.await.unwrap(); } From c4b73dee3dd31f85d61bcbff5ae9aaff87070879 Mon Sep 17 00:00:00 2001 From: Bryan Donlan Date: Wed, 21 Oct 2020 22:55:54 +0000 Subject: [PATCH 09/11] async_fd: docs clarification --- tokio/src/io/async_fd.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/tokio/src/io/async_fd.rs b/tokio/src/io/async_fd.rs index 3bbcdee6931..d7e5c801baf 100644 --- a/tokio/src/io/async_fd.rs +++ b/tokio/src/io/async_fd.rs @@ -9,7 +9,9 @@ use crate::io::driver::{Direction, Handle, ReadyEvent, ScheduledIo}; use crate::util::slab; /// Associates an IO object backed by a Unix file descriptor with the tokio -/// reactor, allowing for readiness to be polled. +/// reactor, allowing for readiness to be polled. The file descriptor must be of +/// a type that can be used with the OS polling facilities (ie, `poll`, `epoll`, +/// `kqueue`, etc), such as a network socket or pipe. 
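+///
+/// For example (sketch), a non-blocking Unix socket is a suitable inner type,
+/// while a regular file generally is not:
+///
+/// ```no_run
+/// # fn docs() -> std::io::Result<()> {
+/// let (tx, _rx) = std::os::unix::net::UnixStream::pair()?;
+/// tx.set_nonblocking(true)?;
+/// let afd = tokio::io::AsyncFd::new(tx)?;
+/// # Ok(())
+/// # }
+/// ```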
/// /// Creating an AsyncFd registers the file descriptor with the current tokio /// Reactor, allowing you to directly await the file descriptor being readable From cb2839292e9cd4f0515ee13b16911aa1a21438e9 Mon Sep 17 00:00:00 2001 From: Bryan Donlan Date: Thu, 22 Oct 2020 17:43:39 +0000 Subject: [PATCH 10/11] async_fd: changes requested in PR --- tokio/src/io/async_fd.rs | 80 +++++++++++++++----------------------- tokio/src/io/mod.rs | 5 ++- tokio/tests/io_async_fd.rs | 33 ++++++++++------ 3 files changed, 57 insertions(+), 61 deletions(-) diff --git a/tokio/src/io/async_fd.rs b/tokio/src/io/async_fd.rs index d7e5c801baf..1063c6d2b0e 100644 --- a/tokio/src/io/async_fd.rs +++ b/tokio/src/io/async_fd.rs @@ -25,6 +25,11 @@ use crate::util::slab; /// The [`AsyncFd::into_inner`] function can be used to extract the inner object /// to retake control from the tokio IO reactor. /// +/// The inner object is required to implement [`AsRawFd`]. This file descriptor +/// must not change while [`AsyncFd`] owns the inner object. Changing the file +/// descriptor results in unspecified behavior in the IO driver, which may +/// include breaking notifications for other sockets/etc. +/// /// Polling for readiness is done by calling the async functions [`readable`] /// and [`writable`]. These functions complete when the associated readiness /// condition is observed. Any number of tasks can query the same `AsyncFd` in @@ -58,23 +63,21 @@ use crate::util::slab; /// [`writable`]: method@Self::writable /// [`ReadyGuard`]: struct@self::ReadyGuard /// [`TcpStream::poll_read_ready`]: struct@crate::net::TcpStream -pub struct AsyncFd { +pub struct AsyncFd { handle: Handle, - fd: RawFd, shared: slab::Ref, inner: Option, } -impl AsRawFd for AsyncFd { +impl AsRawFd for AsyncFd { fn as_raw_fd(&self) -> RawFd { - self.fd + self.inner.as_ref().unwrap().as_raw_fd() } } -impl std::fmt::Debug for AsyncFd { +impl std::fmt::Debug for AsyncFd { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("AsyncFd") - .field("fd", &self.fd) .field("inner", &self.inner) .finish() } @@ -86,20 +89,20 @@ const ALL_INTEREST: mio::Interest = mio::Interest::READABLE.add(mio::Interest::W /// has not yet been acknowledged. This is a `must_use` structure to help ensure /// that you do not forget to explicitly clear (or not clear) the event. #[must_use = "You must explicitly choose whether to clear the readiness state by calling a method on ReadyGuard"] -pub struct ReadyGuard<'a, T> { +pub struct ReadyGuard<'a, T: AsRawFd> { async_fd: &'a AsyncFd, event: Option, } -impl<'a, T: std::fmt::Debug> std::fmt::Debug for ReadyGuard<'a, T> { +impl<'a, T: std::fmt::Debug + AsRawFd> std::fmt::Debug for ReadyGuard<'a, T> { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("ReadyGuard") - .field("async_fd", self.async_fd) + .field("async_fd", &self.async_fd) .finish() } } -impl<'a, T> ReadyGuard<'a, T> { +impl<'a, Inner: AsRawFd> ReadyGuard<'a, Inner> { /// Indicates to tokio that the file descriptor is no longer ready. The /// internal readiness flag will be cleared, and tokio will wait for the /// next edge-triggered readiness notification from the OS. @@ -137,26 +140,12 @@ impl<'a, T> ReadyGuard<'a, T> { /// create this `ReadyGuard`. 
/// /// [`WouldBlock`]: std::io::ErrorKind::WouldBlock - pub fn with_io(&mut self, f: impl FnOnce() -> Result) -> Result - where - E: std::error::Error + 'static, - { - use std::error::Error; - + pub fn with_io(&mut self, f: impl FnOnce() -> io::Result) -> io::Result { let result = f(); if let Err(e) = result.as_ref() { - // Is this a WouldBlock error? - let mut error_ref: Option<&(dyn Error + 'static)> = Some(e); - - while let Some(current) = error_ref { - if let Some(e) = Error::downcast_ref::(current) { - if e.kind() == std::io::ErrorKind::WouldBlock { - self.clear_ready(); - break; - } - } - error_ref = current.source(); + if e.kind() == io::ErrorKind::WouldBlock { + self.clear_ready(); } } @@ -186,15 +175,18 @@ impl<'a, T> ReadyGuard<'a, T> { } } -impl Drop for AsyncFd { +impl Drop for AsyncFd { fn drop(&mut self) { - if let Some(inner) = self.handle.inner() { - let _ = inner.deregister_source(&mut SourceFd(&self.fd)); + if let Some(driver) = self.handle.inner() { + if let Some(inner) = self.inner.as_ref() { + let fd = inner.as_raw_fd(); + let _ = driver.deregister_source(&mut SourceFd(&fd)); + } } } } -impl AsyncFd { +impl AsyncFd { /// Creates an AsyncFd backed by (and taking ownership of) an object /// implementing [`AsRawFd`]. The backing file descriptor is cached at the /// time of creation. @@ -204,19 +196,12 @@ impl AsyncFd { where T: AsRawFd, { - let fd = inner.as_raw_fd(); - Self::new_with_fd(inner, fd) + Self::new_with_handle(inner, Handle::current()) } - /// Constructs a new AsyncFd, explicitly specifying the backing file - /// descriptor. - /// - /// This function must be called in the context of a tokio runtime. - pub fn new_with_fd(inner: T, fd: RawFd) -> io::Result { - Self::new_with_handle(inner, fd, Handle::current()) - } + pub(crate) fn new_with_handle(inner: T, handle: Handle) -> io::Result { + let fd = inner.as_raw_fd(); - pub(crate) fn new_with_handle(inner: T, fd: RawFd, handle: Handle) -> io::Result { let shared = if let Some(inner) = handle.inner() { inner.add_source(&mut SourceFd(&fd), ALL_INTEREST)? } else { @@ -228,7 +213,6 @@ impl AsyncFd { Ok(AsyncFd { handle, - fd, shared, inner: Some(inner), }) @@ -236,13 +220,13 @@ impl AsyncFd { /// Returns a shared reference to the backing object of this [`AsyncFd`] #[inline] - pub fn inner(&self) -> &T { + pub fn get_ref(&self) -> &T { self.inner.as_ref().unwrap() } /// Returns a mutable reference to the backing object of this [`AsyncFd`] #[inline] - pub fn inner_mut(&mut self) -> &mut T { + pub fn get_mut(&mut self) -> &mut T { self.inner.as_mut().unwrap() } @@ -264,9 +248,9 @@ impl AsyncFd { /// [`poll_read_ready`]: method@Self::poll_read_ready /// [`poll_write_ready`]: method@Self::poll_write_ready /// [`readable`]: method@Self::readable - pub fn poll_read_ready<'a, 'cx>( + pub fn poll_read_ready<'a>( &'a self, - cx: &mut Context<'cx>, + cx: &mut Context<'_>, ) -> Poll>> { let event = ready!(self.shared.poll_readiness(cx, Direction::Read)); @@ -297,9 +281,9 @@ impl AsyncFd { /// [`poll_read_ready`]: method@Self::poll_read_ready /// [`poll_write_ready`]: method@Self::poll_write_ready /// [`writable`]: method@Self::writable - pub fn poll_write_ready<'a, 'cx>( + pub fn poll_write_ready<'a>( &'a self, - cx: &mut Context<'cx>, + cx: &mut Context<'_>, ) -> Poll>> { let event = ready!(self.shared.poll_readiness(cx, Direction::Write)); diff --git a/tokio/src/io/mod.rs b/tokio/src/io/mod.rs index b782577604d..9c7087f4dc4 100644 --- a/tokio/src/io/mod.rs +++ b/tokio/src/io/mod.rs @@ -218,7 +218,10 @@ cfg_io_driver! 
{ cfg_net_unix! { mod async_fd; - pub use self::async_fd::{AsyncFd, ReadyGuard}; + pub mod unix { + //! Asynchronous IO structures specific to Unix-like operating systems. + pub use super::async_fd::{AsyncFd, ReadyGuard}; + } } cfg_io_std! { diff --git a/tokio/tests/io_async_fd.rs b/tokio/tests/io_async_fd.rs index 2fee4d63132..2d42b7a68ca 100644 --- a/tokio/tests/io_async_fd.rs +++ b/tokio/tests/io_async_fd.rs @@ -18,7 +18,7 @@ use nix::unistd::{close, read, write}; use futures::{poll, FutureExt}; -use tokio::io::{AsyncFd, ReadyGuard}; +use tokio::io::unix::{AsyncFd, ReadyGuard}; use tokio_test::{assert_err, assert_pending}; struct TestWaker { @@ -201,7 +201,7 @@ async fn reset_readable() { let mut guard = readable.await.unwrap(); - guard.with_io(|| afd_a.inner().read(&mut [0])).unwrap(); + guard.with_io(|| afd_a.get_ref().read(&mut [0])).unwrap(); // `a` is not readable, but the reactor still thinks it is // (because we have not observed a not-ready error yet) @@ -234,7 +234,7 @@ async fn reset_writable() { // Write until we get a WouldBlock. This also clears the ready state. loop { - if let Err(e) = guard.with_io(|| afd_a.inner().write(&[0; 512][..])) { + if let Err(e) = guard.with_io(|| afd_a.get_ref().write(&[0; 512][..])) { assert_eq!(ErrorKind::WouldBlock, e.kind()); break; } @@ -255,6 +255,14 @@ async fn reset_writable() { let _ = writable.await.unwrap(); } +#[derive(Debug)] +struct ArcFd(Arc); +impl AsRawFd for ArcFd { + fn as_raw_fd(&self) -> RawFd { + self.0.as_raw_fd() + } +} + #[tokio::test] async fn drop_closes() { let (a, mut b) = socketpair(); @@ -283,7 +291,8 @@ async fn drop_closes() { // Drop closure behavior is delegated to the inner object let (a, mut b) = socketpair(); - let afd_a = AsyncFd::new_with_fd((), a.fd).unwrap(); + let arc_fd = Arc::new(a); + let afd_a = AsyncFd::new(ArcFd(arc_fd.clone())).unwrap(); std::mem::drop(afd_a); assert_eq!( @@ -304,7 +313,7 @@ async fn with_poll() { let mut guard = afd_a.readable().await.unwrap(); - afd_a.inner().read_exact(&mut [0]).unwrap(); + afd_a.get_ref().read_exact(&mut [0]).unwrap(); // Should not clear the readable state let _ = guard.with_poll(|| Poll::Ready(())); @@ -383,7 +392,7 @@ async fn poll_fns() { let afd_b = Arc::new(AsyncFd::new(b).unwrap()); // Fill up the write side of A - while afd_a.inner().write(&[0; 512]).is_ok() {} + while afd_a.get_ref().write(&[0; 512]).is_ok() {} let waker = TestWaker::new(); @@ -429,7 +438,7 @@ async fn poll_fns() { } // Make A readable. 
We expect that 'readable' and 'read_fut' will both complete quickly - afd_b.inner().write_all(b"0").unwrap(); + afd_b.get_ref().write_all(b"0").unwrap(); let _ = tokio::join!(readable, read_fut); @@ -443,7 +452,7 @@ async fn poll_fns() { } // Make it writable now - drain(afd_b.inner()); + drain(afd_b.get_ref()); // now we should be writable (ie - the waker for poll_write should still be registered after we wake the read side) let _ = write_fut.await; @@ -524,11 +533,11 @@ fn driver_shutdown_wakes_pending_race() { } } -async fn poll_readable(fd: &AsyncFd) -> std::io::Result> { +async fn poll_readable(fd: &AsyncFd) -> std::io::Result> { futures::future::poll_fn(|cx| fd.poll_read_ready(cx)).await } -async fn poll_writable(fd: &AsyncFd) -> std::io::Result> { +async fn poll_writable(fd: &AsyncFd) -> std::io::Result> { futures::future::poll_fn(|cx| fd.poll_write_ready(cx)).await } @@ -542,7 +551,7 @@ fn driver_shutdown_wakes_currently_pending_polls() { AsyncFd::new(a).unwrap() }; - while afd_a.inner().write(&[0; 512]).is_ok() {} // make not writable + while afd_a.get_ref().write(&[0; 512]).is_ok() {} // make not writable let readable = assert_pending(poll_readable(&afd_a)); let writable = assert_pending(poll_writable(&afd_a)); @@ -582,7 +591,7 @@ fn driver_shutdown_wakes_poll_race() { AsyncFd::new(a).unwrap() }; - while afd_a.inner().write(&[0; 512]).is_ok() {} // make not writable + while afd_a.get_ref().write(&[0; 512]).is_ok() {} // make not writable let _ = std::thread::spawn(move || std::mem::drop(rt)); From 029d89e149883b9ad37debdd5e1f0a2596553b0e Mon Sep 17 00:00:00 2001 From: Bryan Donlan Date: Thu, 22 Oct 2020 17:58:23 +0000 Subject: [PATCH 11/11] async_fd: rename ReadyGuard -> AsyncFdReadyGuard, fix clippy lint --- tokio/src/io/async_fd.rs | 46 +++++++++++++++++++------------------- tokio/src/io/mod.rs | 2 +- tokio/tests/io_async_fd.rs | 8 ++++--- 3 files changed, 29 insertions(+), 27 deletions(-) diff --git a/tokio/src/io/async_fd.rs b/tokio/src/io/async_fd.rs index 1063c6d2b0e..e5ad2ab4314 100644 --- a/tokio/src/io/async_fd.rs +++ b/tokio/src/io/async_fd.rs @@ -45,9 +45,9 @@ use crate::util::slab; /// /// This however does mean that it is critical to ensure that this ready flag is /// cleared when (and only when) the file descriptor ceases to be ready. The -/// [`ReadyGuard`] returned from readiness checking functions serves this +/// [`AsyncFdReadyGuard`] returned from readiness checking functions serves this /// function; after calling a readiness-checking async function, you must use -/// this [`ReadyGuard`] to signal to tokio whether the file descriptor is no +/// this [`AsyncFdReadyGuard`] to signal to tokio whether the file descriptor is no /// longer in a ready state. /// /// ## Use with to a poll-based API @@ -61,7 +61,7 @@ use crate::util::slab; /// /// [`readable`]: method@Self::readable /// [`writable`]: method@Self::writable -/// [`ReadyGuard`]: struct@self::ReadyGuard +/// [`AsyncFdReadyGuard`]: struct@self::AsyncFdReadyGuard /// [`TcpStream::poll_read_ready`]: struct@crate::net::TcpStream pub struct AsyncFd { handle: Handle, @@ -89,12 +89,12 @@ const ALL_INTEREST: mio::Interest = mio::Interest::READABLE.add(mio::Interest::W /// has not yet been acknowledged. This is a `must_use` structure to help ensure /// that you do not forget to explicitly clear (or not clear) the event. 
diff --git a/tokio/src/io/async_fd.rs b/tokio/src/io/async_fd.rs
index 1063c6d2b0e..e5ad2ab4314 100644
--- a/tokio/src/io/async_fd.rs
+++ b/tokio/src/io/async_fd.rs
@@ -45,9 +45,9 @@ use crate::util::slab;
 ///
 /// This however does mean that it is critical to ensure that this ready flag is
 /// cleared when (and only when) the file descriptor ceases to be ready. The
-/// [`ReadyGuard`] returned from readiness checking functions serves this
+/// [`AsyncFdReadyGuard`] returned from readiness checking functions serves this
 /// function; after calling a readiness-checking async function, you must use
-/// this [`ReadyGuard`] to signal to tokio whether the file descriptor is no
+/// this [`AsyncFdReadyGuard`] to signal to tokio whether the file descriptor is no
 /// longer in a ready state.
 ///
 /// ## Use with a poll-based API
 ///
@@ -61,7 +61,7 @@ use crate::util::slab;
 ///
 /// [`readable`]: method@Self::readable
 /// [`writable`]: method@Self::writable
-/// [`ReadyGuard`]: struct@self::ReadyGuard
+/// [`AsyncFdReadyGuard`]: struct@self::AsyncFdReadyGuard
 /// [`TcpStream::poll_read_ready`]: struct@crate::net::TcpStream
 pub struct AsyncFd<T: AsRawFd> {
     handle: Handle,
@@ -89,12 +89,12 @@ const ALL_INTEREST: mio::Interest = mio::Interest::READABLE.add(mio::Interest::WRITABLE);
 /// has not yet been acknowledged. This is a `must_use` structure to help ensure
 /// that you do not forget to explicitly clear (or not clear) the event.
 #[must_use = "You must explicitly choose whether to clear the readiness state by calling a method on ReadyGuard"]
-pub struct ReadyGuard<'a, T: AsRawFd> {
+pub struct AsyncFdReadyGuard<'a, T: AsRawFd> {
     async_fd: &'a AsyncFd<T>,
     event: Option<ReadyEvent>,
 }
 
-impl<'a, T: std::fmt::Debug + AsRawFd> std::fmt::Debug for ReadyGuard<'a, T> {
+impl<'a, T: std::fmt::Debug + AsRawFd> std::fmt::Debug for AsyncFdReadyGuard<'a, T> {
     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
         f.debug_struct("ReadyGuard")
             .field("async_fd", &self.async_fd)
@@ -102,7 +102,7 @@ impl<'a, T: std::fmt::Debug + AsRawFd> std::fmt::Debug for ReadyGuard<'a, T> {
     }
 }
 
-impl<'a, Inner: AsRawFd> ReadyGuard<'a, Inner> {
+impl<'a, Inner: AsRawFd> AsyncFdReadyGuard<'a, Inner> {
     /// Indicates to tokio that the file descriptor is no longer ready. The
     /// internal readiness flag will be cleared, and tokio will wait for the
     /// next edge-triggered readiness notification from the OS.
@@ -123,7 +123,7 @@
     /// ready flag asserted.
     ///
     /// While this function is itself a no-op, it satisfies the `#[must_use]`
-    /// constraint on the [`ReadyGuard`] type.
+    /// constraint on the [`AsyncFdReadyGuard`] type.
     pub fn retain_ready(&mut self) {
         // no-op
     }
@@ -136,8 +136,8 @@
     /// clearing the tokio-side state only when a [`WouldBlock`] condition
     /// occurs. It is the responsibility of the caller to ensure that `f`
     /// returns [`WouldBlock`] only if the file descriptor that originated this
-    /// `ReadyGuard` no longer expresses the readiness state that was queried to
-    /// create this `ReadyGuard`.
+    /// `AsyncFdReadyGuard` no longer expresses the readiness state that was queried to
+    /// create this `AsyncFdReadyGuard`.
     ///
     /// [`WouldBlock`]: std::io::ErrorKind::WouldBlock
     pub fn with_io<R>(&mut self, f: impl FnOnce() -> io::Result<R>) -> io::Result<R> {
@@ -160,8 +160,8 @@
     /// clearing the tokio-side state only when a [`Pending`] condition occurs.
     /// It is the responsibility of the caller to ensure that `f` returns
     /// [`Pending`] only if the file descriptor that originated this
-    /// `ReadyGuard` no longer expresses the readiness state that was queried to
-    /// create this `ReadyGuard`.
+    /// `AsyncFdReadyGuard` no longer expresses the readiness state that was queried to
+    /// create this `AsyncFdReadyGuard`.
     ///
     /// [`Pending`]: std::task::Poll::Pending
     pub fn with_poll<R>(&mut self, f: impl FnOnce() -> std::task::Poll<R>) -> std::task::Poll<R> {
@@ -251,7 +251,7 @@ impl<T: AsRawFd> AsyncFd<T> {
     pub fn poll_read_ready<'a>(
         &'a self,
         cx: &mut Context<'_>,
-    ) -> Poll<io::Result<ReadyGuard<'a, T>>> {
+    ) -> Poll<io::Result<AsyncFdReadyGuard<'a, T>>> {
         let event = ready!(self.shared.poll_readiness(cx, Direction::Read));
 
         if !self.handle.is_alive() {
@@ -262,7 +262,7 @@
             .into();
         }
 
-        Ok(ReadyGuard {
+        Ok(AsyncFdReadyGuard {
             async_fd: self,
             event: Some(event),
         })
@@ -284,7 +284,7 @@
     pub fn poll_write_ready<'a>(
         &'a self,
         cx: &mut Context<'_>,
-    ) -> Poll<io::Result<ReadyGuard<'a, T>>> {
+    ) -> Poll<io::Result<AsyncFdReadyGuard<'a, T>>> {
         let event = ready!(self.shared.poll_readiness(cx, Direction::Write));
 
         if !self.handle.is_alive() {
@@ -295,14 +295,14 @@
             .into();
         }
 
-        Ok(ReadyGuard {
+        Ok(AsyncFdReadyGuard {
             async_fd: self,
             event: Some(event),
         })
         .into()
     }
 
-    async fn readiness(&self, interest: mio::Interest) -> io::Result<ReadyGuard<'_, T>> {
+    async fn readiness(&self, interest: mio::Interest) -> io::Result<AsyncFdReadyGuard<'_, T>> {
         let event = self.shared.readiness(interest);
 
         if !self.handle.is_alive() {
@@ -313,25 +313,25 @@
         }
 
         let event = event.await;
-        Ok(ReadyGuard {
+        Ok(AsyncFdReadyGuard {
             async_fd: self,
             event: Some(event),
         })
     }
 
     /// Waits for the file descriptor to become readable, returning a
-    /// [`ReadyGuard`] that must be dropped to resume read-readiness polling.
+    /// [`AsyncFdReadyGuard`] that must be dropped to resume read-readiness polling.
     ///
-    /// [`ReadyGuard`]: struct@self::ReadyGuard
-    pub async fn readable(&self) -> io::Result<ReadyGuard<'_, T>> {
+    /// [`AsyncFdReadyGuard`]: struct@self::AsyncFdReadyGuard
+    pub async fn readable(&self) -> io::Result<AsyncFdReadyGuard<'_, T>> {
         self.readiness(mio::Interest::READABLE).await
     }
 
     /// Waits for the file descriptor to become writable, returning a
-    /// [`ReadyGuard`] that must be dropped to resume write-readiness polling.
+    /// [`AsyncFdReadyGuard`] that must be dropped to resume write-readiness polling.
     ///
-    /// [`ReadyGuard`]: struct@self::ReadyGuard
-    pub async fn writable(&self) -> io::Result<ReadyGuard<'_, T>> {
+    /// [`AsyncFdReadyGuard`]: struct@self::AsyncFdReadyGuard
+    pub async fn writable(&self) -> io::Result<AsyncFdReadyGuard<'_, T>> {
         self.readiness(mio::Interest::WRITABLE).await
     }
 }
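A sketch of consuming the poll-based API above from outside an async fn (hypothetical future type over a UnixStream, not part of the patch; as the docs note, only one task per direction can wait this way because the future holds no per-waiter state):

    use std::future::Future;
    use std::io;
    use std::os::unix::net::UnixStream;
    use std::pin::Pin;
    use std::task::{Context, Poll};

    use tokio::io::unix::AsyncFd;

    // Hypothetical hand-written future over poll_read_ready.
    struct Readable<'a>(&'a AsyncFd<UnixStream>);

    impl<'a> Future for Readable<'a> {
        type Output = io::Result<()>;

        fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
            let mut guard = match self.0.poll_read_ready(cx) {
                Poll::Ready(Ok(guard)) => guard,
                Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
                Poll::Pending => return Poll::Pending,
            };
            // Keep the readiness flag set; the caller performs the IO next.
            guard.retain_ready();
            Poll::Ready(Ok(()))
        }
    }

Awaiting Readable(&afd) resolves once the descriptor is believed ready, leaving the flag asserted for the caller's subsequent IO attempt.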
diff --git a/tokio/src/io/mod.rs b/tokio/src/io/mod.rs
index 9c7087f4dc4..20d92233c73 100644
--- a/tokio/src/io/mod.rs
+++ b/tokio/src/io/mod.rs
@@ -220,7 +220,7 @@ cfg_net_unix! {
     pub mod unix {
         //! Asynchronous IO structures specific to Unix-like operating systems.
 
-        pub use super::async_fd::{AsyncFd, ReadyGuard};
+        pub use super::async_fd::{AsyncFd, AsyncFdReadyGuard};
     }
 }
 
diff --git a/tokio/tests/io_async_fd.rs b/tokio/tests/io_async_fd.rs
index 2d42b7a68ca..0303eff6612 100644
--- a/tokio/tests/io_async_fd.rs
+++ b/tokio/tests/io_async_fd.rs
@@ -18,7 +18,7 @@ use nix::unistd::{close, read, write};
 
 use futures::{poll, FutureExt};
 
-use tokio::io::unix::{AsyncFd, ReadyGuard};
+use tokio::io::unix::{AsyncFd, AsyncFdReadyGuard};
 use tokio_test::{assert_err, assert_pending};
 
 struct TestWaker {
@@ -299,6 +299,8 @@ async fn drop_closes() {
         ErrorKind::WouldBlock,
         b.read(&mut [0]).err().unwrap().kind()
     );
+
+    std::mem::drop(arc_fd); // suppress unnecessary clone clippy warning
 }
 
 #[tokio::test]
@@ -533,11 +535,11 @@ fn driver_shutdown_wakes_pending_race() {
     }
 }
 
-async fn poll_readable<T: AsRawFd>(fd: &AsyncFd<T>) -> std::io::Result<ReadyGuard<'_, T>> {
+async fn poll_readable<T: AsRawFd>(fd: &AsyncFd<T>) -> std::io::Result<AsyncFdReadyGuard<'_, T>> {
     futures::future::poll_fn(|cx| fd.poll_read_ready(cx)).await
 }
 
-async fn poll_writable<T: AsRawFd>(fd: &AsyncFd<T>) -> std::io::Result<ReadyGuard<'_, T>> {
+async fn poll_writable<T: AsRawFd>(fd: &AsyncFd<T>) -> std::io::Result<AsyncFdReadyGuard<'_, T>> {
     futures::future::poll_fn(|cx| fd.poll_write_ready(cx)).await
 }
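The ArcFd wrapper used in drop_closes also illustrates a general pattern: because AsyncFd only requires AsRawFd of its inner object, a refcounted handle can keep the descriptor open after the reactor registration is dropped. A sketch under assumed names (SharedFd and register are illustrative, not part of the patch):

    use std::os::unix::io::{AsRawFd, RawFd};
    use std::os::unix::net::UnixStream;
    use std::sync::Arc;

    use tokio::io::unix::AsyncFd;

    // Like the ArcFd test type above: delegate AsRawFd through an Arc so the
    // underlying socket stays open while other clones of the Arc exist.
    #[derive(Debug)]
    struct SharedFd<T>(Arc<T>);

    impl<T: AsRawFd> AsRawFd for SharedFd<T> {
        fn as_raw_fd(&self) -> RawFd {
            self.0.as_raw_fd()
        }
    }

    fn register(stream: Arc<UnixStream>) -> std::io::Result<AsyncFd<SharedFd<UnixStream>>> {
        // Dropping the returned AsyncFd deregisters the fd from the reactor,
        // but the socket itself is closed only when every Arc clone is gone.
        AsyncFd::new(SharedFd(stream))
    }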