Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rt: rm internal Unpark types for Handle types #5027

Merged
merged 1 commit into from Sep 18, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
5 changes: 0 additions & 5 deletions tokio/src/process/unix/driver.rs
Expand Up @@ -3,7 +3,6 @@
//! Process driver.

use crate::process::unix::GlobalOrphanQueue;
use crate::runtime::io::Handle;
use crate::signal::unix::driver::{Driver as SignalDriver, Handle as SignalHandle};

use std::time::Duration;
Expand All @@ -28,10 +27,6 @@ impl Driver {
}
}

pub(crate) fn unpark(&self) -> Handle {
self.park.unpark()
}

pub(crate) fn park(&mut self) {
self.park.park();
GlobalOrphanQueue::reap_orphans(&self.signal_handle);
Expand Down
73 changes: 28 additions & 45 deletions tokio/src/runtime/driver.rs
Expand Up @@ -38,8 +38,6 @@ pub(crate) struct Cfg {
pub(crate) start_paused: bool,
}

pub(crate) type Unpark = TimerUnpark;

impl Driver {
pub(crate) fn new(cfg: Cfg) -> io::Result<(Self, Handle)> {
let (io_stack, io_handle, signal_handle) = create_io_stack(cfg.enable_io)?;
Expand All @@ -60,10 +58,6 @@ impl Driver {
))
}

pub(crate) fn unpark(&self) -> TimerUnpark {
self.inner.unpark()
}

pub(crate) fn park(&mut self) {
self.inner.park()
}
Expand All @@ -77,11 +71,21 @@ impl Driver {
}
}

impl Handle {
pub(crate) fn unpark(&self) {
#[cfg(feature = "time")]
if let Some(handle) = &self.time {
handle.unpark();
}

self.io.unpark();
}
}

// ===== io driver =====

