Skip to content

Commit

Permalink
Remove feature flag requirement for CachedThreadParker
Browse files Browse the repository at this point in the history
  • Loading branch information
LucioFranco committed Aug 27, 2020
1 parent 099da4c commit fe4dcde
Show file tree
Hide file tree
Showing 3 changed files with 108 additions and 114 deletions.
6 changes: 3 additions & 3 deletions tokio/src/park/mod.rs
Expand Up @@ -42,9 +42,9 @@ cfg_resource_drivers! {
mod thread;
pub(crate) use self::thread::ParkThread;

cfg_block_on! {
pub(crate) use self::thread::{CachedParkThread, ParkError};
}
// cfg_block_on! {
pub(crate) use self::thread::{CachedParkThread, ParkError};
// }

use std::sync::Arc;
use std::time::Duration;
Expand Down
170 changes: 83 additions & 87 deletions tokio/src/park/thread.rs
Expand Up @@ -212,118 +212,114 @@ impl Unpark for UnparkThread {
}
}

cfg_block_on! {
use std::marker::PhantomData;
use std::rc::Rc;
use std::marker::PhantomData;
use std::rc::Rc;

use std::mem;
use std::task::{RawWaker, RawWakerVTable, Waker};
use std::mem;
use std::task::{RawWaker, RawWakerVTable, Waker};

/// Blocks the current thread using a condition variable.
#[derive(Debug)]
pub(crate) struct CachedParkThread {
_anchor: PhantomData<Rc<()>>,
}

impl CachedParkThread {
/// Create a new `ParkThread` handle for the current thread.
///
/// This type cannot be moved to other threads, so it should be created on
/// the thread that the caller intends to park.
pub(crate) fn new() -> CachedParkThread {
CachedParkThread {
_anchor: PhantomData,
}
}

pub(crate) fn get_unpark(&self) -> Result<UnparkThread, ParkError> {
self.with_current(|park_thread| park_thread.unpark())
}
/// Blocks the current thread using a condition variable.
#[derive(Debug)]
pub(crate) struct CachedParkThread {
_anchor: PhantomData<Rc<()>>,
}

/// Get a reference to the `ParkThread` handle for this thread.
fn with_current<F, R>(&self, f: F) -> Result<R, ParkError>
where
F: FnOnce(&ParkThread) -> R,
{
CURRENT_PARKER.try_with(|inner| f(inner))
.map_err(|_| ())
impl CachedParkThread {
/// Create a new `ParkThread` handle for the current thread.
///
/// This type cannot be moved to other threads, so it should be created on
/// the thread that the caller intends to park.
pub(crate) fn new() -> CachedParkThread {
CachedParkThread {
_anchor: PhantomData,
}
}

impl Park for CachedParkThread {
type Unpark = UnparkThread;
type Error = ParkError;
pub(crate) fn get_unpark(&self) -> Result<UnparkThread, ParkError> {
self.with_current(|park_thread| park_thread.unpark())
}

fn unpark(&self) -> Self::Unpark {
self.get_unpark().unwrap()
}
/// Get a reference to the `ParkThread` handle for this thread.
fn with_current<F, R>(&self, f: F) -> Result<R, ParkError>
where
F: FnOnce(&ParkThread) -> R,
{
CURRENT_PARKER.try_with(|inner| f(inner)).map_err(|_| ())
}
}

fn park(&mut self) -> Result<(), Self::Error> {
self.with_current(|park_thread| park_thread.inner.park())?;
Ok(())
}
impl Park for CachedParkThread {
type Unpark = UnparkThread;
type Error = ParkError;

fn park_timeout(&mut self, duration: Duration) -> Result<(), Self::Error> {
self.with_current(|park_thread| park_thread.inner.park_timeout(duration))?;
Ok(())
}
fn unpark(&self) -> Self::Unpark {
self.get_unpark().unwrap()
}

fn shutdown(&mut self) {
let _ = self.with_current(|park_thread| park_thread.inner.shutdown());
}
fn park(&mut self) -> Result<(), Self::Error> {
self.with_current(|park_thread| park_thread.inner.park())?;
Ok(())
}

fn park_timeout(&mut self, duration: Duration) -> Result<(), Self::Error> {
self.with_current(|park_thread| park_thread.inner.park_timeout(duration))?;
Ok(())
}

impl UnparkThread {
pub(crate) fn into_waker(self) -> Waker {
unsafe {
let raw = unparker_to_raw_waker(self.inner);
Waker::from_raw(raw)
}
}
fn shutdown(&mut self) {
let _ = self.with_current(|park_thread| park_thread.inner.shutdown());
}
}

impl Inner {
#[allow(clippy::wrong_self_convention)]
fn into_raw(this: Arc<Inner>) -> *const () {
Arc::into_raw(this) as *const ()
impl UnparkThread {
pub(crate) fn into_waker(self) -> Waker {
unsafe {
let raw = unparker_to_raw_waker(self.inner);
Waker::from_raw(raw)
}
}
}

unsafe fn from_raw(ptr: *const ()) -> Arc<Inner> {
Arc::from_raw(ptr as *const Inner)
}
impl Inner {
#[allow(clippy::wrong_self_convention)]
fn into_raw(this: Arc<Inner>) -> *const () {
Arc::into_raw(this) as *const ()
}

unsafe fn unparker_to_raw_waker(unparker: Arc<Inner>) -> RawWaker {
RawWaker::new(
Inner::into_raw(unparker),
&RawWakerVTable::new(clone, wake, wake_by_ref, drop_waker),
)
unsafe fn from_raw(ptr: *const ()) -> Arc<Inner> {
Arc::from_raw(ptr as *const Inner)
}
}

unsafe fn clone(raw: *const ()) -> RawWaker {
let unparker = Inner::from_raw(raw);
unsafe fn unparker_to_raw_waker(unparker: Arc<Inner>) -> RawWaker {
RawWaker::new(
Inner::into_raw(unparker),
&RawWakerVTable::new(clone, wake, wake_by_ref, drop_waker),
)
}

// Increment the ref count
mem::forget(unparker.clone());
unsafe fn clone(raw: *const ()) -> RawWaker {
let unparker = Inner::from_raw(raw);

unparker_to_raw_waker(unparker)
}
// Increment the ref count
mem::forget(unparker.clone());

unsafe fn drop_waker(raw: *const ()) {
let _ = Inner::from_raw(raw);
}
unparker_to_raw_waker(unparker)
}

unsafe fn wake(raw: *const ()) {
let unparker = Inner::from_raw(raw);
unparker.unpark();
}
unsafe fn drop_waker(raw: *const ()) {
let _ = Inner::from_raw(raw);
}

unsafe fn wake_by_ref(raw: *const ()) {
let unparker = Inner::from_raw(raw);
unparker.unpark();
unsafe fn wake(raw: *const ()) {
let unparker = Inner::from_raw(raw);
unparker.unpark();
}

// We don't actually own a reference to the unparker
mem::forget(unparker);
}
unsafe fn wake_by_ref(raw: *const ()) {
let unparker = Inner::from_raw(raw);
unparker.unpark();

// We don't actually own a reference to the unparker
mem::forget(unparker);
}
46 changes: 22 additions & 24 deletions tokio/src/runtime/enter.rs
Expand Up @@ -138,31 +138,29 @@ cfg_rt_threaded! {
}
}

cfg_block_on! {
impl Enter {
/// Blocks the thread on the specified future, returning the value with
/// which that future completes.
pub(crate) fn block_on<F>(&mut self, f: F) -> Result<F::Output, crate::park::ParkError>
where
F: std::future::Future,
{
use crate::park::{CachedParkThread, Park};
use std::task::Context;
use std::task::Poll::Ready;

let mut park = CachedParkThread::new();
let waker = park.get_unpark()?.into_waker();
let mut cx = Context::from_waker(&waker);

pin!(f);

loop {
if let Ready(v) = crate::coop::budget(|| f.as_mut().poll(&mut cx)) {
return Ok(v);
}

park.park()?;
impl Enter {
/// Blocks the thread on the specified future, returning the value with
/// which that future completes.
pub(crate) fn block_on<F>(&mut self, f: F) -> Result<F::Output, crate::park::ParkError>
where
F: std::future::Future,
{
use crate::park::{CachedParkThread, Park};
use std::task::Context;
use std::task::Poll::Ready;

let mut park = CachedParkThread::new();
let waker = park.get_unpark()?.into_waker();
let mut cx = Context::from_waker(&waker);

pin!(f);

loop {
if let Ready(v) = crate::coop::budget(|| f.as_mut().poll(&mut cx)) {
return Ok(v);
}

park.park()?;
}
}
}
Expand Down

0 comments on commit fe4dcde

Please sign in to comment.