Skip to content

Commit

Permalink
Start over with new requirements
Browse files Browse the repository at this point in the history
  • Loading branch information
davidpdrsn committed Mar 13, 2021
1 parent ef56cd8 commit d85ea74
Show file tree
Hide file tree
Showing 10 changed files with 607 additions and 46 deletions.
25 changes: 25 additions & 0 deletions tokio/src/io/driver/mod.rs
Expand Up @@ -14,6 +14,7 @@ pub(crate) use registration::Registration;
mod scheduled_io;
use scheduled_io::ScheduledIo;

use crate::loom::sync::atomic::{AtomicBool, Ordering};
use crate::park::{Park, Unpark};
use crate::util::slab::{self, Slab};
use crate::{loom::sync::Mutex, util::bit};
Expand Down Expand Up @@ -73,6 +74,9 @@ pub(super) struct Inner {

/// Used to wake up the reactor from a call to `turn`
waker: mio::Waker,

/// Whether the driver is shutdown.
is_shutdown: AtomicBool,
}

#[derive(Debug, Eq, PartialEq, Clone, Copy)]
Expand Down Expand Up @@ -129,6 +133,7 @@ impl Driver {
registry,
io_dispatch: allocator,
waker,
is_shutdown: AtomicBool::new(false),
}),
})
}
Expand Down Expand Up @@ -208,6 +213,8 @@ impl Drop for Driver {

impl Drop for Inner {
fn drop(&mut self) {
self.is_shutdown.store(true, Ordering::SeqCst);

let resources = self.resources.lock().take();

if let Some(mut slab) = resources {
Expand Down Expand Up @@ -297,6 +304,24 @@ impl Handle {
pub(super) fn inner(&self) -> Option<Arc<Inner>> {
self.inner.upgrade()
}

pub(crate) fn shutdown(self) {
if let Some(inner) = self.inner.upgrade() {
inner
.is_shutdown
.store(true, crate::loom::sync::atomic::Ordering::SeqCst);
}
}

pub(crate) fn is_shutdown(&self) -> bool {
if let Some(inner) = self.inner.upgrade() {
inner.is_shutdown.load(Ordering::SeqCst)
} else {
// if the inner type has been dropped then its `Drop` impl will have been called which
// sets `Inner.is_shutdown` to `true`. So therefore it must have been shutdown.
true
}
}
}

impl Unpark for Handle {
Expand Down
11 changes: 9 additions & 2 deletions tokio/src/io/driver/registration.rs
Expand Up @@ -54,7 +54,7 @@ unsafe impl Sync for Registration {}

impl Registration {
/// Registers the I/O resource with the default reactor, for a specific
/// `Interest`. `new_with_interest` should be used over `new` when you need
/// `Interest`. `new_with_interest` should be ucrate::util::error::RUNTIME_SHUTTING_DOWN_ERRORsed over `new` when you need
/// control over the readiness state, such as when a file descriptor only
/// allows reads. This does not add `hup` or `error` so if you are
/// interested in those states, you will need to add them to the readiness
Expand All @@ -69,6 +69,13 @@ impl Registration {
interest: Interest,
handle: Handle,
) -> io::Result<Registration> {
if handle.is_shutdown() {
return Err(io::Error::new(
io::ErrorKind::Other,
crate::util::error::RUNTIME_SHUTTING_DOWN_ERROR,
));
}

let shared = if let Some(inner) = handle.inner() {
inner.add_source(io, interest)?
} else {
Expand Down Expand Up @@ -232,7 +239,7 @@ cfg_io_readiness! {
pin!(fut);

crate::future::poll_fn(|cx| {
if self.handle.inner().is_none() {
if self.handle.inner().is_none() || self.handle.is_shutdown() {
return Poll::Ready(Err(io::Error::new(io::ErrorKind::Other, "reactor gone")));
}

Expand Down
12 changes: 11 additions & 1 deletion tokio/src/runtime/handle.rs
Expand Up @@ -211,7 +211,17 @@ impl Handle {
let mut _blocking_enter = crate::runtime::enter(true);

// Block on the future
_blocking_enter.block_on(future).expect("failed to park thread")
_blocking_enter
.block_on(future)
.expect("failed to park thread")
}

pub(crate) fn shutdown(mut self) {
self.spawner.shutdown();

if let Some(io_handle) = self.io_handle {
io_handle.shutdown();
}
}
}

Expand Down
2 changes: 1 addition & 1 deletion tokio/src/runtime/mod.rs
Expand Up @@ -526,7 +526,7 @@ cfg_rt! {
/// ```
pub fn shutdown_timeout(mut self, duration: Duration) {
// Wakeup and shutdown all the worker threads
self.handle.spawner.shutdown();
self.handle.shutdown();
self.blocking_pool.shutdown(Some(duration));
}

Expand Down
4 changes: 4 additions & 0 deletions tokio/src/time/driver/entry.rs
Expand Up @@ -543,6 +543,10 @@ impl TimerEntry {
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<(), super::Error>> {
if self.driver.is_shutdown() {
panic!(crate::util::error::RUNTIME_SHUTTING_DOWN_ERROR);
}

if let Some(deadline) = self.initial_deadline {
self.as_mut().reset(deadline);
}
Expand Down
19 changes: 12 additions & 7 deletions tokio/src/time/driver/handle.rs
@@ -1,18 +1,18 @@
use crate::loom::sync::{Arc, Mutex};
use crate::loom::sync::Arc;
use crate::time::driver::ClockTime;
use std::fmt;

/// Handle to time driver instance.
#[derive(Clone)]
pub(crate) struct Handle {
time_source: ClockTime,
inner: Arc<Mutex<super::Inner>>,
inner: Arc<super::Inner>,
}

impl Handle {
/// Creates a new timer `Handle` from a shared `Inner` timer state.
pub(super) fn new(inner: Arc<Mutex<super::Inner>>) -> Self {
let time_source = inner.lock().time_source.clone();
pub(super) fn new(inner: Arc<super::Inner>) -> Self {
let time_source = inner.state.lock().time_source.clone();
Handle { time_source, inner }
}

Expand All @@ -21,9 +21,14 @@ impl Handle {
&self.time_source
}

/// Locks the driver's inner structure
pub(super) fn lock(&self) -> crate::loom::sync::MutexGuard<'_, super::Inner> {
self.inner.lock()
/// Access the driver's inner structure
pub(super) fn get(&self) -> &super::Inner {
&*self.inner
}

// Check whether the driver has been shutdown
pub(super) fn is_shutdown(&self) -> bool {
self.inner.is_shutdown()
}
}

Expand Down
71 changes: 43 additions & 28 deletions tokio/src/time/driver/mod.rs
Expand Up @@ -16,6 +16,7 @@ mod wheel;

pub(super) mod sleep;

use crate::loom::sync::atomic::{AtomicBool, Ordering};
use crate::loom::sync::{Arc, Mutex};
use crate::park::{Park, Unpark};
use crate::time::error::Error;
Expand Down Expand Up @@ -86,7 +87,7 @@ pub(crate) struct Driver<P: Park + 'static> {
time_source: ClockTime,

/// Shared state
inner: Handle,
handle: Handle,

/// Parker to delegate to
park: P,
Expand Down Expand Up @@ -132,7 +133,16 @@ impl ClockTime {
}

/// Timer state shared between `Driver`, `Handle`, and `Registration`.
pub(self) struct Inner {
struct Inner {
// The state is split like this so `Handle` can access `is_shutdown` without locking the mutex
pub(super) state: Mutex<InnerState>,

/// True if the driver is being shutdown
pub(super) is_shutdown: AtomicBool,
}

/// Time state shared which must be protected by a `Mutex`
struct InnerState {
/// Timing backend in use
time_source: ClockTime,

Expand All @@ -145,9 +155,6 @@ pub(self) struct Inner {
/// Timer wheel
wheel: wheel::Wheel,

/// True if the driver is being shutdown
is_shutdown: bool,

/// Unparker that can be used to wake the time driver
unpark: Box<dyn Unpark>,
}
Expand All @@ -169,7 +176,7 @@ where

Driver {
time_source,
inner: Handle::new(Arc::new(Mutex::new(inner))),
handle: Handle::new(Arc::new(inner)),
park,
}
}
Expand All @@ -181,15 +188,15 @@ where
/// `with_default`, setting the timer as the default timer for the execution
/// context.
pub(crate) fn handle(&self) -> Handle {
self.inner.clone()
self.handle.clone()
}

fn park_internal(&mut self, limit: Option<Duration>) -> Result<(), P::Error> {
let clock = &self.time_source.clock;

let mut lock = self.inner.lock();
let mut lock = self.handle.get().state.lock();

assert!(!lock.is_shutdown);
assert!(!self.handle.is_shutdown());

let next_wake = lock.wheel.next_expiration_time();
lock.next_wake =
Expand Down Expand Up @@ -237,7 +244,7 @@ where
}

// Process pending timers after waking up
self.inner.process();
self.handle.process();

Ok(())
}
Expand All @@ -255,7 +262,7 @@ impl Handle {
let mut waker_list: [Option<Waker>; 32] = Default::default();
let mut waker_idx = 0;

let mut lock = self.lock();
let mut lock = self.get().lock();

assert!(now >= lock.elapsed);

Expand All @@ -278,7 +285,7 @@ impl Handle {

waker_idx = 0;

lock = self.lock();
lock = self.get().lock();
}
}
}
Expand Down Expand Up @@ -309,7 +316,7 @@ impl Handle {
/// `add_entry` must not be called concurrently.
pub(self) unsafe fn clear_entry(&self, entry: NonNull<TimerShared>) {
unsafe {
let mut lock = self.lock();
let mut lock = self.get().lock();

if entry.as_ref().might_be_registered() {
lock.wheel.remove(entry);
Expand All @@ -327,7 +334,7 @@ impl Handle {
/// the `TimerEntry`)
pub(self) unsafe fn reregister(&self, new_tick: u64, entry: NonNull<TimerShared>) {
let waker = unsafe {
let mut lock = self.lock();
let mut lock = self.get().lock();

// We may have raced with a firing/deregistration, so check before
// deregistering.
Expand All @@ -338,7 +345,7 @@ impl Handle {
// Now that we have exclusive control of this entry, mint a handle to reinsert it.
let entry = entry.as_ref().handle();

if lock.is_shutdown {
if self.is_shutdown() {
unsafe { entry.fire(Err(crate::time::error::Error::shutdown())) }
} else {
entry.set_expiration(new_tick);
Expand Down Expand Up @@ -396,19 +403,15 @@ where
}

fn shutdown(&mut self) {
let mut lock = self.inner.lock();

if lock.is_shutdown {
if self.handle.is_shutdown() {
return;
}

lock.is_shutdown = true;

drop(lock);
self.handle.get().is_shutdown.store(true, Ordering::SeqCst);

// Advance time forward to the end of time.

self.inner.process_at_time(u64::MAX);
self.handle.process_at_time(u64::MAX);

self.park.shutdown();
}
Expand All @@ -428,14 +431,26 @@ where
impl Inner {
pub(self) fn new(time_source: ClockTime, unpark: Box<dyn Unpark>) -> Self {
Inner {
time_source,
elapsed: 0,
next_wake: None,
unpark,
wheel: wheel::Wheel::new(),
is_shutdown: false,
state: Mutex::new(InnerState {
time_source,
elapsed: 0,
next_wake: None,
unpark,
wheel: wheel::Wheel::new(),
}),
is_shutdown: AtomicBool::new(false),
}
}

/// Locks the driver's inner structure
pub(super) fn lock(&self) -> crate::loom::sync::MutexGuard<'_, InnerState> {
self.state.lock()
}

// Check whether the driver has been shutdown
pub(super) fn is_shutdown(&self) -> bool {
self.is_shutdown.load(Ordering::SeqCst)
}
}

impl fmt::Debug for Inner {
Expand Down

0 comments on commit d85ea74

Please sign in to comment.