Skip to content

Commit

Permalink
Merge branch 'master' into command-process-group
Browse files Browse the repository at this point in the history
  • Loading branch information
HarveyHunt committed Oct 26, 2022
2 parents 9f56fb0 + 1ca17be commit 76e2def
Show file tree
Hide file tree
Showing 21 changed files with 181 additions and 112 deletions.
7 changes: 7 additions & 0 deletions tokio/src/macros/cfg.rs
Expand Up @@ -297,6 +297,13 @@ macro_rules! cfg_signal_internal {
}
}

macro_rules! cfg_signal_internal_and_unix {
($($item:item)*) => {
#[cfg(unix)]
cfg_signal_internal! { $($item)* }
}
}

macro_rules! cfg_not_signal_internal {
($($item:item)*) => {
$(
Expand Down
2 changes: 1 addition & 1 deletion tokio/src/process/unix/driver.rs
Expand Up @@ -3,7 +3,7 @@
//! Process driver.

use crate::process::unix::GlobalOrphanQueue;
use crate::signal::unix::driver::{Driver as SignalDriver, Handle as SignalHandle};
use crate::runtime::signal::{Driver as SignalDriver, Handle as SignalHandle};

use std::time::Duration;

Expand Down
2 changes: 1 addition & 1 deletion tokio/src/process/unix/mod.rs
Expand Up @@ -32,7 +32,7 @@ use reap::Reaper;
use crate::io::{AsyncRead, AsyncWrite, PollEvented, ReadBuf};
use crate::process::kill::Kill;
use crate::process::SpawnedChild;
use crate::signal::unix::driver::Handle as SignalHandle;
use crate::runtime::signal::Handle as SignalHandle;
use crate::signal::unix::{signal, Signal, SignalKind};

use mio::event::Source;
Expand Down
4 changes: 2 additions & 2 deletions tokio/src/process/unix/orphan.rs
@@ -1,5 +1,5 @@
use crate::loom::sync::{Mutex, MutexGuard};
use crate::signal::unix::driver::Handle as SignalHandle;
use crate::runtime::signal::Handle as SignalHandle;
use crate::signal::unix::{signal_with_handle, SignalKind};
use crate::sync::watch;
use std::io;
Expand Down Expand Up @@ -132,7 +132,7 @@ where
pub(crate) mod test {
use super::*;
use crate::runtime::io::Driver as IoDriver;
use crate::signal::unix::driver::{Driver as SignalDriver, Handle as SignalHandle};
use crate::runtime::signal::{Driver as SignalDriver, Handle as SignalHandle};
use crate::sync::watch;
use std::cell::{Cell, RefCell};
use std::io;
Expand Down
13 changes: 3 additions & 10 deletions tokio/src/runtime/driver.rs
Expand Up @@ -211,19 +211,12 @@ cfg_not_io_driver! {

// ===== signal driver =====

macro_rules! cfg_signal_internal_and_unix {
($($item:item)*) => {
#[cfg(unix)]
cfg_signal_internal! { $($item)* }
}
}

cfg_signal_internal_and_unix! {
type SignalDriver = crate::signal::unix::driver::Driver;
pub(crate) type SignalHandle = Option<crate::signal::unix::driver::Handle>;
type SignalDriver = crate::runtime::signal::Driver;
pub(crate) type SignalHandle = Option<crate::runtime::signal::Handle>;

fn create_signal_driver(io_driver: IoDriver) -> io::Result<(SignalDriver, SignalHandle)> {
let driver = crate::signal::unix::driver::Driver::new(io_driver)?;
let driver = crate::runtime::signal::Driver::new(io_driver)?;
let handle = driver.handle();
Ok((driver, Some(handle)))
}
Expand Down
4 changes: 4 additions & 0 deletions tokio/src/runtime/handle.rs
Expand Up @@ -114,6 +114,10 @@ impl Handle {
/// thread pool. The thread pool is then responsible for polling the future
/// until it completes.
///
/// You do not have to `.await` the returned `JoinHandle` to make the
/// provided future start execution. It will start running in the background
/// immediately when `spawn` is called.
///
/// See [module level][mod] documentation for more details.
///
/// [mod]: index.html
Expand Down
8 changes: 8 additions & 0 deletions tokio/src/runtime/mod.rs
Expand Up @@ -188,6 +188,10 @@ cfg_time! {
pub(crate) mod time;
}

cfg_signal_internal_and_unix! {
pub(crate) mod signal;
}

cfg_rt! {
pub(crate) mod enter;

Expand Down Expand Up @@ -382,6 +386,10 @@ cfg_rt! {
/// thread pool. The thread pool is then responsible for polling the future
/// until it completes.
///
/// You do not have to `.await` the returned `JoinHandle` to make the
/// provided future start execution. It will start running in the
/// background immediately when `spawn` is called.
///
/// See [module level][mod] documentation for more details.
///
/// [mod]: index.html
Expand Down
Expand Up @@ -150,7 +150,7 @@ unsafe fn noop(_data: *const ()) {}
// ===== impl Handle =====

impl Handle {
pub(super) fn check_inner(&self) -> std_io::Result<()> {
pub(crate) fn check_inner(&self) -> std_io::Result<()> {
if self.inner.strong_count() > 0 {
Ok(())
} else {
Expand All @@ -170,7 +170,7 @@ cfg_rt! {
///
/// This function panics if there is no current signal driver set.
#[track_caller]
pub(super) fn current() -> Self {
pub(crate) fn current() -> Self {
crate::runtime::context::signal_handle().expect(
"there is no signal driver running, must be called from the context of Tokio runtime",
)
Expand All @@ -186,7 +186,7 @@ cfg_not_rt! {
///
/// This function panics if there is no current signal driver set.
#[track_caller]
pub(super) fn current() -> Self {
pub(crate) fn current() -> Self {
panic!(
"there is no signal driver running, must be called from the context of Tokio runtime or with\
`rt` enabled.",
Expand Down
6 changes: 4 additions & 2 deletions tokio/src/runtime/task/join.rs
Expand Up @@ -10,8 +10,10 @@ use std::task::{Context, Poll, Waker};
cfg_rt! {
/// An owned permission to join on a task (await its termination).
///
/// This can be thought of as the equivalent of [`std::thread::JoinHandle`] for
/// a task rather than a thread.
/// This can be thought of as the equivalent of [`std::thread::JoinHandle`]
/// for a Tokio task rather than a thread. You do not need to `.await` the
/// `JoinHandle` to make the task execute — it will start running in the
/// background immediately.
///
/// A `JoinHandle` *detaches* the associated task when it is dropped, which
/// means that there is no longer any handle to the task, and no way to `join`
Expand Down
2 changes: 1 addition & 1 deletion tokio/src/runtime/time/entry.rs
Expand Up @@ -576,7 +576,7 @@ impl TimerEntry {
this.inner().state.poll(cx.waker())
}

fn driver(&self) -> &super::Handle {
pub(crate) fn driver(&self) -> &super::Handle {
self.driver.time()
}
}
Expand Down
17 changes: 2 additions & 15 deletions tokio/src/runtime/time/handle.rs
@@ -1,31 +1,18 @@
use crate::loom::sync::Arc;
use crate::runtime::time::TimeSource;
use std::fmt;

/// Handle to time driver instance.
#[derive(Clone)]
pub(crate) struct Handle {
time_source: TimeSource,
pub(super) inner: Arc<super::Inner>,
pub(super) time_source: TimeSource,
pub(super) inner: super::Inner,
}

impl Handle {
/// Creates a new timer `Handle` from a shared `Inner` timer state.
pub(super) fn new(inner: Arc<super::Inner>) -> Self {
let time_source = inner.state.lock().time_source.clone();
Handle { time_source, inner }
}

/// Returns the time source associated with this handle.
pub(crate) fn time_source(&self) -> &TimeSource {
&self.time_source
}

/// Access the driver's inner structure.
pub(super) fn get(&self) -> &super::Inner {
&*self.inner
}

/// Checks whether the driver has been shutdown.
pub(super) fn is_shutdown(&self) -> bool {
self.inner.is_shutdown()
Expand Down
94 changes: 41 additions & 53 deletions tokio/src/runtime/time/mod.rs
Expand Up @@ -19,7 +19,7 @@ pub(crate) use source::TimeSource;
mod wheel;

use crate::loom::sync::atomic::{AtomicBool, Ordering};
use crate::loom::sync::{Arc, Mutex};
use crate::loom::sync::Mutex;
use crate::runtime::driver::{self, IoHandle, IoStack};
use crate::time::error::Error;
use crate::time::{Clock, Duration};
Expand Down Expand Up @@ -84,20 +84,8 @@ use std::{num::NonZeroU64, ptr::NonNull, task::Waker};
/// [interval]: crate::time::Interval
#[derive(Debug)]
pub(crate) struct Driver {
/// Timing backend in use.
time_source: TimeSource,

/// Parker to delegate to.
park: IoStack,

// When `true`, a call to `park_timeout` should immediately return and time
// should not advance. One reason for this to be `true` is if the task
// passed to `Runtime::block_on` called `task::yield_now()`.
//
// While it may look racy, it only has any effect when the clock is paused
// and pausing the clock is restricted to a single-threaded runtime.
#[cfg(feature = "test-util")]
did_wake: Arc<AtomicBool>,
}

/// Timer state shared between `Driver`, `Handle`, and `Registration`.
Expand All @@ -108,15 +96,18 @@ struct Inner {
/// True if the driver is being shutdown.
pub(super) is_shutdown: AtomicBool,

// When `true`, a call to `park_timeout` should immediately return and time
// should not advance. One reason for this to be `true` is if the task
// passed to `Runtime::block_on` called `task::yield_now()`.
//
// While it may look racy, it only has any effect when the clock is paused
// and pausing the clock is restricted to a single-threaded runtime.
#[cfg(feature = "test-util")]
did_wake: Arc<AtomicBool>,
did_wake: AtomicBool,
}

/// Time state shared which must be protected by a `Mutex`
struct InnerState {
/// Timing backend in use.
time_source: TimeSource,

/// The last published timer `elapsed` value.
elapsed: u64,

Expand All @@ -137,31 +128,23 @@ impl Driver {
pub(crate) fn new(park: IoStack, clock: Clock) -> (Driver, Handle) {
let time_source = TimeSource::new(clock);

#[cfg(feature = "test-util")]
let did_wake = Arc::new(AtomicBool::new(false));

let inner = Arc::new(Inner {
state: Mutex::new(InnerState {
time_source: time_source.clone(),
elapsed: 0,
next_wake: None,
wheel: wheel::Wheel::new(),
}),
is_shutdown: AtomicBool::new(false),

#[cfg(feature = "test-util")]
did_wake: did_wake.clone(),
});

let handle = Handle::new(inner);

let driver = Driver {
let handle = Handle {
time_source,
park,
#[cfg(feature = "test-util")]
did_wake,
inner: Inner {
state: Mutex::new(InnerState {
elapsed: 0,
next_wake: None,
wheel: wheel::Wheel::new(),
}),
is_shutdown: AtomicBool::new(false),

#[cfg(feature = "test-util")]
did_wake: AtomicBool::new(false),
},
};

let driver = Driver { park };

(driver, handle)
}

Expand All @@ -180,7 +163,7 @@ impl Driver {
return;
}

handle.get().is_shutdown.store(true, Ordering::SeqCst);
handle.inner.is_shutdown.store(true, Ordering::SeqCst);

// Advance time forward to the end of time.

Expand All @@ -191,7 +174,7 @@ impl Driver {

fn park_internal(&mut self, rt_handle: &driver::Handle, limit: Option<Duration>) {
let handle = rt_handle.time();
let mut lock = handle.get().state.lock();
let mut lock = handle.inner.state.lock();

assert!(!handle.is_shutdown());

Expand All @@ -203,11 +186,13 @@ impl Driver {

match next_wake {
Some(when) => {
let now = self.time_source.now();
let now = handle.time_source.now();
// Note that we effectively round up to 1ms here - this avoids
// very short-duration microsecond-resolution sleeps that the OS
// might treat as zero-length.
let mut duration = self.time_source.tick_to_duration(when.saturating_sub(now));
let mut duration = handle
.time_source
.tick_to_duration(when.saturating_sub(now));

if duration > Duration::from_millis(0) {
if let Some(limit) = limit {
Expand All @@ -234,7 +219,8 @@ impl Driver {

cfg_test_util! {
fn park_thread_timeout(&mut self, rt_handle: &driver::Handle, duration: Duration) {
let clock = &self.time_source.clock;
let handle = rt_handle.time();
let clock = &handle.time_source.clock;

if clock.is_paused() {
self.park.park_timeout(rt_handle, Duration::from_secs(0));
Expand All @@ -243,18 +229,14 @@ impl Driver {
// before the "duration" elapsed (usually caused by a
// yield in `Runtime::block_on`). In this case, we don't
// advance the clock.
if !self.did_wake() {
if !handle.did_wake() {
// Simulate advancing time
clock.advance(duration);
}
} else {
self.park.park_timeout(rt_handle, duration);
}
}

fn did_wake(&self) -> bool {
self.did_wake.swap(false, Ordering::SeqCst)
}
}

cfg_not_test_util! {
Expand All @@ -276,7 +258,7 @@ impl Handle {
let mut waker_list: [Option<Waker>; 32] = Default::default();
let mut waker_idx = 0;

let mut lock = self.get().lock();
let mut lock = self.inner.lock();

if now < lock.elapsed {
// Time went backwards! This normally shouldn't happen as the Rust language
Expand Down Expand Up @@ -307,7 +289,7 @@ impl Handle {

waker_idx = 0;

lock = self.get().lock();
lock = self.inner.lock();
}
}
}
Expand Down Expand Up @@ -338,7 +320,7 @@ impl Handle {
/// `add_entry` must not be called concurrently.
pub(self) unsafe fn clear_entry(&self, entry: NonNull<TimerShared>) {
unsafe {
let mut lock = self.get().lock();
let mut lock = self.inner.lock();

if entry.as_ref().might_be_registered() {
lock.wheel.remove(entry);
Expand All @@ -361,7 +343,7 @@ impl Handle {
entry: NonNull<TimerShared>,
) {
let waker = unsafe {
let mut lock = self.get().lock();
let mut lock = self.inner.lock();

// We may have raced with a firing/deregistration, so check before
// deregistering.
Expand Down Expand Up @@ -408,6 +390,12 @@ impl Handle {
waker.wake();
}
}

cfg_test_util! {
fn did_wake(&self) -> bool {
self.inner.did_wake.swap(false, Ordering::SeqCst)
}
}
}

// ===== impl Inner =====
Expand Down

0 comments on commit 76e2def

Please sign in to comment.