From fe4dcdeca53c88f799d00b249214d6c8dd72fe4f Mon Sep 17 00:00:00 2001 From: Lucio Franco Date: Thu, 27 Aug 2020 16:21:33 +0000 Subject: [PATCH] Remove feature flag requirement for `CachedThreadParker` --- tokio/src/park/mod.rs | 6 +- tokio/src/park/thread.rs | 170 ++++++++++++++++++------------------- tokio/src/runtime/enter.rs | 46 +++++----- 3 files changed, 108 insertions(+), 114 deletions(-) diff --git a/tokio/src/park/mod.rs b/tokio/src/park/mod.rs index 2cfef8c2dd8..a3404de85c7 100644 --- a/tokio/src/park/mod.rs +++ b/tokio/src/park/mod.rs @@ -42,9 +42,9 @@ cfg_resource_drivers! { mod thread; pub(crate) use self::thread::ParkThread; -cfg_block_on! { - pub(crate) use self::thread::{CachedParkThread, ParkError}; -} +// cfg_block_on! { +pub(crate) use self::thread::{CachedParkThread, ParkError}; +// } use std::sync::Arc; use std::time::Duration; diff --git a/tokio/src/park/thread.rs b/tokio/src/park/thread.rs index 44174d3519f..9ed41310fa5 100644 --- a/tokio/src/park/thread.rs +++ b/tokio/src/park/thread.rs @@ -212,118 +212,114 @@ impl Unpark for UnparkThread { } } -cfg_block_on! { - use std::marker::PhantomData; - use std::rc::Rc; +use std::marker::PhantomData; +use std::rc::Rc; - use std::mem; - use std::task::{RawWaker, RawWakerVTable, Waker}; +use std::mem; +use std::task::{RawWaker, RawWakerVTable, Waker}; - /// Blocks the current thread using a condition variable. - #[derive(Debug)] - pub(crate) struct CachedParkThread { - _anchor: PhantomData>, - } - - impl CachedParkThread { - /// Create a new `ParkThread` handle for the current thread. - /// - /// This type cannot be moved to other threads, so it should be created on - /// the thread that the caller intends to park. - pub(crate) fn new() -> CachedParkThread { - CachedParkThread { - _anchor: PhantomData, - } - } - - pub(crate) fn get_unpark(&self) -> Result { - self.with_current(|park_thread| park_thread.unpark()) - } +/// Blocks the current thread using a condition variable. +#[derive(Debug)] +pub(crate) struct CachedParkThread { + _anchor: PhantomData>, +} - /// Get a reference to the `ParkThread` handle for this thread. - fn with_current(&self, f: F) -> Result - where - F: FnOnce(&ParkThread) -> R, - { - CURRENT_PARKER.try_with(|inner| f(inner)) - .map_err(|_| ()) +impl CachedParkThread { + /// Create a new `ParkThread` handle for the current thread. + /// + /// This type cannot be moved to other threads, so it should be created on + /// the thread that the caller intends to park. + pub(crate) fn new() -> CachedParkThread { + CachedParkThread { + _anchor: PhantomData, } } - impl Park for CachedParkThread { - type Unpark = UnparkThread; - type Error = ParkError; + pub(crate) fn get_unpark(&self) -> Result { + self.with_current(|park_thread| park_thread.unpark()) + } - fn unpark(&self) -> Self::Unpark { - self.get_unpark().unwrap() - } + /// Get a reference to the `ParkThread` handle for this thread. + fn with_current(&self, f: F) -> Result + where + F: FnOnce(&ParkThread) -> R, + { + CURRENT_PARKER.try_with(|inner| f(inner)).map_err(|_| ()) + } +} - fn park(&mut self) -> Result<(), Self::Error> { - self.with_current(|park_thread| park_thread.inner.park())?; - Ok(()) - } +impl Park for CachedParkThread { + type Unpark = UnparkThread; + type Error = ParkError; - fn park_timeout(&mut self, duration: Duration) -> Result<(), Self::Error> { - self.with_current(|park_thread| park_thread.inner.park_timeout(duration))?; - Ok(()) - } + fn unpark(&self) -> Self::Unpark { + self.get_unpark().unwrap() + } - fn shutdown(&mut self) { - let _ = self.with_current(|park_thread| park_thread.inner.shutdown()); - } + fn park(&mut self) -> Result<(), Self::Error> { + self.with_current(|park_thread| park_thread.inner.park())?; + Ok(()) } + fn park_timeout(&mut self, duration: Duration) -> Result<(), Self::Error> { + self.with_current(|park_thread| park_thread.inner.park_timeout(duration))?; + Ok(()) + } - impl UnparkThread { - pub(crate) fn into_waker(self) -> Waker { - unsafe { - let raw = unparker_to_raw_waker(self.inner); - Waker::from_raw(raw) - } - } + fn shutdown(&mut self) { + let _ = self.with_current(|park_thread| park_thread.inner.shutdown()); } +} - impl Inner { - #[allow(clippy::wrong_self_convention)] - fn into_raw(this: Arc) -> *const () { - Arc::into_raw(this) as *const () +impl UnparkThread { + pub(crate) fn into_waker(self) -> Waker { + unsafe { + let raw = unparker_to_raw_waker(self.inner); + Waker::from_raw(raw) } + } +} - unsafe fn from_raw(ptr: *const ()) -> Arc { - Arc::from_raw(ptr as *const Inner) - } +impl Inner { + #[allow(clippy::wrong_self_convention)] + fn into_raw(this: Arc) -> *const () { + Arc::into_raw(this) as *const () } - unsafe fn unparker_to_raw_waker(unparker: Arc) -> RawWaker { - RawWaker::new( - Inner::into_raw(unparker), - &RawWakerVTable::new(clone, wake, wake_by_ref, drop_waker), - ) + unsafe fn from_raw(ptr: *const ()) -> Arc { + Arc::from_raw(ptr as *const Inner) } +} - unsafe fn clone(raw: *const ()) -> RawWaker { - let unparker = Inner::from_raw(raw); +unsafe fn unparker_to_raw_waker(unparker: Arc) -> RawWaker { + RawWaker::new( + Inner::into_raw(unparker), + &RawWakerVTable::new(clone, wake, wake_by_ref, drop_waker), + ) +} - // Increment the ref count - mem::forget(unparker.clone()); +unsafe fn clone(raw: *const ()) -> RawWaker { + let unparker = Inner::from_raw(raw); - unparker_to_raw_waker(unparker) - } + // Increment the ref count + mem::forget(unparker.clone()); - unsafe fn drop_waker(raw: *const ()) { - let _ = Inner::from_raw(raw); - } + unparker_to_raw_waker(unparker) +} - unsafe fn wake(raw: *const ()) { - let unparker = Inner::from_raw(raw); - unparker.unpark(); - } +unsafe fn drop_waker(raw: *const ()) { + let _ = Inner::from_raw(raw); +} - unsafe fn wake_by_ref(raw: *const ()) { - let unparker = Inner::from_raw(raw); - unparker.unpark(); +unsafe fn wake(raw: *const ()) { + let unparker = Inner::from_raw(raw); + unparker.unpark(); +} - // We don't actually own a reference to the unparker - mem::forget(unparker); - } +unsafe fn wake_by_ref(raw: *const ()) { + let unparker = Inner::from_raw(raw); + unparker.unpark(); + + // We don't actually own a reference to the unparker + mem::forget(unparker); } diff --git a/tokio/src/runtime/enter.rs b/tokio/src/runtime/enter.rs index 56a7c57b6c6..bb6e6be012b 100644 --- a/tokio/src/runtime/enter.rs +++ b/tokio/src/runtime/enter.rs @@ -138,31 +138,29 @@ cfg_rt_threaded! { } } -cfg_block_on! { - impl Enter { - /// Blocks the thread on the specified future, returning the value with - /// which that future completes. - pub(crate) fn block_on(&mut self, f: F) -> Result - where - F: std::future::Future, - { - use crate::park::{CachedParkThread, Park}; - use std::task::Context; - use std::task::Poll::Ready; - - let mut park = CachedParkThread::new(); - let waker = park.get_unpark()?.into_waker(); - let mut cx = Context::from_waker(&waker); - - pin!(f); - - loop { - if let Ready(v) = crate::coop::budget(|| f.as_mut().poll(&mut cx)) { - return Ok(v); - } - - park.park()?; +impl Enter { + /// Blocks the thread on the specified future, returning the value with + /// which that future completes. + pub(crate) fn block_on(&mut self, f: F) -> Result + where + F: std::future::Future, + { + use crate::park::{CachedParkThread, Park}; + use std::task::Context; + use std::task::Poll::Ready; + + let mut park = CachedParkThread::new(); + let waker = park.get_unpark()?.into_waker(); + let mut cx = Context::from_waker(&waker); + + pin!(f); + + loop { + if let Ready(v) = crate::coop::budget(|| f.as_mut().poll(&mut cx)) { + return Ok(v); } + + park.park()?; } } }