Skip to content

Commit

Permalink
preemption: add yield points to some leaf futures
Browse files Browse the repository at this point in the history
  • Loading branch information
jonhoo committed Jan 23, 2020
1 parent 8a6ea07 commit bf7ede7
Show file tree
Hide file tree
Showing 7 changed files with 30 additions and 0 deletions.
6 changes: 6 additions & 0 deletions tokio/src/io/registration.rs
Expand Up @@ -139,6 +139,9 @@ impl Registration {
///
/// This function will panic if called from outside of a task context.
pub fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<mio::Ready>> {
// Keep track of task budget
ready!(crate::league::poll_cooperate(cx));

let v = self.poll_ready(Direction::Read, Some(cx))?;
match v {
Some(v) => Poll::Ready(Ok(v)),
Expand Down Expand Up @@ -190,6 +193,9 @@ impl Registration {
///
/// This function will panic if called from outside of a task context.
pub fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<mio::Ready>> {
// Keep track of task budget
ready!(crate::league::poll_cooperate(cx));

let v = self.poll_ready(Direction::Write, Some(cx))?;
match v {
Some(v) => Poll::Ready(Ok(v)),
Expand Down
3 changes: 3 additions & 0 deletions tokio/src/process/mod.rs
Expand Up @@ -700,6 +700,9 @@ where
type Output = Result<T, E>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
// Keep track of task budget
ready!(crate::league::poll_cooperate(cx));

let ret = Pin::new(&mut self.inner).poll(cx);

if let Poll::Ready(Ok(_)) = ret {
Expand Down
3 changes: 3 additions & 0 deletions tokio/src/sync/mpsc/chan.rs
Expand Up @@ -265,6 +265,9 @@ where
pub(crate) fn recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> {
use super::block::Read::*;

// Keep track of task budget
ready!(crate::league::poll_cooperate(cx));

self.inner.rx_fields.with_mut(|rx_fields_ptr| {
let rx_fields = unsafe { &mut *rx_fields_ptr };

Expand Down
6 changes: 6 additions & 0 deletions tokio/src/sync/oneshot.rs
Expand Up @@ -196,6 +196,9 @@ impl<T> Sender<T> {

#[doc(hidden)] // TODO: remove
pub fn poll_closed(&mut self, cx: &mut Context<'_>) -> Poll<()> {
// Keep track of task budget
ready!(crate::league::poll_cooperate(cx));

let inner = self.inner.as_ref().unwrap();

let mut state = State::load(&inner.state, Acquire);
Expand Down Expand Up @@ -544,6 +547,9 @@ impl<T> Inner<T> {
}

fn poll_recv(&self, cx: &mut Context<'_>) -> Poll<Result<T, RecvError>> {
// Keep track of task budget
ready!(crate::league::poll_cooperate(cx));

// Load the state
let mut state = State::load(&self.state, Acquire);

Expand Down
6 changes: 6 additions & 0 deletions tokio/src/sync/semaphore_ll.rs
Expand Up @@ -185,6 +185,9 @@ impl Semaphore {
num_permits: u16,
permit: &mut Permit,
) -> Poll<Result<(), AcquireError>> {
// Keep track of task budget
ready!(crate::league::poll_cooperate(cx));

self.poll_acquire2(num_permits, || {
let waiter = permit.waiter.get_or_insert_with(|| Box::new(Waiter::new()));

Expand Down Expand Up @@ -630,6 +633,9 @@ impl Permit {

match self.state {
Waiting(requested) => {
// Keep track of task budget
ready!(crate::league::poll_cooperate(cx));

// There must be a waiter
let waiter = self.waiter.as_ref().unwrap();

Expand Down
3 changes: 3 additions & 0 deletions tokio/src/task/join.rs
Expand Up @@ -102,6 +102,9 @@ impl<T> Future for JoinHandle<T> {
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
use std::mem::MaybeUninit;

// Keep track of task budget
ready!(crate::league::poll_cooperate(cx));

// Raw should always be set
let raw = self.raw.as_ref().unwrap();

Expand Down
3 changes: 3 additions & 0 deletions tokio/src/time/driver/registration.rs
Expand Up @@ -39,6 +39,9 @@ impl Registration {
}

pub(crate) fn poll_elapsed(&self, cx: &mut task::Context<'_>) -> Poll<Result<(), Error>> {
// Keep track of task budget
ready!(crate::league::poll_cooperate(cx));

self.entry.poll_elapsed(cx)
}
}
Expand Down

0 comments on commit bf7ede7

Please sign in to comment.