diff --git a/tokio/src/io/poll_evented.rs b/tokio/src/io/poll_evented.rs index ac64653b46d..2c943ea4edb 100644 --- a/tokio/src/io/poll_evented.rs +++ b/tokio/src/io/poll_evented.rs @@ -1,4 +1,4 @@ -use crate::io::driver::{Direction, ReadyEvent}; +use crate::io::driver::{Direction, Handle, ReadyEvent}; use crate::io::registration::Registration; use crate::io::{AsyncRead, AsyncWrite, ReadBuf}; @@ -90,6 +90,7 @@ where /// The runtime is usually set implicitly when this function is called /// from a future driven by a tokio runtime, otherwise runtime can be set /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function. + #[cfg_attr(feature = "signal", allow(unused))] pub(crate) fn new(io: E) -> io::Result { PollEvented::new_with_ready(io, mio::Ready::all()) } @@ -118,8 +119,17 @@ where /// The runtime is usually set implicitly when this function is called /// from a future driven by a tokio runtime, otherwise runtime can be set /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function. + #[cfg_attr(feature = "signal", allow(unused))] pub(crate) fn new_with_ready(io: E, ready: mio::Ready) -> io::Result { - let registration = Registration::new_with_ready(&io, ready)?; + Self::new_with_ready_and_handle(io, ready, Handle::current()) + } + + pub(crate) fn new_with_ready_and_handle( + io: E, + ready: mio::Ready, + handle: Handle, + ) -> io::Result { + let registration = Registration::new_with_ready_and_handle(&io, ready, handle)?; Ok(Self { io: Some(io), registration, @@ -128,7 +138,13 @@ where /// Returns a shared reference to the underlying I/O object this readiness /// stream is wrapping. - #[cfg(any(feature = "process", feature = "tcp", feature = "udp", feature = "uds",))] + #[cfg(any( + feature = "process", + feature = "tcp", + feature = "udp", + feature = "uds", + feature = "signal" + ))] pub(crate) fn get_ref(&self) -> &E { self.io.as_ref().unwrap() } diff --git a/tokio/src/io/registration.rs b/tokio/src/io/registration.rs index 7396c4b87f0..e4ec096f297 100644 --- a/tokio/src/io/registration.rs +++ b/tokio/src/io/registration.rs @@ -38,7 +38,7 @@ cfg_io_driver! { /// [`poll_read_ready`]: method@Self::poll_read_ready` /// [`poll_write_ready`]: method@Self::poll_write_ready` #[derive(Debug)] - pub(super) struct Registration { + pub(crate) struct Registration { /// Handle to the associated driver. handle: Handle, @@ -74,23 +74,6 @@ impl Registration { /// /// - `Ok` if the registration happened successfully /// - `Err` if an error was encountered during registration - /// - /// - /// # Panics - /// - /// This function panics if thread-local runtime is not set. - /// - /// The runtime is usually set implicitly when this function is called - /// from a future driven by a tokio runtime, otherwise runtime can be set - /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function. - pub(super) fn new_with_ready(io: &T, ready: mio::Ready) -> io::Result - where - T: Evented, - { - Self::new_with_ready_and_handle(io, ready, Handle::current()) - } - - /// Same as `new_with_ready` but also accepts an explicit handle. pub(crate) fn new_with_ready_and_handle( io: &T, ready: mio::Ready, diff --git a/tokio/src/macros/cfg.rs b/tokio/src/macros/cfg.rs index 01a59c88191..1336c63de92 100644 --- a/tokio/src/macros/cfg.rs +++ b/tokio/src/macros/cfg.rs @@ -134,6 +134,15 @@ macro_rules! cfg_io_driver { } } +macro_rules! cfg_not_io_driver { + ($($item:item)*) => { + $( + #[cfg(not(feature = "io-driver"))] + $item + )* + } +} + macro_rules! cfg_io_readiness { ($($item:item)*) => { $( diff --git a/tokio/src/net/unix/listener.rs b/tokio/src/net/unix/listener.rs index 68baa7f40ac..eb316b074d3 100644 --- a/tokio/src/net/unix/listener.rs +++ b/tokio/src/net/unix/listener.rs @@ -192,11 +192,6 @@ impl TryFrom for mio_uds::UnixListener { type Error = io::Error; /// Consumes value, returning the mio I/O object. - /// - /// See [`PollEvented::into_inner`] for more details about - /// resource deregistration that happens during the call. - /// - /// [`PollEvented::into_inner`]: crate::io::PollEvented::into_inner fn try_from(value: UnixListener) -> Result { value.io.into_inner() } diff --git a/tokio/src/net/unix/stream.rs b/tokio/src/net/unix/stream.rs index 37104314dc4..715ed7aa485 100644 --- a/tokio/src/net/unix/stream.rs +++ b/tokio/src/net/unix/stream.rs @@ -143,11 +143,6 @@ impl TryFrom for mio_uds::UnixStream { type Error = io::Error; /// Consumes value, returning the mio I/O object. - /// - /// See [`PollEvented::into_inner`] for more details about - /// resource deregistration that happens during the call. - /// - /// [`PollEvented::into_inner`]: crate::io::PollEvented::into_inner fn try_from(value: UnixStream) -> Result { value.io.into_inner() } diff --git a/tokio/src/signal/unix/driver.rs b/tokio/src/signal/unix/driver.rs index 0458893cce4..639a483ef24 100644 --- a/tokio/src/signal/unix/driver.rs +++ b/tokio/src/signal/unix/driver.rs @@ -1,13 +1,15 @@ //! Signal driver use crate::io::driver::Driver as IoDriver; -use crate::io::Registration; +use crate::io::PollEvented; use crate::park::Park; use crate::runtime::context; use crate::signal::registry::globals; use mio_uds::UnixStream; use std::io::{self, Read}; +use std::ptr; use std::sync::{Arc, Weak}; +use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker}; use std::time::Duration; /// Responsible for registering wakeups when an OS signal is received, and @@ -21,11 +23,7 @@ pub(crate) struct Driver { park: IoDriver, /// A pipe for receiving wake events from the signal handler - receiver: UnixStream, - - /// The actual registraiton for `receiver` when active. - /// Lazily bound at the first signal registration. - registration: Registration, + receiver: PollEvented, /// Shared state inner: Arc, @@ -58,13 +56,12 @@ impl Driver { // either, since we can't compare Handles or assume they will always // point to the exact same reactor. let receiver = globals().receiver.try_clone()?; - let registration = - Registration::new_with_ready_and_handle(&receiver, mio::Ready::all(), park.handle())?; + let receiver = + PollEvented::new_with_ready_and_handle(receiver, mio::Ready::all(), park.handle())?; Ok(Self { park, receiver, - registration, inner: Arc::new(Inner(())), }) } @@ -79,17 +76,23 @@ impl Driver { fn process(&self) { // Check if the pipe is ready to read and therefore has "woken" us up - match self.registration.take_read_ready() { - Ok(Some(ready)) => assert!(ready.is_readable()), - Ok(None) => return, // No wake has arrived, bail - Err(e) => panic!("reactor gone: {}", e), - } + // + // To do so, we will `poll_read_ready` with a noop waker, since we don't + // need to actually be notified when read ready... + let waker = unsafe { Waker::from_raw(RawWaker::new(ptr::null(), &NOOP_WAKER_VTABLE)) }; + let mut cx = Context::from_waker(&waker); + + let ev = match self.receiver.poll_read_ready(&mut cx) { + Poll::Ready(Ok(ev)) => ev, + Poll::Ready(Err(e)) => panic!("reactor gone: {}", e), + Poll::Pending => return, // No wake has arrived, bail + }; // Drain the pipe completely so we can receive a new readiness event // if another signal has come in. let mut buf = [0; 128]; loop { - match (&self.receiver).read(&mut buf) { + match self.receiver.get_ref().read(&mut buf) { Ok(0) => panic!("EOF on self-pipe"), Ok(_) => continue, // Keep reading Err(e) if e.kind() == io::ErrorKind::WouldBlock => break, @@ -97,11 +100,21 @@ impl Driver { } } + self.receiver.clear_readiness(ev); + // Broadcast any signals which were received globals().broadcast(); } } +const NOOP_WAKER_VTABLE: RawWakerVTable = RawWakerVTable::new(noop_clone, noop, noop, noop); + +unsafe fn noop_clone(_data: *const ()) -> RawWaker { + RawWaker::new(ptr::null(), &NOOP_WAKER_VTABLE) +} + +unsafe fn noop(_data: *const ()) {} + // ===== impl Park for Driver ===== impl Park for Driver {