cfg_io_driver! {
pub(crate) type IoDriver = crate::runtime::io::Driver;
pub(crate) type IoHandle = IoUnpark;

#[derive(Debug)]
pub(crate) enum IoStack {
Expand All @@ -90,7 +94,7 @@ cfg_io_driver! {
}

#[derive(Debug, Clone)]
pub(crate) enum IoUnpark {
pub(crate) enum IoHandle {
Enabled(crate::runtime::io::Handle),
Disabled(UnparkThread),
}
Expand All @@ -106,23 +110,25 @@ cfg_io_driver! {
let (signal_driver, signal_handle) = create_signal_driver(io_driver)?;
let process_driver = create_process_driver(signal_driver);

(IoStack::Enabled(process_driver), IoUnpark::Enabled(io_handle), signal_handle)
(IoStack::Enabled(process_driver), IoHandle::Enabled(io_handle), signal_handle)
} else {
let park_thread = ParkThread::new();
let unpark_thread = park_thread.unpark();
(IoStack::Disabled(park_thread), IoUnpark::Disabled(unpark_thread), Default::default())
(IoStack::Disabled(park_thread), IoHandle::Disabled(unpark_thread), Default::default())
};

Ok(ret)
}

impl IoStack {
pub(crate) fn unpark(&self) -> IoUnpark {
/*
pub(crate) fn handle(&self) -> IoHandle {
match self {
IoStack::Enabled(v) => IoUnpark::Enabled(v.unpark()),
IoStack::Disabled(v) => IoUnpark::Disabled(v.unpark()),
IoStack::Enabled(v) => IoHandle::Enabled(v.handle()),
IoStack::Disabled(v) => IoHandle::Disabled(v.unpark()),
}
}
}]
*/

pub(crate) fn park(&mut self) {
match self {
Expand All @@ -146,37 +152,36 @@ cfg_io_driver! {
}
}

impl IoUnpark {
impl IoHandle {
pub(crate) fn unpark(&self) {
match self {
IoUnpark::Enabled(v) => v.unpark(),
IoUnpark::Disabled(v) => v.unpark(),
IoHandle::Enabled(handle) => handle.unpark(),
IoHandle::Disabled(handle) => handle.unpark(),
}
}

#[track_caller]
pub(crate) fn expect(self, msg: &'static str) -> crate::runtime::io::Handle {
match self {
IoUnpark::Enabled(v) => v,
IoUnpark::Disabled(..) => panic!("{}", msg),
IoHandle::Enabled(v) => v,
IoHandle::Disabled(..) => panic!("{}", msg),
}
}

cfg_unstable! {
pub(crate) fn as_ref(&self) -> Option<&crate::runtime::io::Handle> {
match self {
IoUnpark::Enabled(v) => Some(v),
IoUnpark::Disabled(..) => None,
IoHandle::Enabled(v) => Some(v),
IoHandle::Disabled(..) => None,
}
}
}
}
}

cfg_not_io_driver! {
pub(crate) type IoHandle = IoUnpark;
pub(crate) type IoHandle = UnparkThread;
pub(crate) type IoStack = ParkThread;
pub(crate) type IoUnpark = UnparkThread;

fn create_io_stack(_enabled: bool) -> io::Result<(IoStack, IoHandle, SignalHandle)> {
let park_thread = ParkThread::new();
Expand Down Expand Up @@ -249,11 +254,6 @@ cfg_time! {
Disabled(IoStack),
}

pub(crate) enum TimerUnpark {
Enabled(crate::runtime::time::TimerUnpark),
Disabled(IoUnpark),
}

pub(crate) type Clock = crate::time::Clock;
pub(crate) type TimeHandle = Option<crate::runtime::time::Handle>;

Expand All @@ -276,13 +276,6 @@ cfg_time! {
}

impl TimeDriver {
pub(crate) fn unpark(&self) -> TimerUnpark {
match self {
TimeDriver::Enabled { driver, .. } => TimerUnpark::Enabled(driver.unpark()),
TimeDriver::Disabled(v) => TimerUnpark::Disabled(v.unpark()),
}
}

pub(crate) fn park(&mut self) {
match self {
TimeDriver::Enabled { driver, handle } => driver.park(handle),
Expand All @@ -304,20 +297,10 @@ cfg_time! {
}
}
}

impl TimerUnpark {
pub(crate) fn unpark(&self) {
match self {
TimerUnpark::Enabled(v) => v.unpark(),
TimerUnpark::Disabled(v) => v.unpark(),
}
}
}
}

cfg_not_time! {
type TimeDriver = IoStack;
type TimerUnpark = IoUnpark;

pub(crate) type Clock = ();
pub(crate) type TimeHandle = ();
Expand Down
5 changes: 0 additions & 5 deletions tokio/src/runtime/io/mod.rs
Expand Up @@ -144,11 +144,6 @@ impl Driver {
}
}

// TODO: remove this in a later refactor
pub(crate) fn unpark(&self) -> Handle {
self.handle()
}

pub(crate) fn park(&mut self) {
self.turn(None);
}
Expand Down
12 changes: 3 additions & 9 deletions tokio/src/runtime/scheduler/current_thread.rs
Expand Up @@ -2,7 +2,7 @@ use crate::future::poll_fn;
use crate::loom::sync::atomic::AtomicBool;
use crate::loom::sync::{Arc, Mutex};
use crate::runtime::context::EnterGuard;
use crate::runtime::driver::{self, Driver, Unpark};
use crate::runtime::driver::{self, Driver};
use crate::runtime::task::{self, JoinHandle, OwnedTasks, Schedule, Task};
use crate::runtime::{blocking, Config};
use crate::runtime::{MetricsBatch, SchedulerMetrics, WorkerMetrics};
Expand Down Expand Up @@ -82,9 +82,6 @@ struct Shared {
/// Collection of all active tasks spawned onto this executor.
owned: OwnedTasks<Arc<Handle>>,

/// Unpark the blocked thread.
unpark: Unpark,

/// Indicates whether the blocked on thread was woken.
woken: AtomicBool,

Expand Down Expand Up @@ -122,13 +119,10 @@ impl CurrentThread {
seed_generator: RngSeedGenerator,
config: Config,
) -> CurrentThread {
let unpark = driver.unpark();

let handle = Arc::new(Handle {
shared: Shared {
queue: Mutex::new(Some(VecDeque::with_capacity(INITIAL_CAPACITY))),
owned: OwnedTasks::new(),
unpark,
woken: AtomicBool::new(false),
config,
scheduler_metrics: SchedulerMetrics::new(),
Expand Down Expand Up @@ -467,7 +461,7 @@ impl Schedule for Arc<Handle> {
if let Some(queue) = guard.as_mut() {
queue.push_back(task);
drop(guard);
self.shared.unpark.unpark();
self.driver.unpark();
}
}
});
Expand Down Expand Up @@ -511,7 +505,7 @@ impl Wake for Handle {
/// Wake by reference
fn wake_by_ref(arc_self: &Arc<Self>) {
arc_self.shared.woken.store(true, Release);
arc_self.shared.unpark.unpark();
arc_self.driver.unpark();
}
}

Expand Down
4 changes: 2 additions & 2 deletions tokio/src/runtime/scheduler/multi_thread/handle.rs
Expand Up @@ -35,7 +35,7 @@ impl Handle {
}

pub(crate) fn shutdown(&self) {
self.shared.close();
self.close();
}

pub(super) fn bind_new_task<T>(me: &Arc<Self>, future: T, id: task::Id) -> JoinHandle<T::Output>
Expand All @@ -46,7 +46,7 @@ impl Handle {
let (handle, notified) = me.shared.owned.bind(future, me.clone(), id);

if let Some(notified) = notified {
me.shared.schedule(notified, false);
me.schedule_task(notified, false);
}

handle
Expand Down
2 changes: 0 additions & 2 deletions tokio/src/runtime/scheduler/multi_thread/mod.rs
Expand Up @@ -39,13 +39,11 @@ impl MultiThread {
seed_generator: RngSeedGenerator,
config: Config,
) -> (MultiThread, Launch) {
let driver_unpark = driver.unpark();
let parker = Parker::new(driver);
let (handle, launch) = worker::create(
size,
parker,
driver_handle,
driver_unpark,
blocking_spawner,
seed_generator,
config,
Expand Down
4 changes: 2 additions & 2 deletions tokio/src/runtime/scheduler/multi_thread/park.rs
Expand Up @@ -96,7 +96,7 @@ impl Clone for Parker {
}

impl Unparker {
pub(crate) fn unpark(&self, driver: &driver::Unpark) {
pub(crate) fn unpark(&self, driver: &driver::Handle) {
self.inner.unpark(driver);
}
}
Expand Down Expand Up @@ -195,7 +195,7 @@ impl Inner {
}
}

fn unpark(&self, driver: &driver::Unpark) {
fn unpark(&self, driver: &driver::Handle) {
// To ensure the unparked thread will observe any writes we made before
// this call, we must perform a release operation that `park` can
// synchronize with. To do that we must write `NOTIFIED` even if `state`
Expand Down